Grpc 源码学习
grpc 是谷歌推出的一个高性能、开源和通用的 RPC 框架,基于 HTTP2,使用 Protocol Buffer 来定义消息结构。
架构
定义消息结构
syntax = "proto3"; package pb; //定义一个Chunker服务 service Chunker { //提供一个Chunker给外部调用 rpc Chunker(Empty) returns (stream Chunk) {} } message Empty{} message Chunk { bytes chunk = 1; }
服务端
//服务提供者 type chunkerSrv []byte //定义一个类型这个类型必须实现Chunker函数 func (c chunkerSrv) Chunker(_ *pb.Empty, srv pb.Chunker_ChunkerServer) error { //........ } func main() { listen, err := net.Listen("tcp", ":8888") if err != nil { log.Fatal(err) } s := grpc.NewServer() blob := make([]byte, 1024*1024*1024*2) //2G rand.Read(blob) pb.RegisterChunkerServer(s, chunkerSrv(blob)) //注册服务 log.Println("serving on localhost:8888") log.Fatal(s.Serve(listen)) }
GRPC服务注册过程
服务注册主要是这个函数 pb.RegisterChunkerServer(s, chunkerSrv(blob))
这个函数是 proto 文件自动生成的
// chunkerSrv类型实现了ChunkerServer这个接口 type ChunkerServer interface { //proto自动生成 Chunker(*Empty, Chunker_ChunkerServer) error } func RegisterChunkerServer(s *grpc.Server, srv ChunkerServer) { s.RegisterService(&_Chunker_serviceDesc, srv) //35行 } // 一个rpc服务的定义类 var _Chunker_serviceDesc = grpc.ServiceDesc{ ServiceName: "pb.Chunker", //proto文件中定义的包名.服务名 HandlerType: (*ChunkerServer)(nil), //这个服务函数提供者的类型 Methods: []grpc.MethodDesc{}, //服务提供的方法集合 如果不是流式传输会定义在这里 Streams: []grpc.StreamDesc{ //流式传输定义在这里 { StreamName: "Chunker", //函数名 Handler: _Chunker_Chunker_Handler, //处理函数 ServerStreams: true, // 服务端流式 RPC }, }, Metadata: "chunker.proto", //元数据 } type StreamHandler func(srv interface{}, stream ServerStream) error // 流式rpc服务定义 type StreamDesc struct { //函数名 StreamName string Handler StreamHandler // 函数处理器 ServerStreams bool // 服务端流式 RPC ClientStreams bool // 客户端流式 RPC } // 服务端调用的处理函数 func _Chunker_Chunker_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(Empty) if err := stream.RecvMsg(m); err != nil { return err } //srv是我们注册的chunkerSrv类型,调用他的Chunker方法来供请求使用 return srv.(ChunkerServer).Chunker(m, &chunkerChunkerServer{stream}) } // 注册的逻辑 func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { //判断服务提供者和定义好的服务是不是同一类型 if ss != nil { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) } } //进行注册 s.register(sd, ss) } func (s *Server) register(sd *ServiceDesc, ss interface{}) { s.mu.Lock() defer s.mu.Unlock() s.printf("RegisterService(%q)", sd.ServiceName) if s.serve { logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) } if _, ok := s.services[sd.ServiceName]; ok { logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) } info := &serviceInfo{ //一个服务的对象 serviceImpl: ss, //服务的具体实现类 methods: make(map[string]*MethodDesc), //普通传输的方法 streams: make(map[string]*StreamDesc), //流式传输方法 mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] info.methods[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] info.streams[d.StreamName] = d } s.services[sd.ServiceName] = info //完成注册 }
监听请求
通过 s.Serve(listen)
方法来监听请求
func (s *Server) Serve(lis net.Listener) error { //......略 for { rawConn, err := lis.Accept() if err != nil { //......略 //如果发生错误,执行重试 } go func() { s.handleRawConn(lis.Addr().String(), rawConn) //处理请求 s.serveWG.Done() }() } } func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { //........略 // 使用http2传输 st := s.newHTTP2Transport(conn, authInfo) if st == nil { return } rawConn.SetDeadline(time.Time{}) if !s.addConn(lisAddr, st) { return } go func() { s.serveStreams(st) s.removeConn(lisAddr, st) }() } func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup var roundRobinCounter uint32 st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) if s.opts.numServerWorkers > 0 { data := &serverWorkerData{st: st, wg: &wg, stream: stream} select { case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: default: // If all stream workers are busy, fallback to the default code path. go func() { s.handleStream(st, stream, s.traceInfo(st, stream)) //真正的处理逻辑 wg.Done() }() } } else { go func() { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) //真正的处理逻辑 }() } }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx } tr := trace.New("grpc.Recv."+methodFamily(method), method) return trace.NewContext(ctx, tr) }) wg.Wait() }
处理请求
//处理逻辑 func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { sm := stream.Method() if sm != "" && sm[0] == '/' { sm = sm[1:] } //.....略 service := sm[:pos] //从stream里面获取请求的服务 method := sm[pos+1:] //请求的方法名 srv, knownService := s.services[service] if knownService { //如果服务在服务器中注册了 if md, ok := srv.methods[method]; ok { s.processUnaryRPC(t, stream, srv, md, trInfo) //执行普通rpc return } if sd, ok := srv.streams[method]; ok { s.processStreamingRPC(t, stream, srv, sd, trInfo) //执行流式rpc return } } //。。。。略 如果此服务没有注册的异常处理 } // 执行流式rpc func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { //.....略 一些前置处理 ss := &serverStream{ ctx: ctx, t: t, s: stream, p: &parser{r: stream}, codec: s.getCodec(stream.ContentSubtype()), maxReceiveMessageSize: s.opts.maxReceiveMessageSize, //配置最大接收消息限制 maxSendMessageSize: s.opts.maxSendMessageSize, //配置最大发送消息限制 trInfo: trInfo, statsHandler: sh, } var appErr error var server interface{} if info != nil { server = info.serviceImpl //这个就是我们注册好的函数提供者 } if s.opts.streamInt == nil { appErr = sd.Handler(server, ss) //执行Handler函数 也是我们传进来的 } else { info := &StreamServerInfo{ FullMethod: stream.Method(), IsClientStream: sd.ClientStreams, IsServerStream: sd.ServerStreams, } appErr = s.opts.streamInt(server, ss, info, sd.Handler) } //..... 略 返回值和错误处理 return err }
客户端
以 eventti
请求调用 cmdb
的节点搜索 rpc 服务为例
syntax = "proto3"; package rpc; service NodeService { rpc SearchNode(ByJqlAndUsernameRequest) returns (common.SimpleJsonResponse) {}; }
// GrpcClient grpc客户端 type GrpcClient struct { Conn *grpc.ClientConn } // NewGrpcClient 新建grpc客户端 func NewGrpcClient() (*GrpcClient, error) { // 连接服务端接口 conn, err := grpc.Dial(common.AppConfigInstance.GwayCfg.Address, grpc.WithUserAgent("sky-eventti"), grpc.WithInsecure()) if err != nil { logs.Error(err) return nil, err } res := &GrpcClient{ Conn: conn, } return res, nil }
// SearchEventTypeMapping cmdb查询所有事件类型 func SearchEventTypeMapping(gc *GrpcClient) ([]models.EventTypeVO, error) { //....... 略 client := rpc.NewNodeServiceClient(gc.Conn) defer gc.Close() reply, err := client.SearchNode(context.Background(), &rpc.ByJqlAndUsernameRequest{ Jql: "label = eventTypeMapping", }) if err != nil { logs.Error(err) return nil, err } jsonStr := reply.GetJsonStr() //.......略 } // proto生成的函数 func (c *nodeServiceClient) SearchNode(ctx context.Context, in *ByJqlAndUsernameRequest, opts ...grpc.CallOption) (*common.SimpleJsonResponse, error) { out := new(common.SimpleJsonResponse) err := c.cc.Invoke(ctx, "/rpc.NodeService/SearchNode", in, out, opts...) if err != nil { return nil, err } return out, nil } // grpc连接的抽象接口 (grpc.ClientConn实现了它) type ClientConnInterface interface { // Invoke performs a unary RPC and returns after the response is received // into reply. Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error // NewStream begins a streaming RPC. NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) } func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { // 合并拦截器参数与请求参数 opts = combine(cc.dopts.callOptions, opts) // 拦截器不为空,调用拦截器 if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } // 发起调用 return invoke(ctx, method, args, reply, cc, opts...) } func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { // 创建客户端流 cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } // 发送数据 if err := cs.SendMsg(req); err != nil { return err } // 接收数据 return cs.RecvMsg(reply) }
以上就是 grpc 从服务注册然后到请求服务的整个流程。
👍
大G出品,必属精品
★★★★★