grpc调用主要流程分析(一)
grpc调用主要流程分析
客户端
0. 客户端调用
以github官网上的example为例跟踪调用的逻辑,总的调用过程基本就是分为三步:
- 创建connection
- 创建业务客户端实例
- 调用rpc接口
{
...
// 创建connection
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 创建client
c := pb.NewGreeterClient(conn)
// 调用RPC接口
name := defaultName
r, err := c.SayHello(context.TODO(), &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
...
}
1. 创建connection
通过grpc.Dial()接口创建了一个ClientConn类型实例。
Dial()函数的第一个参数作为endpoint,同时Dial()还接受变长参数DialOption。DialOption是一个接口类型,在grpc中存在着多种返回了DialOption类型的函数,这些返回来DialOption类型的函数,包括编解码,负载均衡策略等。一些函数的声明:
func WithBalancer() DialOption
func WithInsecure() DialOption
func WithCodec() DialOption
根据client的需求,调用方在调用Dial()的时候可以将这些函数作为参数传入Dial()中。在Dial()中,首先会根据参数进行一系列的初始化和赋值操作,而对于这些DailOption参数,最终在Dial()中实现对grpc.ClientConn的成员变量dopts中的CallOption进行了赋值。
所以,通过Dial()的调用,grpc已经建立了到服务端的链接,如果进行了配置,那同时也会附带一些诸如负载均衡、证书检查、backoff等策略的执行。
2. 创建客户端实例
创建业务client实例,在使用gRpc的时候,是用的协议是protobuf。而NewGreeterClient()
则是通过pb协议生成的代码接口,存在于helloworld.pb.go
中,该函数主要是返回了一个greeterClient类型的对象。
3. 调用RPC请求
SayHello()方法是存在于一个接口XXXServiceServer
,这个接口也是存在于根据pb协议生的的helloworld.pb.go
文件中。
SayHello()除了接受一个context存储上下文信息和一个request类型参数,同时也支持一个CallOption类型的变量。关于CallOption在上文中有提到,其本身也是一个接口,其中before()用于在请求发送之前设置参数,而after()则是在请求调用完毕之后提取信息。通过对这两个函数的调用,方便的实现了在请求前后的一些参数设置的功能:
type CallOption interface {
before(*callInfo) error
after(*callInfo)
}
任何一个我们我们上文说到了返回值为DialOption的函数,大部分都有一个对应的结构实现了CallOption接口,诸如上面的WithCodec(),其对应的结构为:
type CustomCodecCallOption struct {
Codec Codec
}
func (o CustomCodecCallOption) before(c *callInfo) error {
c.codec = o.Codec
return nil
}
func (o CustomCodecCallOption) after(c *callInfo) {}
回到SayHello()函数的逻辑中来,该函数最终会调用grpc中的call.go中的invoke函数来执行具体的操作。
在invoke()函数中,newClientStream()会首先获取传输层Trasport结构的实例并包装到一个ClientStream实例中返回,随后将RPC请求通过SendMsg()接口发送出去,注意,由于SendMsg()并不会等待服务端收到数据,因此还需要通过RecvMsg()同步接收收到的回复消息(关于SendMsg()和RecvMsg()中的具体发送和接收数据逻辑,不在赘述,可以去源码再详细了解)。
// pb.go文件
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
...
// grpc/grpc.go/call.go文件
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
服务端
0. 创建服务
对于创建服务端,总的分为四步:
- 创建端口监听listener
- 创建server实例
- 注册服务(并未开始真正的服务)
- 启动服务端
{
...
// 创建listener
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 创建server示例
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
reflection.Register(s)
// 启动服务端监听
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
...
}
1. 创建监听端口
创建一个监听tcp端口的Listener实例。
2. 创建服务端实例
NewServer()方法创建了一个grpc.Server
实例,其函数内部会对该实例进行一系列初始化赋值操作,该接口与客户端中的Dial()接口类似,可以接受多个ServerOption入参。在helloworld的示例中并未传入任务参数,一个简单那的示例如下:
svr := grpc.NewServer(grpc.CustomCodec(proxy.Codec()))
在grpc中,也存在多种多种类似于CustomCodec()这样返回值类型为ServerOption的函数,从而满足调用方在需要求进行传参赋值:
func CustomCodec() ServerOption
func MaxConcurrentStreams() ServerOption
func UnknownServiceHandler() ServerOption
3. 服务注册
RegisterGreeterServer()是由proto文件生成的helloworld.pb.go文件里的一个对外暴露的函数,主要调用了grpc的RegisterService()
来注册当前service及其实现。
grpc.RegisterService()接收一个参数类型为ServiceDesc的实例_Greeter_serviceDesc
,这是用来对service进行描述的说明,比如服务名字,服务方法名和对应的方法的handler。同时接收一个service实例作注册进来。其中_Greeter_serviceDesc是由pb生成的对业务RPC接口的描述,如下所示:
// helloworld.pb.go
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&_Greeter_serviceDesc, srv)
}
var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "helloworld.proto",
}
可以看到,在grpc.ServiceDesc中对Methods变量进行了赋值。其中Methods包含了一个RPC接口名到handler的映射数组,描述了当前service支持的所有的方法,MethodName即为调用的RPC接口名,而handler的值_Greeter_SayHello_Handler()也是由pb生成的方法,在其内部通过注册进来的service实例,实现了对我们的业务函数SayHello()进行了调用:
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GreeterServer).SayHello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/helloworld.Greeter/SayHello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
}
return interceptor(ctx, in, info, handler)
}
4. 启动服务
Server函数中开始接收来到Listener的请求(对listener进行Accept()),并为每一个请求创建一个go协程来服务。
Server函数的逻辑判断比较复杂,但是到最后都是一个死循环监听等待请求:
func (s *Server) Serve(lis net.Listener) error {
...
for {
// 开始接受服务
rawConn, err := lis.Accept()
...
// 为每一个请求启动一个go程来处理链接
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
func (s *Server) handleRawConn(rawConn net.Conn) {
// 鉴权操作
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
...
// 基于HTTP2,创建一个ServerTransport
st := s.newHTTP2Transport(conn, authInfo)
...
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}
其中,newHTTP2Transport()
的代码主要部分有一些关于HTTP2的赋值和初始化操作
,存在于internal/transport/http2_server.go
中。而serveStreams()
中则主要是调用了HandleStreams()
接口去真正的接受请求流。
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
HandleStreams()
中的实现在grpc-go/internal/transport/handler_server.go
文件中。它的实现中前面一大部分是对数据流Stream的初始化,数据接收以及赋值。在数据流stream接收完毕后,通过注册进来的server的startStream()
来处理数据流。注册进来的startStream()最终调用了Server中的startStream()函数,区分出是unary请求还是stream请求,并分别通过processUnaryRPC()
和processStreamingRPC()
进行区分处理。对于两个主要的处理函数processUnaryRPC()和processStreamingRPC(),基本上是一些具体的数据接收、编解码等操作.
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
...
// 数据流Stream的接受和赋值
startStream(s)
ht.runStream()
close(requestOver)
// 等待数据读取完毕
req.Body.Close()
<-readerDone
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
...
// 判断Unary RPC还是Streaming RPC
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
...
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
...
}
- 51 Nod 1057 N的阶乘【Java大数乱搞】
- 2017 Multi-University Training Contest - Team 1 1011&&HDU 6043 KazaQ's Socks【规律题,数学,水】
- 2017 Multi-University Training Contest - Team 1 1001&&HDU 6033 Add More Zero【签到题,数学,水】
- 51 Nod 1005 大数加法【Java大数乱搞,python大数乱搞】
- 51 Nod 1029 大数除法【Java大数乱搞】
- 51 Nod 1027 大数乘法【Java大数乱搞】
- SQL常用的基础语法
- 51 Nod 1028 大数乘法 V2【Java大数乱搞】
- Gym 100952J&&2015 HIAST Collegiate Programming Contest J. Polygons Intersection【计算几何求解两个凸多边形的相交面积板子题
- Windows下Cygwin可以使用哪些Linux命令
- Codeforces Round #426 (Div. 2)【A.枚举,B.思维,C,二分+数学】
- Cygwin,打造你的Windows下Linux环境
- “玲珑杯”ACM比赛 Round #19题解&源码【A,规律,B,二分,C,牛顿迭代法,D,平衡树,E,概率dp】
- 2017 Multi-University Training Contest - Team 1 1003&&HDU 6035 Colorful Tree【树形dp】
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- mysql提示Packet for query is too large (1142 > 1024)解决方案
- Kubernetes 1.19.0——deployment(1)
- 02 . Jeknins简介部署及自动化部署PHP代码
- java编程思想第四版第八章习题
- java编程思想第四版第八章总结
- ESP32从网络获取天气OLED显示(附源码)
- 02 . Zabbix配置监控项及聚合图形
- 01 . GitLab简介及环境部署
- 03 . Prometheus监控容器和HTTP探针应用及服务发现
- java编程思想第四版第九章习题
- 03 . Django之腾讯云短信
- ESP32 MQTT连接到中移OneNET物联网平台(附源码)
- 01 . Docker原理部署及常用操作命令
- SSH原理常见应用升级及端口转发
- 01 . Linux常用命令