From adb62e9080763363cdc61ad34ede1db7997ad09f Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Wed, 18 Nov 2020 17:39:34 +0100 Subject: [PATCH 1/5] Run convergence in service dependency order Signed-off-by: Nicolas De Loof --- local/compose.go | 4 ++-- local/convergence.go | 19 ++++++++++++++++- local/dependencies.go | 42 +++++++++++++++++++++++++++++--------- local/dependencies_test.go | 2 +- 4 files changed, 53 insertions(+), 14 deletions(-) diff --git a/local/compose.go b/local/compose.go index de6fe593..f95cd829 100644 --- a/local/compose.go +++ b/local/compose.go @@ -79,8 +79,8 @@ func (s *local) Up(ctx context.Context, project *types.Project, detach bool) err } } - err := inDependencyOrder(ctx, project, func(service types.ServiceConfig) error { - return s.ensureService(ctx, project, service) + err := inDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { + return s.ensureService(c, project, service) }) return err } diff --git a/local/convergence.go b/local/convergence.go index 86984a40..b9b35602 100644 --- a/local/convergence.go +++ b/local/convergence.go @@ -33,6 +33,11 @@ import ( "github.com/docker/compose-cli/progress" ) +const ( + extLifecycle = "x-lifecycle" + forceRecreate = "force_recreate" +) + func (s *local) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error { actual, err := s.containerService.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( @@ -80,10 +85,11 @@ func (s *local) ensureService(ctx context.Context, project *types.Project, servi if err != nil { return err } + for _, container := range actual { container := container diverged := container.Labels[configHashLabel] != expected - if diverged { + if diverged || service.Extensions[extLifecycle] == forceRecreate { eg.Go(func() error { return s.recreateContainer(ctx, project, service, container) }) @@ -184,9 +190,20 @@ func (s *local) recreateContainer(ctx context.Context, project *types.Project, s StatusText: "Recreated", Done: true, }) + setDependentLifecycle(project, service.Name, forceRecreate) return nil } +// setDependentLifecycle define the Lifecycle strategy for all services to depend on specified service +func setDependentLifecycle(project *types.Project, service string, strategy string) { + for i, s := range project.Services { + if contains(s.GetDependencies(), service) { + s.Extensions[extLifecycle] = strategy + project.Services[i] = s + } + } +} + func (s *local) restartContainer(ctx context.Context, service types.ServiceConfig, container moby.Container) error { w := progress.ContextWriter(ctx) w.Event(progress.Event{ diff --git a/local/dependencies.go b/local/dependencies.go index 50d7a4c4..26444f33 100644 --- a/local/dependencies.go +++ b/local/dependencies.go @@ -25,15 +25,18 @@ import ( "golang.org/x/sync/errgroup" ) -func inDependencyOrder(ctx context.Context, project *types.Project, fn func(types.ServiceConfig) error) error { - eg, _ := errgroup.WithContext(ctx) +func inDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, types.ServiceConfig) error) error { var ( scheduled []string ready []string ) + services := sortByDependency(project.Services) + results := make(chan string) - for len(ready) < len(project.Services) { - for _, service := range project.Services { + errors := make(chan error) + eg, ctx := errgroup.WithContext(ctx) + for len(ready) < len(services) { + for _, service := range services { if contains(scheduled, service.Name) { continue } @@ -41,9 +44,9 @@ func inDependencyOrder(ctx context.Context, project *types.Project, fn func(type service := service scheduled = append(scheduled, service.Name) eg.Go(func() error { - err := fn(service) + err := fn(ctx, service) if err != nil { - close(results) + errors <- err return err } results <- service.Name @@ -51,11 +54,30 @@ func inDependencyOrder(ctx context.Context, project *types.Project, fn func(type }) } } - result, ok := <-results - if !ok { - break + select { + case result := <-results: + ready = append(ready, result) + case err := <-errors: + return err } - ready = append(ready, result) } return eg.Wait() } + +// sortByDependency sort a Service slice so it can be processed in respect to dependency ordering +func sortByDependency(services types.Services) types.Services { + var sorted types.Services + var done []string + for len(sorted) < len(services) { + for _, s := range services { + if contains(done, s.Name) { + continue + } + if containsAll(done, s.GetDependencies()) { + sorted = append(sorted, s) + done = append(done, s.Name) + } + } + } + return sorted +} diff --git a/local/dependencies_test.go b/local/dependencies_test.go index b0b6cfe3..3dc62f69 100644 --- a/local/dependencies_test.go +++ b/local/dependencies_test.go @@ -49,7 +49,7 @@ func TestInDependencyOrder(t *testing.T) { }, } //nolint:errcheck, unparam - go inDependencyOrder(context.TODO(), &project, func(config types.ServiceConfig) error { + go inDependencyOrder(context.TODO(), &project, func(ctx context.Context, config types.ServiceConfig) error { order <- config.Name return nil }) From 251c52664ad0c9bc72004ca901c18b995fff8fcc Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Thu, 19 Nov 2020 10:10:48 +0100 Subject: [PATCH 2/5] Implement service_healthy dependency condition Signed-off-by: Nicolas De Loof --- local/compose.go | 6 ++--- local/convergence.go | 55 ++++++++++++++++++++++++++++++++++++++++++++ local/convert.go | 53 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 3 deletions(-) diff --git a/local/compose.go b/local/compose.go index f95cd829..1a5fb701 100644 --- a/local/compose.go +++ b/local/compose.go @@ -389,10 +389,10 @@ func getContainerCreateOptions(p *types.Project, s types.ServiceConfig, number i MacAddress: s.MacAddress, Labels: labels, StopSignal: s.StopSignal, - // Env: s.Environment, FIXME conversion - // Healthcheck: s.HealthCheck, FIXME conversion + Env: toMobyEnv(s.Environment), + Healthcheck: toMobyHealthCheck(s.HealthCheck), // Volumes: // FIXME unclear to me the overlap with HostConfig.Mounts - // StopTimeout: s.StopGracePeriod FIXME conversion + StopTimeout: toSeconds(s.StopGracePeriod), } mountOptions := buildContainerMountOptions(p, s, inherit) diff --git a/local/convergence.go b/local/convergence.go index b9b35602..bc5b89fc 100644 --- a/local/convergence.go +++ b/local/convergence.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "strconv" + "time" "github.com/compose-spec/compose-go/types" moby "github.com/docker/docker/api/types" @@ -39,6 +40,8 @@ const ( ) func (s *local) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error { + s.waitDependencies(ctx, project, service) + actual, err := s.containerService.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( filters.Arg("label", fmt.Sprintf("%s=%s", projectLabel, project.Name)), @@ -108,6 +111,28 @@ func (s *local) ensureService(ctx context.Context, project *types.Project, servi return eg.Wait() } +func (s *local) waitDependencies(ctx context.Context, project *types.Project, service types.ServiceConfig) error { + eg, ctx := errgroup.WithContext(ctx) + for dep, config := range service.DependsOn { + switch config.Condition { + case "service_healthy": + eg.Go(func() error { + for range time.Tick(500 * time.Millisecond) { + healthy, err := s.isServiceHealthy(ctx, project, dep) + if err != nil { + return err + } + if healthy { + return nil + } + } + return nil + }) + } + } + return eg.Wait() +} + func nextContainerNumber(containers []moby.Container) (int, error) { max := 0 for _, c := range containers { @@ -257,3 +282,33 @@ func (s *local) connectContainerToNetwork(ctx context.Context, id string, servic } return nil } + +func (s *local) isServiceHealthy(ctx context.Context, project *types.Project, service string) (bool, error) { + containers, err := s.containerService.apiClient.ContainerList(ctx, moby.ContainerListOptions{ + Filters: filters.NewArgs( + filters.Arg("label", fmt.Sprintf("%s=%s", projectLabel, project.Name)), + filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, service)), + ), + }) + if err != nil { + return false, err + } + + for _, c := range containers { + container, err := s.containerService.apiClient.ContainerInspect(ctx, c.ID) + if err != nil { + return false, err + } + if container.State == nil || container.State.Health == nil { + return false, fmt.Errorf("container for service %q has no healthcheck configured", service) + } + switch container.State.Health.Status { + case "starting": + return false, nil + case "unhealthy": + return false, nil + } + } + return true, nil + +} diff --git a/local/convert.go b/local/convert.go index 05931a36..941ec95d 100644 --- a/local/convert.go +++ b/local/convert.go @@ -23,7 +23,9 @@ import ( "sort" "strconv" "strings" + "time" + compose "github.com/compose-spec/compose-go/types" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/go-connections/nat" @@ -93,6 +95,57 @@ func toPorts(ports []types.Port) []containers.Port { return result } +func toMobyEnv(environment compose.MappingWithEquals) []string { + var env []string + for k, v := range environment { + if v == nil { + env = append(env, k) + } else { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + } + return env +} + +func toMobyHealthCheck(check *compose.HealthCheckConfig) *container.HealthConfig { + if check == nil { + return nil + } + var ( + interval time.Duration + timeout time.Duration + period time.Duration + retries int + ) + if check.Interval != nil { + interval = time.Duration(*check.Interval) + } + if check.Timeout != nil { + timeout = time.Duration(*check.Timeout) + } + if check.StartPeriod != nil { + period = time.Duration(*check.StartPeriod) + } + if check.Retries != nil { + retries = int(*check.Retries) + } + return &container.HealthConfig{ + Test: check.Test, + Interval: interval, + Timeout: timeout, + StartPeriod: period, + Retries: retries, + } +} + +func toSeconds(d *compose.Duration) *int { + if d == nil { + return nil + } + s := int(time.Duration(*d).Seconds()) + return &s +} + func fromPorts(ports []containers.Port) (map[nat.Port]struct{}, map[nat.Port][]nat.PortBinding, error) { var ( exposedPorts = make(map[nat.Port]struct{}, len(ports)) From 7c7e75ca0019d656aee583722c9751d0150f5393 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Thu, 19 Nov 2020 10:28:57 +0100 Subject: [PATCH 3/5] Ensure extensions map is not nil (should be set by compose-go) Signed-off-by: Nicolas De Loof --- local/convergence.go | 3 +++ local/convert.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/local/convergence.go b/local/convergence.go index bc5b89fc..b47899dd 100644 --- a/local/convergence.go +++ b/local/convergence.go @@ -223,6 +223,9 @@ func (s *local) recreateContainer(ctx context.Context, project *types.Project, s func setDependentLifecycle(project *types.Project, service string, strategy string) { for i, s := range project.Services { if contains(s.GetDependencies(), service) { + if s.Extensions == nil { + s.Extensions = map[string]interface{}{} + } s.Extensions[extLifecycle] = strategy project.Services[i] = s } diff --git a/local/convert.go b/local/convert.go index 941ec95d..f0b834a7 100644 --- a/local/convert.go +++ b/local/convert.go @@ -101,7 +101,7 @@ func toMobyEnv(environment compose.MappingWithEquals) []string { if v == nil { env = append(env, k) } else { - env = append(env, fmt.Sprintf("%s=%s", k, v)) + env = append(env, fmt.Sprintf("%s=%s", k, *v)) } } return env From e7284e76e902e8757d3a46f12a812d0fd39c8286 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Thu, 19 Nov 2020 17:31:58 +0100 Subject: [PATCH 4/5] Process services in dependency order as a graph Signed-off-by: Nicolas De Loof --- local/dependencies.go | 108 +++++++++++++++++++++++++++--------------- 1 file changed, 69 insertions(+), 39 deletions(-) diff --git a/local/dependencies.go b/local/dependencies.go index 26444f33..1d5e6bf3 100644 --- a/local/dependencies.go +++ b/local/dependencies.go @@ -26,37 +26,27 @@ import ( ) func inDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, types.ServiceConfig) error) error { - var ( - scheduled []string - ready []string - ) - services := sortByDependency(project.Services) + graph := buildDependencyGraph(project.Services) + eg, ctx := errgroup.WithContext(ctx) results := make(chan string) errors := make(chan error) - eg, ctx := errgroup.WithContext(ctx) - for len(ready) < len(services) { - for _, service := range services { - if contains(scheduled, service.Name) { - continue - } - if containsAll(ready, service.GetDependencies()) { - service := service - scheduled = append(scheduled, service.Name) - eg.Go(func() error { - err := fn(ctx, service) - if err != nil { - errors <- err - return err - } - results <- service.Name - return nil - }) - } + for len(graph) > 0 { + for _, n := range graph.independents() { + service := n.service + eg.Go(func() error { + err := fn(ctx, service) + if err != nil { + errors <- err + return err + } + results <- service.Name + return nil + }) } select { case result := <-results: - ready = append(ready, result) + graph.resolved(result) case err := <-errors: return err } @@ -64,20 +54,60 @@ func inDependencyOrder(ctx context.Context, project *types.Project, fn func(cont return eg.Wait() } -// sortByDependency sort a Service slice so it can be processed in respect to dependency ordering -func sortByDependency(services types.Services) types.Services { - var sorted types.Services - var done []string - for len(sorted) < len(services) { - for _, s := range services { - if contains(done, s.Name) { - continue - } - if containsAll(done, s.GetDependencies()) { - sorted = append(sorted, s) - done = append(done, s.Name) - } +type dependencyGraph map[string]node + +type node struct { + service types.ServiceConfig + dependencies []string + dependent []string +} + +func (d dependencyGraph) independents() []node { + var nodes []node + for _, node := range d { + if len(node.dependencies) == 0 { + nodes = append(nodes, node) } } - return sorted + return nodes +} + +func (graph dependencyGraph) resolved(result string) { + for _, parent := range graph[result].dependent { + node := graph[parent] + node.dependencies = remove(node.dependencies, result) + graph[parent] = node + } + delete(graph, result) +} + +func buildDependencyGraph(services types.Services) dependencyGraph { + graph := dependencyGraph{} + for _, s := range services { + graph[s.Name] = node{ + service: s, + } + } + + for _, s := range services { + node := graph[s.Name] + for _, name := range s.GetDependencies() { + dependency := graph[name] + node.dependencies = append(node.dependencies, name) + dependency.dependent = append(dependency.dependent, s.Name) + graph[name] = dependency + } + graph[s.Name] = node + } + return graph +} + +func remove(slice []string, item string) []string { + var s []string + for _, i := range slice { + if i != item { + s = append(s, i) + } + } + return s } From eeb09d9e80b08d3de3c352153327bafa99f4eb00 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Thu, 19 Nov 2020 17:35:34 +0100 Subject: [PATCH 5/5] apply linter recommendations Signed-off-by: Nicolas De Loof --- local/convergence.go | 11 ++++++++--- local/dependencies.go | 4 ++-- local/util.go | 9 --------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/local/convergence.go b/local/convergence.go index b47899dd..72689d77 100644 --- a/local/convergence.go +++ b/local/convergence.go @@ -40,7 +40,10 @@ const ( ) func (s *local) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error { - s.waitDependencies(ctx, project, service) + err := s.waitDependencies(ctx, project, service) + if err != nil { + return err + } actual, err := s.containerService.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( @@ -117,7 +120,10 @@ func (s *local) waitDependencies(ctx context.Context, project *types.Project, se switch config.Condition { case "service_healthy": eg.Go(func() error { - for range time.Tick(500 * time.Millisecond) { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for { + <-ticker.C healthy, err := s.isServiceHealthy(ctx, project, dep) if err != nil { return err @@ -126,7 +132,6 @@ func (s *local) waitDependencies(ctx context.Context, project *types.Project, se return nil } } - return nil }) } } diff --git a/local/dependencies.go b/local/dependencies.go index 1d5e6bf3..e5ac9a77 100644 --- a/local/dependencies.go +++ b/local/dependencies.go @@ -62,9 +62,9 @@ type node struct { dependent []string } -func (d dependencyGraph) independents() []node { +func (graph dependencyGraph) independents() []node { var nodes []node - for _, node := range d { + for _, node := range graph { if len(node.dependencies) == 0 { nodes = append(nodes, node) } diff --git a/local/util.go b/local/util.go index adc1b2e2..c49af75d 100644 --- a/local/util.go +++ b/local/util.go @@ -40,12 +40,3 @@ func contains(slice []string, item string) bool { } return false } - -func containsAll(slice []string, items []string) bool { - for _, i := range items { - if !contains(slice, i) { - return false - } - } - return true -}