GRPC 入门
gRPC
是 Google 基于 Protobuf
开发的跨语言的开源 RPC框架。基于 HTTP/2协议
设计,可以基于一个 HTTP/2
链接提供多个服务。
之前定义的接口
package service
const HelloServiceName = "HelloService"
type HelloService interface {
Hello(*Request, *Response) error
}
Request
和 Response
已经用 protobuf
定义了数据格式, 如果接口也能通过 protobuf
定义就完美了, 这也是 GRPC
真正的威力
GRPC技术栈
- 数据交互格式:
protobuf
- 通信方式: 最底层为
TCP
或Unix Socket
,之上是HTTP/2
的实现 - 核心库: 在
HTTP/2
上又构建了针对Go语言的gRPC核心库
Stub
: 程序通过gRPC插件生产的Stub代码和gRPC核心库通信,也可以直接和gRPC核心库通信
gRPC
采用 protobuf
描述接口和数据, 可以理解为: protobuf ON HTTP2
的RPC
Hello gRPC
演示一个基础的gRPC服务.目录结构
protobuf grpc插件
protobuf
不仅可以定义交互的数据结构(message), 还可以定义交互的接口:
service HelloService {
rpc Hello (String) returns (String);
}
从 Protobuf
的角度看,gRPC只不过是一个针对 service接口
生成代码的生成器。需要提前安装grpc的代码生成插件
# protoc-gen-go 插件之前已经安装
# go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# 安装protoc-gen-go-grpc插件
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
当前插件的版本
protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.2.0
生成代码
protobuf 定义接口的语法:
service <service_name> {
rpc <function_name> (<request>) returns (<response>);
}
service:
用于申明这是个服务的接口service_name
: 服务/接口名称function_name
: 函数的名称request
: 函数参数response
: 函数返回
// hello/pb/hello.proto
syntax = "proto3";
package hello;
option go_package="micro/grpc/hello/pb";
service HelloService {
rpc Hello (Request) returns (Response);
}
message Request {
string value = 1;
}
message Response {
string value = 1;
}
生产代码, 同时制定gprc插件对应参数:
# 在hello文件夹
protoc -I=. --go_out=./pb --go_opt=module="micro/grpc/hello/pb" --go-grpc_out=./pb --go-grpc_opt=module="micro/grpc/hello/pb" pb/hello.proto
生成的代码: hello/pb/hello_grpc.pb.go
// 客户端
type HelloServiceClient interface {
Hello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
}
// 服务端
type HelloServiceServer interface {
Hello(context.Context, *Request) (*Response, error)
mustEmbedUnimplementedHelloServiceServer()
}
gRPC服务端
基于服务端的 HelloServiceServer接口
可以重新实现 HelloService服务
首先构建一个服务实体,实现GRPC定义的接口
type HelloServiceServer struct{
// 嵌套UnimplementedHelloServiceServer, 用于向前兼容(框架)
// UnimplementedHelloServiceServer该对象有很多默认参数, 嵌套后才能实现继承
pb.UnimplementedHelloServiceServer
}
func (i *HelloServiceServer) Hello(ctx context.Context, req *pb.Request)(*pb.Response, error){
return &pb.Response{Value: fmt.Sprintf("hello, %s", req.Value)}, nil
}
由于 该接口 需要实现 must...
方法, 而 UnimplementedHelloServiceServer
实现了该方法,所以我们的服务实体需要嵌套该结构体
启动 gRPC
服务端:
func main(){
// 构造gRPC服务对象
grpcServer := grpc.NewServer()
// 注册HelloServiceImpl服务
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceServer))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
// 通过grpcServer.Serve(lis)在一个监听端口上提供gRPC服务
grpcServer.Serve(lis)
}
gRPC客户端
func main() {
// grpc.Dial负责和gRPC服务建立链接
// no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)
// 我们需要设置运输安全(Dial 的opts参数) 即第二个参数
conn, err := grpc.Dial("localhost:1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 基于已经建立的链接构造HelloServiceClient对象,
// 返回HelloServiceClient接口对象
client := pb.NewHelloServiceClient(conn)
req := &pb.Request{Value: "hello"}
reply, err := client.Hello(context.Background(), req)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
gRPC流
演示目录
RPC
是远程函数调用,每次调用的参数和返回值不能太大,否则将严重影响每次调用的响应时间。因此传统的 RPC
调用对于上传和下载较大数据量场景并不适合。为此,gRPC框架
针对服务器端和客户端分别提供了流特性
在 HelloService
增加一个支持双向流的Channel方法
// pb/stream.proto
syntax = "proto3";
package hello;
option go_package="micro/grpc/stream/pb";
service HelloService {
// 双向数据流通道
rpc Channel(stream Request) returns(stream Response);
}
message Request {
string value = 1;
}
message Response {
string value = 1;
}
关键字 stream
指定启用流特性
定义 streaming RPC
的语法如下:
rpc <function_name> (stream <type>) returns (stream <type>) {}
生成Streaming RPC
重新生成RPC代码
protoc -I=. --go_out=./pb --go_opt=module="micro/grpc/stream/pb" --go-grpc_out=./pb --go-grpc_opt=module="micro/grpc/stream/pb" pb/stream.proto
接口的变化:
- 客户端的
Channel
方法返回HelloService_ChannelClient
,用于和服务端进行双向通信 - 服务端的
Channel
方法参数是HelloService_ChannelServer
,可以用于和客户端双向通信
HelloService_ChannelClient
和 HelloService_ChannelServer
接口定义:
// Request ----->
// Response <----
type HelloService_ChannelClient interface {
Send(*Request) error
Recv() (*Response, error)
grpc.ClientStream
}
// Request <----
// Reponse ---->
type HelloService_ChannelServer interface {
Send(*Response) error
Recv() (*Request, error)
grpc.ServerStream
}
服务端
server端逻辑:
- 接收一个Request
- 响应一个Response
type HelloService struct{
pb.UnimplementedHelloServiceServer
}
func (p *HelloService) Channel(stream pb.HelloService_ChannelServer) error {
// 循环接收客户端发来的数据
for {
// 接收一个请求
args, err := stream.Recv()
if err != nil {
// 客户端流被关闭
if err == io.EOF {
return nil
}
return err
}
// 响应请求,生成返回的数据通过流发送给客户端
resp := &pb.Response{Value: "hello:" + args.GetValue()}
err = stream.Send(resp)
if err != nil {
return err
}
}
}
双向流数据的发送和接收都是完全独立的行为。
发送和接收的操作并不需要一一对应,用户可以根据真实场景进行组织代码
客户端
func main() {
conn, err := grpc.Dial("localhost:1234",grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewHelloServiceClient(conn)
// 需要先调用Channel方法获取返回的流对象
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
// 将发送和接收操作放到两个独立的Goroutine。
// 首先是向服务端发送数据
go func() {
for {
if err := stream.Send(&pb.Request{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()
// 然后在循环中接收服务端返回的数据
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
}
gRPC认证
前面的 gRPC服务
在无任何保护机制下可以被任何人调用, 很危险, 需要添加认证的功能
grpc有2种模式:
Request Response模式
Stream 模式
在grpc的认证体系中, 这2种认证是独立开的,可以思考下为什么?
接下来添加认证都在之前原有的 hello
和 stream
文件夹之下进行
Request Response认证
grpc框架预留的拦截器钩子:
原理
ctx
请求上下文req
rpc请求数据info
服务端相关数据, 不用理解这个handler
处理请求的handler, 相对于next()resp
rpc响应err
rpc 错误
gRPC
基于 HTTP2通讯
, 拦截器的作用原理和 HTTP
的中间件是一样的:
Server添加认证
- 添加一个中间件
grpc.UnaryServerInterceptor(mid1)
- 可以同时添加多个中间件
grpc.ChainUnaryInterceptor(mid1, mid2, mid3)
中间件
在 hello/server/main.go
里修改
func main(){
// 初始化全局Logger,为了更像实际运用我们进行日志的打印
//用的日志是这两个
//"github.com/infraboard/mcube/logger"
//"github.com/infraboard/mcube/logger/zap"
zap.DevelopmentSetup()
grpcServer := grpc.NewServer(
// 添加认证中间件,
grpc.ChainUnaryInterceptor(auth.GetUnaryServerInterceptor())
)
...
}
auther.GrpcAuthUnaryServerInterceptor()
为我们即将编写的中间件
编写中间件
基于此原理,我们来编写一个 Request Response
模式下的中间件 server/auth/auth.go
- 服务端认证中间件的实现
type ServerAuthInceptor struct{ log logger.Logger } func NewServerAuthInceptor() *ServerAuthInceptor{ return &ServerAuthInceptor{ log: zap.L().Named("Server.auth"), } }
- 认证中间件
const ( ClientHeaderKey = "client-id" ClientSecretKey = "client-secret" ) func (i *ServerAuthInceptor) Auth( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (resp interface{}, err error) { i.log.Debugf("req: %s", req) i.log.Debugf("server info: server: %s, method: %s", info.Server, info.FullMethod) // grpc header, http2是有header, 这个header在ctx // 从上下文中获取认证信息, 这里的md 就是类似于header md, ok := metadata.FromIncomingContext(ctx) if !ok{ return nil, fmt.Errorf("ctx is not an grpc incoming context") } // 认证逻辑 cid, cs := i.GetClientCredentialsFromMeta(md) i.log.Debug(cid, cs) if cid != "admin" || cs != "123456" { return nil, grpc.Errorf(codes.Unauthenticated, "客户端调用凭证不正确") } // 请求路由到下一个 res, err := handler(ctx, req) i.log.Debugf("resp: %s", res) return res, err } func (i *ServerAuthInceptor) GetClientCredentialsFromMeta(md metadata.MD) (clientId, clientSecret string) { cids := md.Get(ClientHeaderKey) sids := md.Get(ClientSecretKey) if len(cids) > 0 { clientId = cids[0] } if len(sids) > 0 { clientSecret = sids[0] } return }
- 获取中间件
func GetUnaryServerInterceptor() grpc.UnaryServerInterceptor{ return NewServerAuthInceptor().Auth }
这时我们调用之前写好的客户端
Client携带认证(基础)
客户端每次发送 RPC
时都需要携带凭证:
client_id
client_secret
对照 http1.1
的认证逻辑(通过 Header
来传递), gRPC
也提供了类似的机制: metadata
把凭证放到 metadata
, 传递给服务端, 服务端从中取出校验。
下面是 我们的客户端调用方法: 注意我们可以传递很多: grpc.CallOption
type HelloServiceClient interface {
Hello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
}
其中有一个 Option
的时候 就命名为 Header
type CallOption interface {
before(*callInfo) error
after(*callInfo, *csAttempt)
}
func Header(md *metadata.MD) CallOption {
return HeaderCallOption{HeaderAddr: md}
}
依赖这种机制, 可以通过 Header
传递额外的一些信息, 比如凭证
Client携带认证(改进)
如果每次都这么传递有点蠢, 因此设计了一种机制: WithPerRPCCredentials
, 每次调用都从获取凭证 注入到 metadata
中:
// WithPerRPCCredentials 返回一个 DialOption
// 该option在每个outbound 的RPC上 设置 credentials 和 auth 状态
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
})
}
我们可以看到 PerRPCCredentials
是一个接口:
type PerRPCCredentials interface {
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
RequireTransportSecurity() bool
}
实现该接口, 就实现了客户端认证的携带机制:
package auth
import (
"context"
"micro/grpc/hello/server/auth"
)
type PerRPCCredentials struct{
clientId string
clientSecret string
}
func NewPerRPCCredentials(clientId, clientSecret string) *PerRPCCredentials {
return &PerRPCCredentials{
clientId: clientId,
clientSecret: clientSecret,
}
}
func (i *PerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string)(map[string]string, error){
return map[string]string{
auth.ClientHeaderKey: i.clientId,
auth.ClientSecretKey: i.clientSecret,
}, nil
}
func (i *PerRPCCredentials) RequireTransportSecurity()bool{
return false
}
这样我们在建立grpc的连接的时候,就传递我们的Credential, 就实现了客户端携带凭证
conn, err := grpc.Dial(
"localhost:1234",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithPerRPCCredentials(auth.NewPerRPCCredentials("admin", "123456")))
验证
最后我们需要进行验证
Stream 认证
stream
认证不同于 request reponse
认证模式, 因为建立的长连接,, 因此只需要在连接建立开始进行认证, 传递过程无需对每次交互做认证。
原理
和 HTTP
中间件一样, stream
模式下,gRPC
也提供了一个钩子:
srv
service信息ss
Server 的数据流info
服务端相关数据, 不用理解这个handler
处理请求的handler, 相当于next()err
rpc 错误
我们的任务就是实现一个这样的函数, 在函数中添加认证逻辑
Server添加认证
与上步相同 补充上 stream
的认证中间件
func main(){
// 初始全局日志
zap.DevelopmentSetup()
// 首先是通过grpc.NewServer()构造一个gRPC服务对象
grpcServer := grpc.NewServer(
grpc.ChainStreamInterceptor(auth.NewStreamServerAuthInterceptor().Auth),
)
...
编写中间件
stream/server/auth/auth.go
就Auth
函数不同,其他相同
func (i *StreamServerAuthInterceptor) Auth(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error{
ctx := ss.Context() // 还是要获取 context
md, ok := metadata.FromIncomingContext(ctx)
if !ok{
return fmt.Errorf("ctx is not an grpc incoming context")
}
cid, cs := i.GetClientCredentialsFromMeta(md)
if cid != "admin" || cs != "123456" {
return grpc.Errorf(codes.Unauthenticated, "客户端调用凭证不正确")
}
return handler(srv, ss)
}
客户端携带认证
客户端提供凭证的逻辑和 Request Reponse
模式一样
验证
最后我们进行验证