diff --git a/cli/cmd/serve.go b/cli/cmd/serve.go index 5e681884..119c797e 100644 --- a/cli/cmd/serve.go +++ b/cli/cmd/serve.go @@ -49,9 +49,7 @@ func runServe(ctx context.Context, opts serveOpts) error { p := proxy.NewContainerAPI() containersv1.RegisterContainersServer(s, p) - cliv1.RegisterCliServer(s, &cliServer{ - ctx, - }) + cliv1.RegisterCliServer(s, &cliServer{}) go func() { <-ctx.Done() @@ -66,7 +64,6 @@ func runServe(ctx context.Context, opts serveOpts) error { } type cliServer struct { - ctx context.Context } func (cs *cliServer) Contexts(ctx context.Context, request *cliv1.ContextsRequest) (*cliv1.ContextsResponse, error) { diff --git a/server/server.go b/server/server.go index d3a050f8..8a1287b7 100644 --- a/server/server.go +++ b/server/server.go @@ -48,12 +48,12 @@ import ( func New(ctx context.Context) *grpc.Server { s := grpc.NewServer( grpc.ChainUnaryInterceptor( - unaryMeta(ctx), + unaryServerInterceptor(ctx), unary, ), grpc.ChainStreamInterceptor( grpc.StreamServerInterceptor(stream), - grpc.StreamServerInterceptor(streamMeta(ctx)), + grpc.StreamServerInterceptor(streamServerInterceptor(ctx)), ), ) hs := health.NewServer() @@ -77,9 +77,11 @@ func stream(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, return grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler) } -func unaryMeta(clictx context.Context) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { +// unaryServerInterceptor configures the context and sends it to the next handler +func unaryServerInterceptor(clictx context.Context) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - configuredCtx, err := configureContext(ctx, clictx) + currentContext := getContext(ctx) + configuredCtx, err := configureContext(clictx, currentContext) if err != nil { return nil, err } @@ -88,50 +90,66 @@ func unaryMeta(clictx context.Context) func(ctx context.Context, req interface{} } } -func streamMeta(clictx context.Context) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { +// streamServerInterceptor configures the context and sends it to the next handler +func streamServerInterceptor(clictx context.Context) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - ctx, err := configureContext(ss.Context(), clictx) + currentContext := getContext(ss.Context()) + ctx, err := configureContext(clictx, currentContext) if err != nil { return err } - nss := newServerStream(ctx, ss) - - return handler(srv, nss) + return handler(srv, newServerStream(ctx, ss)) } } -// nolint: golint -func configureContext(ctx context.Context, clictx context.Context) (context.Context, error) { +// getContext returns the current context name sent in the request metadata, it +// returns an empty string if there is no metadata +// not present +func getContext(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if !ok { - return ctx, nil + return "" } key, ok := md[apicontext.Key] if !ok { - return ctx, nil + return "" } if len(key) == 1 { - s := store.ContextStore(clictx) - ctx = store.WithContextStore(ctx, s) - ctx = apicontext.WithCurrentContext(ctx, key[0]) + return key[0] + } - c, err := client.New(ctx) - if err != nil { - return nil, err - } + return "" +} - ctx, err = proxy.WithClient(ctx, c) - if err != nil { - return nil, err - } +// configureContext populates the request context with objects the client +// needs: the context store and the api client +func configureContext(ctx context.Context, currentContext string) (context.Context, error) { + s := store.ContextStore(ctx) + ctx = store.WithContextStore(ctx, s) + if currentContext != "" { + ctx = apicontext.WithCurrentContext(ctx, currentContext) + } + + c, err := client.New(ctx) + if err != nil { + return nil, err + } + + ctx, err = proxy.WithClient(ctx, c) + if err != nil { + return nil, err } return ctx, nil } +// A gRPC server stream will only let you get its context but +// there is no way to set a new (augmented context) to the next +// handler (like we do for a unary request). We need to wrap the grpc.ServerSteam +// to be able to set a new context that will be sent to the next stream interceptor. type contextServerStream struct { s grpc.ServerStream ctx context.Context