diff --git a/api/compose/api.go b/api/compose/api.go index 41a250f4..115dbc09 100644 --- a/api/compose/api.go +++ b/api/compose/api.go @@ -379,7 +379,9 @@ type ContainerEvent struct { Container string Service string Line string - ExitCode int + // ContainerEventExit only + ExitCode int + Restarting bool } const ( diff --git a/api/compose/printer.go b/api/compose/printer.go new file mode 100644 index 00000000..31c8f1d3 --- /dev/null +++ b/api/compose/printer.go @@ -0,0 +1,109 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "fmt" + + "github.com/sirupsen/logrus" +) + +// LogPrinter watch application containers an collect their logs +type LogPrinter interface { + HandleEvent(event ContainerEvent) + Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) + Cancel() +} + +// NewLogPrinter builds a LogPrinter passing containers logs to LogConsumer +func NewLogPrinter(consumer LogConsumer) LogPrinter { + queue := make(chan ContainerEvent) + printer := printer{ + consumer: consumer, + queue: queue, + } + return &printer +} + +func (p *printer) Cancel() { + p.queue <- ContainerEvent{ + Type: UserCancel, + } +} + +type printer struct { + queue chan ContainerEvent + consumer LogConsumer +} + +func (p *printer) HandleEvent(event ContainerEvent) { + p.queue <- event +} + +func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) { + var ( + aborting bool + exitCode int + ) + containers := map[string]struct{}{} + for { + event := <-p.queue + container := event.Container + switch event.Type { + case UserCancel: + aborting = true + case ContainerEventAttach: + if _, ok := containers[container]; ok { + continue + } + containers[container] = struct{}{} + p.consumer.Register(container) + case ContainerEventExit: + if !event.Restarting { + delete(containers, container) + } + if !aborting { + p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode)) + } + if cascadeStop { + if !aborting { + aborting = true + fmt.Println("Aborting on container exit...") + err := stopFn() + if err != nil { + return 0, err + } + } + if exitCodeFrom == "" { + exitCodeFrom = event.Service + } + if exitCodeFrom == event.Service { + logrus.Error(event.ExitCode) + exitCode = event.ExitCode + } + } + if len(containers) == 0 { + // Last container terminated, done + return exitCode, nil + } + case ContainerEventLog: + if !aborting { + p.consumer.Log(container, event.Service, event.Line) + } + } + } +} diff --git a/cli/cmd/compose/up.go b/cli/cmd/compose/up.go index bb3b80f7..21465f64 100644 --- a/cli/cmd/compose/up.go +++ b/cli/cmd/compose/up.go @@ -29,7 +29,6 @@ import ( "github.com/compose-spec/compose-go/types" "github.com/docker/cli/cli" - "github.com/sirupsen/logrus" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" @@ -275,13 +274,12 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions return nil } - queue := make(chan compose.ContainerEvent) - printer := printer{ - queue: queue, - } + consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix) + printer := compose.NewLogPrinter(consumer) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + stopFunc := func() error { ctx := context.Background() _, err := progress.Run(ctx, func(ctx context.Context) (string, error) { @@ -296,27 +294,21 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions } go func() { <-signalChan - queue <- compose.ContainerEvent{ - Type: compose.UserCancel, - } + printer.Cancel() fmt.Println("Gracefully stopping... (press Ctrl+C again to force)") stopFunc() // nolint:errcheck }() - consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix) - var exitCode int eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - code, err := printer.run(opts.cascadeStop, opts.exitCodeFrom, consumer, stopFunc) + code, err := printer.Run(opts.cascadeStop, opts.exitCodeFrom, stopFunc) exitCode = code return err }) err = backend.Start(ctx, project, compose.StartOptions{ - Attach: func(event compose.ContainerEvent) { - queue <- event - }, + Attach: printer.HandleEvent, Services: services, }) if err != nil { @@ -341,11 +333,7 @@ func setServiceScale(project *types.Project, name string, replicas int) error { if err != nil { return err } - if service.Deploy == nil { - service.Deploy = &types.DeployConfig{} - } - count := uint64(replicas) - service.Deploy.Replicas = &count + service.Scale = replicas project.Services[i] = service return nil } @@ -392,49 +380,3 @@ func setup(opts composeOptions, services []string) (*types.Project, error) { return project, nil } - -type printer struct { - queue chan compose.ContainerEvent -} - -func (p printer) run(cascadeStop bool, exitCodeFrom string, consumer compose.LogConsumer, stopFn func() error) (int, error) { - var aborting bool - var count int - for { - event := <-p.queue - switch event.Type { - case compose.UserCancel: - aborting = true - case compose.ContainerEventAttach: - consumer.Register(event.Container) - count++ - case compose.ContainerEventExit: - if !aborting { - consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode)) - } - if cascadeStop { - if !aborting { - aborting = true - fmt.Println("Aborting on container exit...") - err := stopFn() - if err != nil { - return 0, err - } - } - if exitCodeFrom == "" || exitCodeFrom == event.Service { - logrus.Error(event.ExitCode) - return event.ExitCode, nil - } - } - count-- - if count == 0 { - // Last container terminated, done - return 0, nil - } - case compose.ContainerEventLog: - if !aborting { - consumer.Log(event.Container, event.Service, event.Line) - } - } - } -} diff --git a/cli/cmd/compose/up_test.go b/cli/cmd/compose/up_test.go index 566a6847..4042d9ca 100644 --- a/cli/cmd/compose/up_test.go +++ b/cli/cmd/compose/up_test.go @@ -39,5 +39,5 @@ func TestApplyScaleOpt(t *testing.T) { assert.NilError(t, err) foo, err := p.GetService("foo") assert.NilError(t, err) - assert.Check(t, *foo.Deploy.Replicas == 2) + assert.Equal(t, foo.Scale, 2) } diff --git a/local/compose/attach.go b/local/compose/attach.go index ddef6eb4..fc8f6739 100644 --- a/local/compose/attach.go +++ b/local/compose/attach.go @@ -33,10 +33,6 @@ import ( ) func (s *composeService) attach(ctx context.Context, project *types.Project, listener compose.ContainerEventListener, selectedServices []string) (Containers, error) { - if len(selectedServices) == 0 { - selectedServices = project.ServiceNames() - } - containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, selectedServices...) if err != nil { return nil, err @@ -57,44 +53,6 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis return nil, err } } - - // Watch events to capture container restart and re-attach - go func() { - crashed := map[string]struct{}{} - s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck - Services: selectedServices, - Consumer: func(event compose.Event) error { - if event.Status == "die" { - crashed[event.Container] = struct{}{} - return nil - } - if _, ok := crashed[event.Container]; ok { - inspect, err := s.apiClient.ContainerInspect(ctx, event.Container) - if err != nil { - return err - } - - container := moby.Container{ - ID: event.Container, - Names: []string{inspect.Name}, - State: convert.ContainerRunning, - Labels: map[string]string{ - projectLabel: project.Name, - serviceLabel: event.Service, - }, - } - - // Just ignore errors when reattaching to already crashed containers - s.attachContainer(ctx, container, listener, project) // nolint: errcheck - delete(crashed, event.Container) - - s.waitContainer(container, listener) - } - return nil - }, - }) - }() - return containers, err } diff --git a/local/compose/down.go b/local/compose/down.go index 73582c36..0d4a9220 100644 --- a/local/compose/down.go +++ b/local/compose/down.go @@ -191,18 +191,22 @@ func (s *composeService) removeVolume(ctx context.Context, id string, w progress } func (s *composeService) stopContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error { + eg, ctx := errgroup.WithContext(ctx) for _, container := range containers { - toStop := container - eventName := getContainerProgressName(toStop) - w.Event(progress.StoppingEvent(eventName)) - err := s.apiClient.ContainerStop(ctx, toStop.ID, timeout) - if err != nil { - w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping")) - return err - } - w.Event(progress.StoppedEvent(eventName)) + container := container + eg.Go(func() error { + eventName := getContainerProgressName(container) + w.Event(progress.StoppingEvent(eventName)) + err := s.apiClient.ContainerStop(ctx, container.ID, timeout) + if err != nil { + w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping")) + return err + } + w.Event(progress.StoppedEvent(eventName)) + return nil + }) } - return nil + return eg.Wait() } func (s *composeService) removeContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error { diff --git a/local/compose/start.go b/local/compose/start.go index b0f6354d..f09a4368 100644 --- a/local/compose/start.go +++ b/local/compose/start.go @@ -24,22 +24,26 @@ import ( "github.com/compose-spec/compose-go/types" moby "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/sirupsen/logrus" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error { + listener := options.Attach if len(options.Services) == 0 { options.Services = project.ServiceNames() } - var containers Containers - if options.Attach != nil { - c, err := s.attach(ctx, project, options.Attach, options.Services) + eg, ctx := errgroup.WithContext(ctx) + if listener != nil { + attached, err := s.attach(ctx, project, listener, options.Services) if err != nil { return err } - containers = c + + eg.Go(func() error { + return s.watchContainers(project, options.Services, listener, attached) + }) } err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { @@ -51,32 +55,79 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti if err != nil { return err } + return eg.Wait() +} - if options.Attach == nil { +// watchContainers uses engine events to capture container start/die and notify ContainerEventListener +func (s *composeService) watchContainers(project *types.Project, services []string, listener compose.ContainerEventListener, containers Containers) error { + watched := map[string]int{} + for _, c := range containers { + watched[c.ID] = 0 + } + + ctx, stop := context.WithCancel(context.Background()) + err := s.Events(ctx, project.Name, compose.EventsOptions{ + Services: services, + Consumer: func(event compose.Event) error { + inspected, err := s.apiClient.ContainerInspect(ctx, event.Container) + if err != nil { + return err + } + container := moby.Container{ + ID: inspected.ID, + Names: []string{inspected.Name}, + Labels: inspected.Config.Labels, + } + name := getContainerNameWithoutProject(container) + + if event.Status == "die" { + restarted := watched[container.ID] + watched[container.ID] = restarted + 1 + // Container terminated. + willRestart := inspected.HostConfig.RestartPolicy.MaximumRetryCount > restarted + + listener(compose.ContainerEvent{ + Type: compose.ContainerEventExit, + Container: name, + Service: container.Labels[serviceLabel], + ExitCode: inspected.State.ExitCode, + Restarting: willRestart, + }) + + if !willRestart { + // we're done with this one + delete(watched, container.ID) + } + + if len(watched) == 0 { + // all project containers stopped, we're done + stop() + } + return nil + } + + if event.Status == "start" { + count, ok := watched[container.ID] + mustAttach := ok && count > 0 // Container restarted, need to re-attach + if !ok { + // A new container has just been added to service by scale + watched[container.ID] = 0 + mustAttach = true + } + if mustAttach { + // Container restarted, need to re-attach + err := s.attachContainer(ctx, container, listener, project) + if err != nil { + return err + } + } + } + + return nil + }, + }) + if errors.Is(ctx.Err(), context.Canceled) { return nil } - - for _, c := range containers { - c := c - go func() { - s.waitContainer(c, options.Attach) - }() - } - return nil -} - -func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) { - statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning) - name := getContainerNameWithoutProject(c) - select { - case status := <-statusC: - listener(compose.ContainerEvent{ - Type: compose.ContainerEventExit, - Container: name, - Service: c.Labels[serviceLabel], - ExitCode: int(status.StatusCode), - }) - case err := <-errC: - logrus.Warnf("Unexpected API error for %s : %s", name, err.Error()) - } + return err } diff --git a/local/compose/stop_test.go b/local/compose/stop_test.go index ea15b8a9..a3ce3818 100644 --- a/local/compose/stop_test.go +++ b/local/compose/stop_test.go @@ -38,7 +38,7 @@ func TestStopTimeout(t *testing.T) { tested.apiClient = api ctx := context.Background() - api.EXPECT().ContainerList(ctx, projectFilterListOpt()).Return( + api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt()).Return( []moby.Container{ testContainer("service1", "123"), testContainer("service1", "456"), @@ -46,9 +46,9 @@ func TestStopTimeout(t *testing.T) { }, nil) timeout := time.Duration(2) * time.Second - api.EXPECT().ContainerStop(ctx, "123", &timeout).Return(nil) - api.EXPECT().ContainerStop(ctx, "456", &timeout).Return(nil) - api.EXPECT().ContainerStop(ctx, "789", &timeout).Return(nil) + api.EXPECT().ContainerStop(gomock.Any(), "123", &timeout).Return(nil) + api.EXPECT().ContainerStop(gomock.Any(), "456", &timeout).Return(nil) + api.EXPECT().ContainerStop(gomock.Any(), "789", &timeout).Return(nil) err := tested.Stop(ctx, &types.Project{ Name: testProject, diff --git a/local/e2e/compose/compose_test.go b/local/e2e/compose/compose_test.go index 459e96c4..e90db4f3 100644 --- a/local/e2e/compose/compose_test.go +++ b/local/e2e/compose/compose_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + testify "github.com/stretchr/testify/assert" "gotest.tools/v3/assert" "gotest.tools/v3/icmd" @@ -197,10 +198,10 @@ func TestAttachRestart(t *testing.T) { c.WaitForCondition(func() (bool, string) { debug := res.Combined() - return strings.Count(res.Stdout(), "another_1 exited with code 1") == 3, fmt.Sprintf("'another_1 exited with code 1' not found 3 times in : \n%s\n", debug) + return strings.Count(res.Stdout(), "failing_1 exited with code 1") == 3, fmt.Sprintf("'failing_1 exited with code 1' not found 3 times in : \n%s\n", debug) }, 2*time.Minute, 2*time.Second) - assert.Equal(t, strings.Count(res.Stdout(), "another_1 | world"), 3, res.Combined()) + assert.Equal(t, strings.Count(res.Stdout(), "failing_1 | world"), 3, res.Combined()) } func TestInitContainer(t *testing.T) { @@ -208,7 +209,5 @@ func TestInitContainer(t *testing.T) { res := c.RunDockerOrExitError("compose", "--ansi=never", "--project-directory", "./fixtures/init-container", "up") defer c.RunDockerOrExitError("compose", "-p", "init-container", "down") - output := res.Stdout() - - assert.Assert(t, strings.Contains(output, "foo_1 | hello\nbar_1 | world"), res.Combined()) + testify.Regexp(t, "foo_1 | hello(?m:.*)bar_1 | world", res.Stdout()) } diff --git a/local/e2e/compose/fixtures/attach-restart/compose.yaml b/local/e2e/compose/fixtures/attach-restart/compose.yaml index eb364bda..d9214367 100644 --- a/local/e2e/compose/fixtures/attach-restart/compose.yaml +++ b/local/e2e/compose/fixtures/attach-restart/compose.yaml @@ -1,8 +1,5 @@ services: - simple: - image: alpine - command: sh -c "sleep infinity" - another: + failing: image: alpine command: sh -c "sleep 0.1 && echo world && /bin/false" deploy: diff --git a/utils/hash.go b/utils/hash.go index 2715f5b0..ffe30cff 100644 --- a/utils/hash.go +++ b/utils/hash.go @@ -29,6 +29,7 @@ func ServiceHash(o types.ServiceConfig) (string, error) { // remove the Build config when generating the service hash o.Build = nil o.PullPolicy = "" + o.Scale = 1 bytes, err := json.Marshal(o) if err != nil { return "", err