From 136d09e1acc207542b0d4cd0302d5a54130f618f Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Wed, 2 Dec 2020 18:23:01 +0100 Subject: [PATCH] split `Up` into `Create`+`Start` so logs don't collide with progress Signed-off-by: Nicolas De Loof --- aci/compose.go | 10 ++++- api/client/compose.go | 10 ++++- api/compose/api.go | 6 ++- cli/cmd/compose/up.go | 68 ++++++++++++++++++++++++++++------ ecs/local/compose.go | 22 +++++++---- ecs/up.go | 11 +++++- example/backend.go | 11 +++++- formatter/logs.go | 2 +- local/compose.go | 77 ++++++++++++++++++++------------------- local/container.go | 35 ++++++++++++++++++ local/containers.go | 6 ++- local/e2e/compose_test.go | 2 +- local/labels.go | 4 -- server/proxy/compose.go | 2 +- 14 files changed, 198 insertions(+), 68 deletions(-) diff --git a/aci/compose.go b/aci/compose.go index 444f643b..7756b1f8 100644 --- a/aci/compose.go +++ b/aci/compose.go @@ -56,7 +56,15 @@ func (cs *aciComposeService) Pull(ctx context.Context, project *types.Project) e return errdefs.ErrNotImplemented } -func (cs *aciComposeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { +func (cs *aciComposeService) Create(ctx context.Context, project *types.Project) error { + return errdefs.ErrNotImplemented +} + +func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { + return errdefs.ErrNotImplemented +} + +func (cs *aciComposeService) Up(ctx context.Context, project *types.Project, detach bool) error { logrus.Debugf("Up on project with name %q", project.Name) if err := autocreateFileshares(ctx, project); err != nil { diff --git a/api/client/compose.go b/api/client/compose.go index 9e5ca09e..21e374e9 100644 --- a/api/client/compose.go +++ b/api/client/compose.go @@ -41,7 +41,15 @@ func (c *composeService) Pull(ctx context.Context, project *types.Project) error return errdefs.ErrNotImplemented } -func (c *composeService) Up(context.Context, *types.Project, bool, io.Writer) error { +func (c *composeService) Create(ctx context.Context, project *types.Project) error { + return errdefs.ErrNotImplemented +} + +func (c *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { + return errdefs.ErrNotImplemented +} + +func (c *composeService) Up(context.Context, *types.Project, bool) error { return errdefs.ErrNotImplemented } diff --git a/api/compose/api.go b/api/compose/api.go index 9295fae9..0f578430 100644 --- a/api/compose/api.go +++ b/api/compose/api.go @@ -31,8 +31,12 @@ type Service interface { Push(ctx context.Context, project *types.Project) error // Pull executes the equivalent of a `compose pull` Pull(ctx context.Context, project *types.Project) error + // Create executes the equivalent to a `compose create` + Create(ctx context.Context, project *types.Project) error + // Start executes the equivalent to a `compose start` + Start(ctx context.Context, project *types.Project, w io.Writer) error // Up executes the equivalent to a `compose up` - Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error + Up(ctx context.Context, project *types.Project, detach bool) error // Down executes the equivalent to a `compose down` Down(ctx context.Context, projectName string) error // Logs executes the equivalent to a `compose logs` diff --git a/cli/cmd/compose/up.go b/cli/cmd/compose/up.go index d02606aa..14fa2cba 100644 --- a/cli/cmd/compose/up.go +++ b/cli/cmd/compose/up.go @@ -18,14 +18,18 @@ package compose import ( "context" - "github.com/docker/compose-cli/progress" + "errors" + "fmt" + "io" "os" "github.com/compose-spec/compose-go/cli" + "github.com/compose-spec/compose-go/types" "github.com/spf13/cobra" "github.com/docker/compose-cli/api/client" "github.com/docker/compose-cli/context/store" + "github.com/docker/compose-cli/progress" ) func upCommand(contextType string) *cobra.Command { @@ -33,7 +37,12 @@ func upCommand(contextType string) *cobra.Command { upCmd := &cobra.Command{ Use: "up [SERVICE...]", RunE: func(cmd *cobra.Command, args []string) error { - return runUp(cmd.Context(), opts, args) + switch contextType { + case store.LocalContextType: + return runCreateStart(cmd.Context(), opts, args) + default: + return runUp(cmd.Context(), opts, args) + } }, } upCmd.Flags().StringVarP(&opts.Name, "project-name", "p", "", "Project name") @@ -50,19 +59,60 @@ func upCommand(contextType string) *cobra.Command { } func runUp(ctx context.Context, opts composeOptions, services []string) error { - c, err := client.New(ctx) + c, project, err := setup(ctx, opts, services) if err != nil { return err } - options, err := opts.toProjectOptions() + _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", c.ComposeService().Up(ctx, project, opts.Detach) + }) + return err +} + +func runCreateStart(ctx context.Context, opts composeOptions, services []string) error { + c, project, err := setup(ctx, opts, services) if err != nil { return err } - project, err := cli.ProjectFromOptions(options) + + _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", c.ComposeService().Create(ctx, project) + }) if err != nil { return err } + + var w io.Writer + if !opts.Detach { + w = os.Stdout + } + + err = c.ComposeService().Start(ctx, project, w) + if errors.Is(ctx.Err(), context.Canceled) { + fmt.Println("Gracefully stopping...") + ctx = context.Background() + _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", c.ComposeService().Down(ctx, project.Name) + }) + } + return err +} + +func setup(ctx context.Context, opts composeOptions, services []string) (*client.Client, *types.Project, error) { + c, err := client.New(ctx) + if err != nil { + return nil, nil, err + } + + options, err := opts.toProjectOptions() + if err != nil { + return nil, nil, err + } + project, err := cli.ProjectFromOptions(options) + if err != nil { + return nil, nil, err + } if opts.DomainName != "" { // arbitrarily set the domain name on the first service ; ACI backend will expose the entire project project.Services[0].DomainName = opts.DomainName @@ -70,11 +120,7 @@ func runUp(ctx context.Context, opts composeOptions, services []string) error { err = filter(project, services) if err != nil { - return err + return nil, nil, err } - - _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { - return "", c.ComposeService().Up(ctx, project, opts.Detach, os.Stdout) - }) - return err + return c, project, nil } diff --git a/ecs/local/compose.go b/ecs/local/compose.go index 8d7130bb..adeb06ba 100644 --- a/ecs/local/compose.go +++ b/ecs/local/compose.go @@ -28,17 +28,16 @@ import ( "path/filepath" "strings" - types2 "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/filters" - - "github.com/docker/compose-cli/api/compose" - "github.com/docker/compose-cli/errdefs" - "github.com/aws/aws-sdk-go/aws" "github.com/compose-spec/compose-go/types" + types2 "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" "github.com/pkg/errors" "github.com/sanathkr/go-yaml" "golang.org/x/mod/semver" + + "github.com/docker/compose-cli/api/compose" + "github.com/docker/compose-cli/errdefs" ) func (e ecsLocalSimulation) Build(ctx context.Context, project *types.Project) error { @@ -53,7 +52,16 @@ func (e ecsLocalSimulation) Pull(ctx context.Context, project *types.Project) er return errdefs.ErrNotImplemented } -func (e ecsLocalSimulation) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { +func (e ecsLocalSimulation) Create(ctx context.Context, project *types.Project) error { + return errdefs.ErrNotImplemented +} + +func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, w io.Writer) error { + return errdefs.ErrNotImplemented +} + +func (e ecsLocalSimulation) Up(ctx context.Context, project *types.Project, detach bool) error { + cmd := exec.Command("docker-compose", "version", "--short") b := bytes.Buffer{} b.WriteString("v") diff --git a/ecs/up.go b/ecs/up.go index b1147f7e..cab0b6f0 100644 --- a/ecs/up.go +++ b/ecs/up.go @@ -40,7 +40,16 @@ func (b *ecsAPIService) Pull(ctx context.Context, project *types.Project) error return errdefs.ErrNotImplemented } -func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { +func (b *ecsAPIService) Create(ctx context.Context, project *types.Project) error { + return errdefs.ErrNotImplemented +} + +func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, w io.Writer) error { + return errdefs.ErrNotImplemented +} + +func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, detach bool) error { + err := b.aws.CheckRequirements(ctx, b.Region) if err != nil { return err diff --git a/example/backend.go b/example/backend.go index 533ace82..e6c1fc6d 100644 --- a/example/backend.go +++ b/example/backend.go @@ -151,7 +151,16 @@ func (cs *composeService) Pull(ctx context.Context, project *types.Project) erro return errdefs.ErrNotImplemented } -func (cs *composeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { +func (cs *composeService) Create(ctx context.Context, project *types.Project) error { + return errdefs.ErrNotImplemented +} + +func (cs *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { + return errdefs.ErrNotImplemented +} + +func (cs *composeService) Up(ctx context.Context, project *types.Project, detach bool) error { + fmt.Printf("Up command on project %q", project.Name) return nil } diff --git a/formatter/logs.go b/formatter/logs.go index 8ad06225..d01b9d01 100644 --- a/formatter/logs.go +++ b/formatter/logs.go @@ -37,7 +37,7 @@ func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer { // Log formats a log message as received from service/container func (l *LogConsumer) Log(service, container, message string) { - if l.ctx.Err() == context.Canceled { + if l.ctx.Err() != nil { return } cf, ok := l.colors[service] diff --git a/local/compose.go b/local/compose.go index c736c8ed..a6653e33 100644 --- a/local/compose.go +++ b/local/compose.go @@ -53,6 +53,7 @@ import ( "github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/config" + errdefs2 "github.com/docker/compose-cli/errdefs" "github.com/docker/compose-cli/formatter" "github.com/docker/compose-cli/progress" ) @@ -293,7 +294,11 @@ func toProgressEvent(prefix string, jm jsonmessage.JSONMessage, w progress.Write }) } -func (s *composeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { +func (s *composeService) Up(ctx context.Context, project *types.Project, detach bool) error { + return errdefs2.ErrNotImplemented +} + +func (s *composeService) Create(ctx context.Context, project *types.Project) error { err := s.ensureImagesExists(ctx, project) if err != nil { return err @@ -327,29 +332,34 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, detach } } - err = InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { + return InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { return s.ensureService(c, project, service) }) +} + +func (s *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { + var group *errgroup.Group + if w != nil { + eg, err := s.attach(ctx, project, w) + if err != nil { + return err + } + group = eg + } + + err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { + return s.startService(ctx, project, service) + }) if err != nil { return err } - - if detach { - err = inDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { - return s.startService(ctx, project, service) - }) - return err + if group != nil { + return group.Wait() } - - if detach { - return nil - } - - progress.ContextWriter(ctx).Stop() - return s.attach(ctx, project, w) + return nil } -func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) error { +func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) (*errgroup.Group, error) { consumer := formatter.NewLogConsumer(ctx, w) containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( @@ -358,7 +368,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, w i All: true, }) if err != nil { - return err + return nil, err } var names []string @@ -370,37 +380,31 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, w i eg, ctx := errgroup.WithContext(ctx) for _, c := range containers { container := c - eg.Go(func() error { - return s.attachContainer(ctx, container, project, consumer) + return s.attachContainer(ctx, container, consumer, project) }) } - - eg.Go(func() error { - <-ctx.Done() - fmt.Println("Gracefully stopping...") - ctx = context.Background() - _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { - return "", s.Down(ctx, project.Name) - }) - return nil - }) - - return eg.Wait() + return eg, nil } -func (s *composeService) attachContainer(ctx context.Context, container moby.Container, project *types.Project, consumer formatter.LogConsumer) error { +func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer formatter.LogConsumer, project *types.Project) error { serviceName := container.Labels[serviceLabel] + w := consumer.GetWriter(serviceName, container.ID) service, err := project.GetService(serviceName) if err != nil { return err } + reader, err := s.getContainerStdout(ctx, container) if err != nil { return err } - w := consumer.GetWriter(serviceName, container.ID) + go func() { + <-ctx.Done() + reader.Close() //nolint:errcheck + }() + if service.Tty { _, err = io.Copy(w, reader) } else { @@ -409,8 +413,8 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con return err } -func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.Reader, error) { - var reader io.Reader +func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.ReadCloser, error) { + var reader io.ReadCloser if container.State == containerRunning { logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{ ShowStdout: true, @@ -431,7 +435,7 @@ func (s *composeService) getContainerStdout(ctx context.Context, container moby. if err != nil { return nil, err } - reader = cnx.Reader + reader = containerStdout{cnx} err = s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{}) if err != nil { @@ -476,7 +480,6 @@ func (s *composeService) Down(ctx context.Context, projectName string) error { Filters: filters.NewArgs( projectFilter(projectName), ), - All: true, }) if err != nil { return err diff --git a/local/container.go b/local/container.go index d8a2d133..4ebdddee 100644 --- a/local/container.go +++ b/local/container.go @@ -18,6 +18,12 @@ package local +import ( + "io" + + moby "github.com/docker/docker/api/types" +) + const ( containerCreated = "created" containerRestarting = "restarting" @@ -27,3 +33,32 @@ const ( containerExited = "exited" //nolint containerDead = "dead" //nolint ) + +var _ io.ReadCloser = containerStdout{} + +type containerStdout struct { + moby.HijackedResponse +} + +func (l containerStdout) Read(p []byte) (n int, err error) { + return l.Reader.Read(p) +} + +func (l containerStdout) Close() error { + l.HijackedResponse.Close() + return nil +} + +var _ io.WriteCloser = containerStdin{} + +type containerStdin struct { + moby.HijackedResponse +} + +func (c containerStdin) Write(p []byte) (n int, err error) { + return c.Conn.Write(p) +} + +func (c containerStdin) Close() error { + return c.CloseWrite() +} diff --git a/local/containers.go b/local/containers.go index fa7594d2..24912e4e 100644 --- a/local/containers.go +++ b/local/containers.go @@ -143,7 +143,11 @@ func (cs *containerService) Run(ctx context.Context, r containers.ContainerConfi return cs.apiClient.ContainerStart(ctx, id, types.ContainerStartOptions{}) } -func (cs *containerService) create(ctx context.Context, containerConfig *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *specs.Platform, name string) (string, error) { +func (cs *containerService) create(ctx context.Context, + containerConfig *container.Config, + hostConfig *container.HostConfig, + networkingConfig *network.NetworkingConfig, + platform *specs.Platform, name string) (string, error) { created, err := cs.apiClient.ContainerCreate(ctx, containerConfig, hostConfig, networkingConfig, platform, name) if err != nil { diff --git a/local/e2e/compose_test.go b/local/e2e/compose_test.go index 0f38067b..9e5ea535 100644 --- a/local/e2e/compose_test.go +++ b/local/e2e/compose_test.go @@ -45,7 +45,7 @@ func TestLocalBackendComposeUp(t *testing.T) { }) t.Run("up", func(t *testing.T) { - c.RunDockerCmd("compose", "up", "-f", "../../tests/composefiles/demo_multi_port.yaml", "--project-name", projectName) + c.RunDockerCmd("compose", "up", "-f", "../../tests/composefiles/demo_multi_port.yaml", "--project-name", projectName, "-d") }) t.Run("check running project", func(t *testing.T) { diff --git a/local/labels.go b/local/labels.go index 28bb68ea..e0ca6010 100644 --- a/local/labels.go +++ b/local/labels.go @@ -51,7 +51,3 @@ func serviceFilter(serviceName string) filters.KeyValuePair { func hasProjectLabelFilter() filters.KeyValuePair { return filters.Arg("label", projectLabel) } - -func serviceFilter(serviceName string) filters.KeyValuePair { - return filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, serviceName)) -} diff --git a/server/proxy/compose.go b/server/proxy/compose.go index 9d21eb70..59afcf55 100644 --- a/server/proxy/compose.go +++ b/server/proxy/compose.go @@ -30,7 +30,7 @@ func (p *proxy) Up(ctx context.Context, request *composev1.ComposeUpRequest) (*c if err != nil { return nil, err } - return &composev1.ComposeUpResponse{ProjectName: project.Name}, Client(ctx).ComposeService().Up(ctx, project, true, nil) + return &composev1.ComposeUpResponse{ProjectName: project.Name}, Client(ctx).ComposeService().Up(ctx, project, true) } func (p *proxy) Down(ctx context.Context, request *composev1.ComposeDownRequest) (*composev1.ComposeDownResponse, error) {