From 1e7ce90561543aa2c880727410abda7c9ca6ba4b Mon Sep 17 00:00:00 2001 From: aiordache Date: Thu, 4 Feb 2021 10:04:42 +0100 Subject: [PATCH] Kube backend: add the `compose logs` command Signed-off-by: aiordache --- kube/client/client.go | 37 +++++++++++++++++--- kube/compose.go | 6 +++- local/compose/attach.go | 3 +- local/compose/logs.go | 29 ++-------------- utils/logconsumer.go | 75 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 117 insertions(+), 33 deletions(-) create mode 100644 utils/logconsumer.go diff --git a/kube/client/client.go b/kube/client/client.go index 8eddc272..2d91dd9e 100644 --- a/kube/client/client.go +++ b/kube/client/client.go @@ -21,13 +21,15 @@ package client import ( "context" "fmt" + "io" - v1 "k8s.io/api/core/v1" + "github.com/docker/compose-cli/api/compose" + "github.com/docker/compose-cli/utils" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" - - "github.com/docker/compose-cli/api/compose" ) // KubeClient API to access kube objects @@ -81,7 +83,7 @@ func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all return result, nil } -func podToContainerSummary(pod v1.Pod) compose.ContainerSummary { +func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary { return compose.ContainerSummary{ ID: pod.GetObjectMeta().GetName(), Name: pod.GetObjectMeta().GetName(), @@ -90,3 +92,30 @@ func podToContainerSummary(pod v1.Pod) compose.ContainerSummary { Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag], } } + +// GetLogs retrieves pod logs +func (c *KubeClient) GetLogs(ctx context.Context, projectName string, consumer compose.LogConsumer, follow bool) error { + pods, err := c.client.CoreV1().Pods(c.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName), + }) + if err != nil { + return err + } + eg, ctx := errgroup.WithContext(ctx) + for _, pod := range pods.Items { + request := c.client.CoreV1().Pods(c.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow}) + service := pod.Labels[compose.ServiceTag] + w := utils.GetWriter(service, pod.Name, consumer) + + eg.Go(func() error { + r, err := request.Stream(ctx) + defer r.Close() // nolint errcheck + if err != nil { + return err + } + _, err = io.Copy(w, r) + return err + }) + } + return eg.Wait() +} diff --git a/kube/compose.go b/kube/compose.go index 1d80aca5..3ef24bb8 100644 --- a/kube/compose.go +++ b/kube/compose.go @@ -33,6 +33,7 @@ import ( "github.com/docker/compose-cli/kube/client" "github.com/docker/compose-cli/kube/helm" "github.com/docker/compose-cli/kube/resources" + "github.com/docker/compose-cli/utils" ) type composeService struct { @@ -154,7 +155,10 @@ func (s *composeService) Stop(ctx context.Context, project *types.Project) error // Logs executes the equivalent to a `compose logs` func (s *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer, options compose.LogOptions) error { - return errdefs.ErrNotImplemented + if len(options.Services) > 0 { + consumer = utils.FilteredLogConsumer(consumer, options.Services) + } + return s.client.GetLogs(ctx, projectName, consumer, options.Follow) } // Ps executes the equivalent to a `compose ps` diff --git a/local/compose/attach.go b/local/compose/attach.go index 814986d4..8149561f 100644 --- a/local/compose/attach.go +++ b/local/compose/attach.go @@ -24,6 +24,7 @@ import ( "github.com/docker/compose-cli/api/compose" convert "github.com/docker/compose-cli/local/moby" + "github.com/docker/compose-cli/utils" "github.com/compose-spec/compose-go/types" moby "github.com/docker/docker/api/types" @@ -62,7 +63,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error { serviceName := container.Labels[serviceLabel] - w := getWriter(serviceName, getCanonicalContainerName(container), consumer) + w := utils.GetWriter(serviceName, getCanonicalContainerName(container), consumer) service, err := project.GetService(serviceName) if err != nil { diff --git a/local/compose/logs.go b/local/compose/logs.go index 7f4ec45b..ce6b2f89 100644 --- a/local/compose/logs.go +++ b/local/compose/logs.go @@ -17,11 +17,11 @@ package compose import ( - "bytes" "context" "io" "github.com/docker/compose-cli/api/compose" + "github.com/docker/compose-cli/utils" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" @@ -73,7 +73,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer if err != nil { return err } - w := getWriter(service, container.Name[1:], consumer) + w := utils.GetWriter(service, container.Name[1:], consumer) if container.Config.Tty { _, err = io.Copy(w, r) } else { @@ -84,28 +84,3 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer } return eg.Wait() } - -type splitBuffer struct { - service string - container string - consumer compose.LogConsumer -} - -// getWriter creates a io.Writer that will actually split by line and format by LogConsumer -func getWriter(service, container string, l compose.LogConsumer) io.Writer { - return splitBuffer{ - service: service, - container: container, - consumer: l, - } -} - -func (s splitBuffer) Write(b []byte) (n int, err error) { - split := bytes.Split(b, []byte{'\n'}) - for _, line := range split { - if len(line) != 0 { - s.consumer.Log(s.service, s.container, string(line)) - } - } - return len(b), nil -} diff --git a/utils/logconsumer.go b/utils/logconsumer.go new file mode 100644 index 00000000..2dd7e496 --- /dev/null +++ b/utils/logconsumer.go @@ -0,0 +1,75 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package utils + +import ( + "bytes" + "io" + + "github.com/docker/compose-cli/api/compose" +) + +// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer +func GetWriter(service, container string, l compose.LogConsumer) io.Writer { + return splitBuffer{ + service: service, + container: container, + consumer: l, + } +} + +// FilteredLogConsumer filters logs for given services +func FilteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer { + if len(services) == 0 { + return consumer + } + allowed := map[string]bool{} + for _, s := range services { + allowed[s] = true + } + return &allowListLogConsumer{ + allowList: allowed, + delegate: consumer, + } +} + +type allowListLogConsumer struct { + allowList map[string]bool + delegate compose.LogConsumer +} + +func (a *allowListLogConsumer) Log(service, container, message string) { + if a.allowList[service] { + a.delegate.Log(service, container, message) + } +} + +type splitBuffer struct { + service string + container string + consumer compose.LogConsumer +} + +func (s splitBuffer) Write(b []byte) (n int, err error) { + split := bytes.Split(b, []byte{'\n'}) + for _, line := range split { + if len(line) != 0 { + s.consumer.Log(s.service, s.container, string(line)) + } + } + return len(b), nil +}