gRPC Go
GothamGo 2015
Sameer Ajmani
Tech Lead Manager, Go team, Google
Sameer Ajmani
Tech Lead Manager, Go team, Google
This talk was presented at GothamGo in New York City, October 2015.
2In Go, an RPC starts a goroutine running on the server and provides message passing between the client and server goroutines.
Unary RPC: the client sends a request to the server, then the server sends a response.
Streaming RPC: the client and server may each send one or more messages.
An RPC ends when:
This talk will show how we connect RPCs and streams with goroutines and channels.
3Example: a mobile Maps app requests a route from point A to point B.
On the client side, an RPC blocks until it's done or canceled.
A client uses multiple goroutines to run many RPCs simultaneously.
Each RPC is an exchange between a client goroutine and a server goroutine.
4A client starts a stream with a server.
Messages sent on a stream are delivered FIFO.
Many streams can run simultaneously between the same client and server.
The transport provides buffering and flow control.
Examples:
Provides RPC and streaming RPC
Ten languages: C, Java, Go, C++, Node.js, Python, Ruby, Objective-C, PHP, and C#
IDL: Proto3
Transport: HTTP2
golang.org/x/net/context for deadlines, cancellation, and request-scoped values
golang.org/x/net/trace for real-time request traces and connection logging
150+ imports of google.golang.org/grpc on pkg.go.dev
syntax = "proto3"; service Google { // Search returns a Google search result for the query. rpc Search(Request) returns (Result) { } } message Request { string query = 1; } message Result { string title = 1; string url = 2; string snippet = 3; }
protoc ./search.proto --go_out=plugins=grpc:.
type GoogleClient interface { // Search returns a Google search result for the query. Search(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Result, error) }
type GoogleServer interface { // Search returns a Google search result for the query. Search(context.Context, *Request) (*Result, error) }
type Request struct { Query string `protobuf:"bytes,1,opt,name=query" json:"query,omitempty"` }
type Result struct { Title string `protobuf:"bytes,1,opt,name=title" json:"title,omitempty"` Url string `protobuf:"bytes,2,opt,name=url" json:"url,omitempty"` Snippet string `protobuf:"bytes,3,opt,name=snippet" json:"snippet,omitempty"` }
import pb "golang.org/x/talks/content/2015/gotham-grpc/search"
func main() { flag.Parse() // Connect to the server. conn, err := grpc.Dial(*server, grpc.WithInsecure()) if err != nil { log.Fatalf("fail to dial: %v", err) } defer conn.Close() client := pb.NewGoogleClient(conn) // Run the RPC. switch *mode { case "search": search(client, *query) case "watch": watch(client, *query) default: log.Fatalf("unknown mode: %q", *mode) } }
func search(client pb.GoogleClient, query string) { ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond) defer cancel() req := &pb.Request{Query: query} res, err := client.Search(ctx, req) if err != nil { log.Fatal(err) } fmt.Println(res) }
RPCs block but can be canceled using a Context.
gRPC propagates cancellation from client to server.
16lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 36061+*index)) // RPC port if err != nil { log.Fatalf("failed to listen: %v", err) } g := grpc.NewServer() pb.RegisterGoogleServer(g, new(server)) g.Serve(lis)
new(server)
must implement the GoogleServer
interface:
type GoogleServer interface { // Search returns a Google search result for the query. Search(context.Context, *Request) (*Result, error) // Watch returns a stream of Google search results for the query. Watch(*Request, Google_WatchServer) error }
Each call to Search
or Watch
runs in its own goroutine.
ctx.Done
is closed when the RPC is canceled, times out, or returns:
func (s *server) Search(ctx context.Context, req *pb.Request) (*pb.Result, error) { d := randomDuration(100 * time.Millisecond) logSleep(ctx, d) select { case <-time.After(d): return &pb.Result{ Title: fmt.Sprintf("result for [%s] from backend %d", req.Query, *index), }, nil case <-ctx.Done(): return nil, ctx.Err() } }
If tracing is enabled, log the sleep duration:
func logSleep(ctx context.Context, d time.Duration) { if tr, ok := trace.FromContext(ctx); ok { tr.LazyPrintf("sleeping for %s", d) } }
Search
returns as soon as it gets the first result.
gRPC cancels the remaining backend.Search
RPCs by via ctx
:
func (s *server) Search(ctx context.Context, req *pb.Request) (*pb.Result, error) { c := make(chan result, len(s.backends)) for _, b := range s.backends { go func(backend pb.GoogleClient) { res, err := backend.Search(ctx, req) c <- result{res, err} }(b) } first := <-c return first.res, first.err }
type result struct { res *pb.Result err error }
syntax = "proto3"; service Google { // Search returns a Google search result for the query. rpc Search(Request) returns (Result) { } // Watch returns a stream of Google search results for the query. rpc Watch(Request) returns (stream Result) { } } message Request { string query = 1; } message Result { string title = 1; string url = 2; string snippet = 3; }
type GoogleClient interface { // Search returns a Google search result for the query. Search(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Result, error) // Watch returns a stream of Google search results for the query. Watch(ctx context.Context, in *Request, opts ...grpc.CallOption) (Google_WatchClient, error) }
type GoogleServer interface { // Search returns a Google search result for the query. Search(context.Context, *Request) (*Result, error) // Watch returns a stream of Google search results for the query. Watch(*Request, Google_WatchServer) error }
type Google_WatchClient interface { Recv() (*Result, error) grpc.ClientStream }
type Google_WatchServer interface { Send(*Result) error grpc.ServerStream }
func watch(client pb.GoogleClient, query string) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() req := &pb.Request{Query: query} stream, err := client.Watch(ctx, req) if err != nil { log.Fatal(err) } for { res, err := stream.Recv() if err == io.EOF { fmt.Println("and now your watch is ended") return } if err != nil { log.Fatal(err) } fmt.Println(res) } }
func (s *server) Watch(req *pb.Request, stream pb.Google_WatchServer) error { ctx := stream.Context() for i := 0; ; i++ { d := randomDuration(1 * time.Second) logSleep(ctx, d) select { case <-time.After(d): err := stream.Send(&pb.Result{ Title: fmt.Sprintf("result %d for [%s] from backend %d", i, req.Query, *index), }) if err != nil { return err } case <-ctx.Done(): return ctx.Err() } } }
func (s *server) Watch(req *pb.Request, stream pb.Google_WatchServer) error { ctx := stream.Context() c := make(chan result) var wg sync.WaitGroup for _, b := range s.backends { wg.Add(1) go func(backend pb.GoogleClient) { defer wg.Done() watchBackend(ctx, backend, req, c) }(b) } go func() { wg.Wait() close(c) }() for res := range c { if res.err != nil { return res.err } if err := stream.Send(res.res); err != nil { return err } } return nil
Watch
returns on first error; this closes ctx.Done
and signals watchBackend
to exit.
func watchBackend(ctx context.Context, backend pb.GoogleClient, req *pb.Request, c chan<- result) { stream, err := backend.Watch(ctx, req) if err != nil { select { case c <- result{err: err}: case <-ctx.Done(): } return } for { res, err := stream.Recv() select { case c <- result{res, err}: if err != nil { return } case <-ctx.Done(): return } } }
Go gRPC works smoothly with goroutines, channels, and cancellation.
It is an excellent fit for building parallel, distributed, and streaming systems.
Thanks to Qi Zhao, David Symonds, Brad Fitzpatrick, and the rest.
Questions?
Sameer Ajmani
Tech Lead Manager, Go team, Google
@Sajma
sameer@golang.org