Containerd Internals

gRPC

What is gRPC?

In gRPC a client application can directly call methods on a server application on a different machine as if it was a local object, making it easier for you to create distributed applications and services. As in many RPC systems, gRPC is based around the idea of defining a service, specifying the methods that can be called remotely with their parameters and return types. On the server side, the server implements this interface and runs a gRPC server to handle client calls. On the client side, the client has a stub that provides exactly the same methods as the server.

gRPC clients and servers can run and talk to each other in a variety of environments - from servers inside Google to your own desktop - and can be written in any of gRPC's supported languages. So, for example, you can easily create a gRPC server in Java with clients in Go, Python, or Ruby. In addition, the latest Google APIs will have gRPC versions of their interfaces, letting you easily build Google functionality into your applications.

protobuf

By default gRPC uses protocol buffers, Google’s mature open source mechanism for serializing structured data (although it can be used with other data formats such as JSON). As you'll see in our example below, you define gRPC services using proto files, with method parameters and return types specified as protocol buffer message types.

Defining the service

We use the protocol buffers interface definition language (IDL) to define our service methods, and define the parameters and return types as protocol buffer message types.

api/grpc/types/api.proto

service API {
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    rpc UpdateContainer(UpdateContainerRequest) returns (UpdateContainerResponse) {}
    rpc Signal(SignalRequest) returns (SignalResponse) {}
    rpc UpdateProcess(UpdateProcessRequest) returns (UpdateProcessResponse) {}
    rpc AddProcess(AddProcessRequest) returns (AddProcessResponse) {}
    rpc CreateCheckpoint(CreateCheckpointRequest) returns (CreateCheckpointResponse) {}
    rpc DeleteCheckpoint(DeleteCheckpointRequest) returns (DeleteCheckpointResponse) {}
    rpc ListCheckpoint(ListCheckpointRequest) returns (ListCheckpointResponse) {}
    rpc State(StateRequest) returns (StateResponse) {}
    rpc Events(EventsRequest) returns (stream Event) {}
    rpc Stats(StatsRequest) returns (StatsResponse) {}
}

and messages for parameters and return types.

Generating gRPC code

Makefile has protoc command.

protoc:
    protoc -I ./api/grpc/types ./api/grpc/types/api.proto --go_out=plugins=grpc:api/grpc/types

This generates api/grpc/types.pb.go, which contains our generated client and server code, as well as code for populating, serializing, and retrieving our message types.

Server Logic

api/grpc/server/server*.go

apiServer interface:

type apiServer struct {
    sv *supervisor.Supervisor
}

Factory method:

// NewServer returns grpc server instance
func NewServer(sv *supervisor.Supervisor) types.APIServer {
    return &apiServer{
        sv: sv,
    }
}

Service handlers' implementation:

func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
    if c.BundlePath == "" {
        return nil, errors.New("empty bundle path")
    }
    e := &supervisor.StartTask{}
    e.ID = c.Id
...
}
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
...
}

func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) (*types.AddProcessResponse, error) {
...}
...

Server setup

container/main.go const ( defaultStateDir = "/run/containerd" defaultListenType = "unix" defaultGRPCEndpoint = "/run/containerd/containerd.sock" )

func startServer(address string, sv *supervisor.Supervisor) (*grpc.Server, error) {
    l, err := net.Listen(defaultListenType, address)
    if err != nil {
        return nil, err
    }
    s := grpc.NewServer()
    types.RegisterAPIServer(s, server.NewServer(sv))
    go func() {
        if err := s.Serve(l); err != nil {
            logrus.WithField("error", err).Fatal("containerd: serve grpc")
        }
    }()
    return s, nil
}

Client (docker's libcontainerd)

Init

CmdDaemon

containerdRemote := libcontainerd.New
d, err := daemon.NewDaemon(cli.Config, registryService, containerdRemote)

NewDaemon

d.containerd = containerdRemote.Client(d)

containerdRemote is remote interface; d.containerd is Client interface.

libcontainerd.new

// New creates a fresh instance of libcontainerd remote.
func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
    ...
    conn, err := grpc.Dial(r.rpcAddr, dialOpts...)

    r.apiClient = containerd.NewAPIClient(conn)
    return r
}

Calling RPC

Use container start() as example:

func (ctr *container) start() error { ... r := &containerd.CreateContainerRequest{ Id: ctr.containerID, BundlePath: ctr.dir, Stdin: ctr.fifo(syscall.Stdin), Stdout: ctr.fifo(syscall.Stdout), Stderr: ctr.fifo(syscall.Stderr), // check to see if we are running in ramdisk to disable pivot root NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "", } ... resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r) if err != nil { ctr.closeFifos(iopipe) return err } ... }