diff --git a/kube/client/client.go b/kube/client/client.go index 6c4d6eb0..8d8eb589 100644 --- a/kube/client/client.go +++ b/kube/client/client.go @@ -22,6 +22,9 @@ import ( "context" "fmt" "io" + "net/http" + "os" + "strings" "time" "github.com/docker/compose-cli/api/compose" @@ -29,14 +32,21 @@ import ( "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" ) // KubeClient API to access kube objects type KubeClient struct { client *kubernetes.Clientset namespace string + config *rest.Config + ioStreams genericclioptions.IOStreams } // NewKubeClient new kubernetes client @@ -48,7 +58,7 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { - return nil, err + return nil, fmt.Errorf("failed creating clientset. Error: %+v", err) } namespace, _, err := config.ToRawKubeConfigLoader().Namespace() @@ -59,9 +69,84 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro return &KubeClient{ client: clientset, namespace: namespace, + config: restConfig, + ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr}, }, nil } +// GetPod retrieves a service pod +func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) { + pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName), + }) + if err != nil { + return nil, err + } + if pods == nil { + return nil, nil + } + var pod corev1.Pod + for _, p := range pods.Items { + service := p.Labels[compose.ServiceTag] + if service == serviceName { + pod = p + break + } + } + return &pod, nil +} + +// Exec executes a command in a container +func (kc KubeClient) Exec(ctx context.Context, projectName string, opts compose.RunOptions) error { + pod, err := kc.GetPod(ctx, projectName, opts.Service) + if err != nil || pod == nil { + return err + } + if len(pod.Spec.Containers) == 0 { + return fmt.Errorf("no containers running in pod %s", pod.Name) + } + // get first container in the pod + container := &pod.Spec.Containers[0] + containerName := container.Name + + req := kc.client.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(pod.Name). + Namespace(kc.namespace). + SubResource("exec") + + option := &corev1.PodExecOptions{ + Container: containerName, + Command: opts.Command, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: opts.Tty, + } + + if opts.Reader == nil { + option.Stdin = false + } + + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + return fmt.Errorf("error adding to scheme: %v", err) + } + parameterCodec := runtime.NewParameterCodec(scheme) + req.VersionedParams(option, parameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL()) + if err != nil { + return err + } + return exec.Stream(remotecommand.StreamOptions{ + Stdin: opts.Reader, + Stdout: opts.Writer, + Stderr: opts.Writer, + Tty: opts.Tty, + }) +} + // GetContainers get containers for a given compose project func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) { fieldSelector := "" @@ -76,9 +161,39 @@ func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all if err != nil { return nil, err } + services := map[string][]compose.PortPublisher{} result := []compose.ContainerSummary{} for _, pod := range pods.Items { - result = append(result, podToContainerSummary(pod)) + summary := podToContainerSummary(pod) + serviceName := pod.GetObjectMeta().GetLabels()[compose.ServiceTag] + ports, ok := services[serviceName] + if !ok { + s, err := kc.client.CoreV1().Services(kc.namespace).Get(ctx, serviceName, metav1.GetOptions{}) + if err != nil { + if !strings.Contains(err.Error(), "not found") { + return nil, err + } + result = append(result, summary) + continue + } + ports = []compose.PortPublisher{} + if s != nil { + if s.Spec.Type == corev1.ServiceTypeLoadBalancer { + if len(s.Status.LoadBalancer.Ingress) > 0 { + port := compose.PortPublisher{URL: s.Status.LoadBalancer.Ingress[0].IP} + if len(s.Spec.Ports) > 0 { + port.URL = fmt.Sprintf("%s:%d", port.URL, s.Spec.Ports[0].Port) + port.TargetPort = s.Spec.Ports[0].TargetPort.IntValue() + port.Protocol = string(s.Spec.Ports[0].Protocol) + } + ports = append(ports, port) + } + } + } + services[serviceName] = ports + } + summary.Publishers = ports + result = append(result, summary) } return result, nil @@ -161,3 +276,42 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOpti } return nil } + +//MapPortsToLocalhost runs a port-forwarder daemon process +func (kc KubeClient) MapPortsToLocalhost(ctx context.Context, opts PortMappingOptions) error { + stopChannel := make(chan struct{}, 1) + readyChannel := make(chan struct{}) + + eg, ctx := errgroup.WithContext(ctx) + for serviceName, servicePorts := range opts.Services { + serviceName := serviceName + servicePorts := servicePorts + pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName) + if err != nil { + return err + } + eg.Go(func() error { + ports := []string{} + for _, p := range servicePorts { + ports = append(ports, fmt.Sprintf("%d:%d", p.PublishedPort, p.TargetPort)) + } + + req := kc.client.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(pod.Name). + Namespace(kc.namespace). + SubResource("portforward") + transport, upgrader, err := spdy.RoundTripperFor(kc.config) + if err != nil { + return err + } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL()) + fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, os.Stdout, os.Stderr) + if err != nil { + return err + } + return fw.ForwardPorts() + }) + } + return eg.Wait() +} diff --git a/kube/client/utils.go b/kube/client/utils.go index dbe302f8..e93e087b 100644 --- a/kube/client/utils.go +++ b/kube/client/utils.go @@ -28,11 +28,27 @@ import ( ) func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary { + state := compose.RUNNING + + if pod.DeletionTimestamp != nil { + state = compose.REMOVING + } else { + for _, container := range pod.Status.ContainerStatuses { + if container.State.Waiting != nil || container.State.Terminated != nil { + state = compose.UPDATING + break + } + } + if state == compose.RUNNING && pod.Status.Phase != corev1.PodRunning { + state = string(pod.Status.Phase) + } + } + return compose.ContainerSummary{ ID: pod.GetObjectMeta().GetName(), Name: pod.GetObjectMeta().GetName(), Service: pod.GetObjectMeta().GetLabels()[compose.ServiceTag], - State: string(pod.Status.Phase), + State: state, Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag], } } @@ -46,6 +62,13 @@ func checkPodsState(services []string, pods []corev1.Pod, status string) (bool, if len(services) > 0 && !utils.StringContains(services, service) { continue } + containersRunning := true + for _, container := range pod.Status.ContainerStatuses { + if container.State.Running == nil { + containersRunning = false + break + } + } servicePods[service] = pod.Status.Message if status == compose.REMOVING { @@ -54,7 +77,7 @@ func checkPodsState(services []string, pods []corev1.Pod, status string) (bool, if pod.Status.Phase == corev1.PodFailed { return false, servicePods, fmt.Errorf(pod.Status.Reason) } - if status == compose.RUNNING && pod.Status.Phase != corev1.PodRunning { + if status == compose.RUNNING && (pod.Status.Phase != corev1.PodRunning || !containersRunning) { stateReached = false } } @@ -75,3 +98,12 @@ type WaitForStatusOptions struct { Timeout *time.Duration Log LogFunc } + +// Ports holds published ports data +type Ports []compose.PortPublisher + +// PortMappingOptions holds the port mapping for project services +type PortMappingOptions struct { + ProjectName string + Services map[string]Ports +} diff --git a/kube/compose.go b/kube/compose.go index 389c7eae..7b4e9133 100644 --- a/kube/compose.go +++ b/kube/compose.go @@ -74,7 +74,7 @@ func NewComposeService() (compose.Service, error) { func (s *composeService) Up(ctx context.Context, project *types.Project, options compose.UpOptions) error { w := progress.ContextWriter(ctx) - eventName := "Convert to Helm charts" + eventName := "Convert Compose file to Helm charts" w.Event(progress.CreatingEvent(eventName)) chart, err := helm.GetChartInMemory(project) @@ -83,16 +83,31 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } w.Event(progress.NewEvent(eventName, progress.Done, "")) - eventName = "Install Helm charts" - w.Event(progress.CreatingEvent(eventName)) + stack, err := s.sdk.Get(project.Name) + if err != nil || stack == nil { + // install stack + eventName = "Install Compose stack" + w.Event(progress.CreatingEvent(eventName)) - err = s.sdk.InstallChart(project.Name, chart, func(format string, v ...interface{}) { - message := fmt.Sprintf(format, v...) - w.Event(progress.NewEvent(eventName, progress.Done, message)) - }) + err = s.sdk.InstallChart(project.Name, chart, func(format string, v ...interface{}) { + message := fmt.Sprintf(format, v...) + w.Event(progress.NewEvent(eventName, progress.Done, message)) + }) + + } else { + //update stack + eventName = "Updating Compose stack" + w.Event(progress.CreatingEvent(eventName)) + + err = s.sdk.UpdateChart(project.Name, chart, func(format string, v ...interface{}) { + message := fmt.Sprintf(format, v...) + w.Event(progress.NewEvent(eventName, progress.Done, message)) + }) + } if err != nil { return err } + w.Event(progress.NewEvent(eventName, progress.Done, "")) return s.client.WaitForPodState(ctx, client.WaitForStatusOptions{ @@ -266,7 +281,7 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt // Exec executes a command in a running service container func (s *composeService) Exec(ctx context.Context, project *types.Project, opts compose.RunOptions) (int, error) { - return 0, errdefs.ErrNotImplemented + return 0, s.client.Exec(ctx, project.Name, opts) } func (s *composeService) Pause(ctx context.Context, project string, options compose.PauseOptions) error { diff --git a/kube/helm/helm.go b/kube/helm/helm.go index a8eeb135..fd04f787 100644 --- a/kube/helm/helm.go +++ b/kube/helm/helm.go @@ -84,6 +84,19 @@ func (hc *Actions) InstallChart(name string, chart *chart.Chart, logger func(for return err } +// UpdateChart upgrades chart +func (hc *Actions) UpdateChart(name string, chart *chart.Chart, logger func(format string, v ...interface{})) error { + err := hc.initialize(logger) + if err != nil { + return err + } + + actUpgrade := action.NewUpgrade(hc.Config) + actUpgrade.Namespace = hc.Namespace + _, err = actUpgrade.Run(name, chart, map[string]interface{}{}) + return err +} + // Uninstall uninstall chart func (hc *Actions) Uninstall(name string, logger func(format string, v ...interface{})) error { err := hc.initialize(logger) diff --git a/kube/resources/kube.go b/kube/resources/kube.go index 214e9d13..bbbe6b35 100644 --- a/kube/resources/kube.go +++ b/kube/resources/kube.go @@ -94,8 +94,8 @@ func mapToService(project *types.Project, service types.ServiceConfig) *core.Ser } ports = append(ports, core.ServicePort{ - Name: fmt.Sprintf("%d-%s", p.Target, strings.ToLower(p.Protocol)), - Port: int32(p.Target), + Name: fmt.Sprintf("%d-%s", p.Published, strings.ToLower(p.Protocol)), + Port: int32(p.Published), TargetPort: intstr.FromInt(int(p.Target)), Protocol: toProtocol(p.Protocol), })