diff --git a/api/compose/api.go b/api/compose/api.go index 3dcf1734..e19da94f 100644 --- a/api/compose/api.go +++ b/api/compose/api.go @@ -65,10 +65,8 @@ type CreateOptions struct { // StartOptions group options of the Start API type StartOptions struct { - // Attach will attach to container and pipe stdout/stderr to LogConsumer - Attach LogConsumer - // Listener will get notified on container events - Listener chan ContainerExited + // Attach will attach to service containers and pipe stdout/stderr to channel + Attach chan ContainerEvent } // UpOptions group options of the Up API @@ -185,11 +183,21 @@ type Stack struct { // LogConsumer is a callback to process log messages from services type LogConsumer interface { Log(service, container, message string) - Status(service, container, message string) + Status(service, container, msg string) } -// ContainerExited let us know a Container exited -type ContainerExited struct { - Service string - Status int +// ContainerEvent notify an event has been collected on Source container implementing Service +type ContainerEvent struct { + Type int + Source string + Service string + Line string + ExitCode int } + +const ( + // ContainerEventLog is a ContainerEvent of type log. Line is set + ContainerEventLog = iota + // ContainerEventExit is a ContainerEvent of type exit. ExitCode is set + ContainerEventExit +) diff --git a/cli/cmd/compose/start.go b/cli/cmd/compose/start.go index 07be32ee..94d994c0 100644 --- a/cli/cmd/compose/start.go +++ b/cli/cmd/compose/start.go @@ -18,14 +18,12 @@ package compose import ( "context" - "os" - - "github.com/spf13/cobra" "github.com/docker/compose-cli/api/client" "github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/progress" - "github.com/docker/compose-cli/cli/formatter" + + "github.com/spf13/cobra" ) type startOptions struct { @@ -67,7 +65,23 @@ func runStart(ctx context.Context, opts startOptions, services []string) error { return err } - return c.ComposeService().Start(ctx, project, compose.StartOptions{ - Attach: formatter.NewLogConsumer(ctx, os.Stdout), + queue := make(chan compose.ContainerEvent) + printer := printer{ + queue: queue, + } + err = c.ComposeService().Start(ctx, project, compose.StartOptions{ + Attach: queue, }) + if err != nil { + return err + } + + _, err = printer.run(ctx, false, func() error { + ctx := context.Background() + _, err := progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", c.ComposeService().Stop(ctx, project) + }) + return err + }) + return err } diff --git a/cli/cmd/compose/up.go b/cli/cmd/compose/up.go index ad41cac2..60e74cc4 100644 --- a/cli/cmd/compose/up.go +++ b/cli/cmd/compose/up.go @@ -18,11 +18,11 @@ package compose import ( "context" - "errors" "fmt" - "github.com/sirupsen/logrus" "os" + "os/signal" "path/filepath" + "syscall" "github.com/docker/compose-cli/api/client" "github.com/docker/compose-cli/api/compose" @@ -31,6 +31,7 @@ import ( "github.com/docker/compose-cli/cli/formatter" "github.com/compose-spec/compose-go/types" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -151,47 +152,35 @@ func runCreateStart(ctx context.Context, opts upOptions, services []string) erro return nil } - ctx, cancel := context.WithCancel(ctx) - listener := make(chan compose.ContainerExited) - exitCode := make(chan int) + queue := make(chan compose.ContainerEvent) + printer := printer{ + queue: queue, + } + + stopFunc := func() error { + ctx := context.Background() + _, err := progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", c.ComposeService().Stop(ctx, project) + }) + return err + } + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) go func() { - var aborting bool - for { - exit := <-listener - if opts.cascadeStop && !aborting { - aborting = true - cancel() - exitCode <- exit.Status - } - } + <-signalChan + fmt.Println("Gracefully stopping...") + stopFunc() // nolint:errcheck }() err = c.ComposeService().Start(ctx, project, compose.StartOptions{ - Attach: formatter.NewLogConsumer(ctx, os.Stdout), - Listener: listener, + Attach: queue, }) - - if errors.Is(ctx.Err(), context.Canceled) { - select { - case exit := <-exitCode: - fmt.Println("Aborting on container exit...") - err = stop(c, project) - logrus.Error(exit) - // os.Exit(exit) - default: - // cancelled by user - fmt.Println("Gracefully stopping...") - err = stop(c, project) - } + if err != nil { + return err } - return err -} -func stop(c *client.Client, project *types.Project) error { - ctx := context.Background() - _, err := progress.Run(ctx, func(ctx context.Context) (string, error) { - return "", c.ComposeService().Stop(ctx, project) - }) + _, err = printer.run(ctx, opts.cascadeStop, stopFunc) + // FIXME os.Exit return err } @@ -235,3 +224,26 @@ func setup(ctx context.Context, opts composeOptions, services []string) (*client return c, project, nil } + +type printer struct { + queue chan compose.ContainerEvent +} + +func (p printer) run(ctx context.Context, cascadeStop bool, stopFn func() error) (int, error) { //nolint:unparam + consumer := formatter.NewLogConsumer(ctx, os.Stdout) + for { + event := <-p.queue + switch event.Type { + case compose.ContainerEventExit: + consumer.Status(event.Service, event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) + if cascadeStop { + fmt.Println("Aborting on container exit...") + err := stopFn() + logrus.Error(event.ExitCode) + return event.ExitCode, err + } + case compose.ContainerEventLog: + consumer.Log(event.Service, event.Source, event.Line) + } + } +} diff --git a/ecs/logs.go b/ecs/logs.go index e36cd3df..59c6df44 100644 --- a/ecs/logs.go +++ b/ecs/logs.go @@ -20,43 +20,13 @@ import ( "context" "github.com/docker/compose-cli/api/compose" + "github.com/docker/compose-cli/utils" ) func (b *ecsAPIService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer, options compose.LogOptions) error { if len(options.Services) > 0 { - consumer = filteredLogConsumer(consumer, options.Services) + consumer = utils.FilteredLogConsumer(consumer, options.Services) } err := b.aws.GetLogs(ctx, projectName, consumer.Log, options.Follow) return err } - -func filteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer { - if len(services) == 0 { - return consumer - } - allowed := map[string]bool{} - for _, s := range services { - allowed[s] = true - } - return &allowListLogConsumer{ - allowList: allowed, - delegate: consumer, - } -} - -type allowListLogConsumer struct { - allowList map[string]bool - delegate compose.LogConsumer -} - -func (a *allowListLogConsumer) Log(service, container, message string) { - if a.allowList[service] { - a.delegate.Log(service, container, message) - } -} - -func (a *allowListLogConsumer) Status(service, container, message string) { - if a.allowList[service] { - a.delegate.Status(service, container, message) - } -} diff --git a/local/compose/attach.go b/local/compose/attach.go index d0ecc963..164e5b1b 100644 --- a/local/compose/attach.go +++ b/local/compose/attach.go @@ -24,14 +24,13 @@ import ( "github.com/docker/compose-cli/api/compose" convert "github.com/docker/compose-cli/local/moby" - "github.com/docker/compose-cli/utils" "github.com/compose-spec/compose-go/types" moby "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/stdcopy" ) -func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.LogConsumer) (Containers, error) { +func (s *composeService) attach(ctx context.Context, project *types.Project, consumer chan compose.ContainerEvent) (Containers, error) { containers, err := s.getContainers(ctx, project) if err != nil { return nil, err @@ -52,7 +51,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con return containers, nil } -func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error { +func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer chan compose.ContainerEvent, project *types.Project) error { serviceName := container.Labels[serviceLabel] w := getWriter(serviceName, getContainerNameWithoutProject(container), consumer) diff --git a/local/compose/logs.go b/local/compose/logs.go index ce6b2f89..2d47c998 100644 --- a/local/compose/logs.go +++ b/local/compose/logs.go @@ -17,6 +17,7 @@ package compose import ( + "bytes" "context" "io" @@ -52,6 +53,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer } eg, ctx := errgroup.WithContext(ctx) for _, c := range list { + c := c service := c.Labels[serviceLabel] if ignore(service) { continue @@ -73,7 +75,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer if err != nil { return err } - w := utils.GetWriter(service, container.Name[1:], consumer) + w := utils.GetWriter(service, getContainerNameWithoutProject(c), consumer) if container.Config.Tty { _, err = io.Copy(w, r) } else { @@ -84,3 +86,33 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer } return eg.Wait() } + +type splitBuffer struct { + service string + container string + consumer chan compose.ContainerEvent +} + +// getWriter creates a io.Writer that will actually split by line and format by LogConsumer +func getWriter(service, container string, events chan compose.ContainerEvent) io.Writer { + return splitBuffer{ + service: service, + container: container, + consumer: events, + } +} + +func (s splitBuffer) Write(b []byte) (n int, err error) { + split := bytes.Split(b, []byte{'\n'}) + for _, line := range split { + if len(line) != 0 { + s.consumer <- compose.ContainerEvent{ + Type: compose.ContainerEventLog, + Service: s.service, + Source: s.container, + Line: string(line), + } + } + } + return len(b), nil +} diff --git a/local/compose/start.go b/local/compose/start.go index 30fcc5f2..7c4b2f07 100644 --- a/local/compose/start.go +++ b/local/compose/start.go @@ -18,13 +18,12 @@ package compose import ( "context" - "fmt" "github.com/docker/compose-cli/api/compose" "github.com/compose-spec/compose-go/types" "github.com/docker/docker/api/types/container" - "golang.org/x/sync/errgroup" + "github.com/sirupsen/logrus" ) func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error { @@ -35,12 +34,6 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti return err } containers = c - } else { - c, err := s.getContainers(ctx, project) - if err != nil { - return err - } - containers = c } err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { @@ -54,26 +47,21 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti return nil } - eg, ctx := errgroup.WithContext(ctx) for _, c := range containers { c := c - eg.Go(func() error { - statusC, errC := s.apiClient.ContainerWait(ctx, c.ID, container.WaitConditionNotRunning) + go func() { + statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning) select { case status := <-statusC: - service := c.Labels[serviceLabel] - options.Attach.Status(service, getCanonicalContainerName(c), fmt.Sprintf("exited with code %d", status.StatusCode)) - if options.Listener != nil { - options.Listener <- compose.ContainerExited{ - Service: service, - Status: int(status.StatusCode), - } + options.Attach <- compose.ContainerEvent{ + Type: compose.ContainerEventExit, + Source: getCanonicalContainerName(c), + ExitCode: int(status.StatusCode), } - return nil case err := <-errC: - return err + logrus.Warnf("Unexpected API error for %s : %s\n", getCanonicalContainerName(c), err.Error()) } - }) + }() } - return eg.Wait() + return nil } diff --git a/local/e2e/compose/cascade_stop_test.go b/local/e2e/compose/cascade_stop_test.go index 449e825e..1cb06078 100644 --- a/local/e2e/compose/cascade_stop_test.go +++ b/local/e2e/compose/cascade_stop_test.go @@ -31,6 +31,8 @@ func TestCascadeStop(t *testing.T) { res := c.RunDockerCmd("compose", "-f", "./fixtures/cascade-stop-test/compose.yaml", "--project-name", projectName, "up", "--abort-on-container-exit") res.Assert(t, icmd.Expected{Out: `PING localhost (127.0.0.1)`}) - res.Assert(t, icmd.Expected{Out: `ping_1 exited with code 0`}) + res.Assert(t, icmd.Expected{Out: `/does_not_exist: No such file or directory`}) + res.Assert(t, icmd.Expected{Out: `should_fail_1 exited with code 1`}) res.Assert(t, icmd.Expected{Out: `Aborting on container exit...`}) + // FIXME res.Assert(t, icmd.Expected{ExitCode: 1}) } diff --git a/local/e2e/compose/fixtures/cascade-stop-test/compose.yaml b/local/e2e/compose/fixtures/cascade-stop-test/compose.yaml index 24ef57c7..e30fd3a5 100644 --- a/local/e2e/compose/fixtures/cascade-stop-test/compose.yaml +++ b/local/e2e/compose/fixtures/cascade-stop-test/compose.yaml @@ -1,4 +1,7 @@ services: + should_fail: + image: busybox:1.27.2 + command: ls /does_not_exist ping: image: busybox:1.27.2 - command: ping localhost -c 1 + command: ping localhost diff --git a/utils/logconsumer.go b/utils/logconsumer.go index 2dd7e496..8caf6ba2 100644 --- a/utils/logconsumer.go +++ b/utils/logconsumer.go @@ -58,6 +58,12 @@ func (a *allowListLogConsumer) Log(service, container, message string) { } } +func (a *allowListLogConsumer) Status(service, container, message string) { + if a.allowList[service] { + a.delegate.Status(service, container, message) + } +} + type splitBuffer struct { service string container string