Contents

gRPC(Go)教程(三)---Stream 推送流

本文主要讲述了 gRPC 中的四种类型的方法使用,包括普通的 Unary API 和三种 Stream API:ServerStreamingClientStreamingBidirectionalStreaming

1. 概述

gRPC 系列相关代码见 Github

gRPC 中的 Service API 有如下4种类型:

  • 1)UnaryAPI:普通一元方法
  • 2)ServerStreaming:服务端推送流
  • 3)ClientStreaming:客户端推送流
  • 4)BidirectionalStreaming:双向推送流

Unary API 就是普通的 RPC 调用,例如之前的 HelloWorld 就是一个 Unary API,本文主要讲解 Stream API。

Stream 顾名思义就是一种流,可以源源不断的推送数据,很适合大数据传输,或者服务端和客户端长时间数据交互的场景。Stream API 和 Unary API 相比,因为省掉了中间每次建立连接的花费,所以效率上会提升一些。

2. proto 文件定义

echo.proto文件定义如下:

syntax = "proto3";

option go_package = "github.com/lixd/grpc-go-example/features/proto/echo";

package echo;


// Echo 服务,包含了4种类型API
service Echo {
  // UnaryAPI
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  // SServerStreaming
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  // ClientStreamingE
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  // BidirectionalStreaming
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}

message EchoRequest {
  string message = 1;
}

message EchoResponse {
  string message = 1;
}

编译

lixd@17x:~/17x/projects/grpc-go-example/features/proto/echo$ protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
./echo.proto

3. UnaryAPI

比较简单,没有需要特别注意的地方。

Server

type Echo struct {
	pb.UnimplementedEchoServer
}

// UnaryEcho 一个普通的UnaryAPI
func (e *Echo) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	log.Printf("Recved: %v", req.GetMessage())
	resp := &pb.EchoResponse{Message: req.GetMessage()}
	return resp, nil
}

Client

func main() {
	// 1.建立连接 获取client
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewEchoClient(conn)
	// 2.执行各个Stream的对应方法
	unary(client)
}
func unary(client pb.EchoClient) {
	resp, err := client.UnaryEcho(context.Background(), &pb.EchoRequest{Message: "hello world"})
	if err != nil {
		log.Printf("send error:%v\n", err)
	}
	fmt.Printf("Recved:%v \n", resp.GetMessage())
}

Test

Server

lixd@17x:~/17x/projects/grpc-go-example/features/stream/server$ go run main.go 
2021/01/23 19:00:30 Serving gRPC on 0.0.0.0:50051
2021/01/23 19:00:36 Recved: hello world

Client

lixd@17x:~/17x/projects/grpc-go-example/features/stream/client$ go run main.go 
Recved:hello world

4. ServerStream

服务端流:服务端可以发送多个数据给客户端。

使用场景: 例如图片处理的时候,客户端提供一张原图,服务端依次返回多个处理后的图片。

Server

注意点:可以多次调用 stream.Send() 来返回多个数据。

//  ServerStreamingEcho 客户端发送一个请求 服务端以流的形式循环发送多个响应
/*
1. 获取客户端请求参数
2. 处理完成后返回过个响应
3. 最后返回nil表示已经完成响应
*/
func (e *Echo) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
	log.Printf("Recved %v", req.GetMessage())
    // 具体返回多少个response根据业务逻辑调整
	for i := 0; i < 2; i++ {
		// 通过 send 方法不断推送数据
		err := stream.Send(&pb.EchoResponse{Message: req.GetMessage()})
		if err != nil {
			log.Fatalf("Send error:%v", err)
			return err
		}
	}
	// 返回nil表示已经完成响应
	return nil
}

Client

注意点:调用方法获取到的不是单纯的响应,而是一个 stream。通过 stream.Recv() 接收服务端的多个返回值。

/*
1. 建立连接 获取client
2. 通过 client 获取stream
3. for循环中通过stream.Recv()依次获取服务端推送的消息
4. err==io.EOF则表示服务端关闭stream了
*/
func serverStream(client pb.EchoClient) {
	// 2.调用获取stream
	stream, err := client.ServerStreamingEcho(context.Background(), &pb.EchoRequest{Message: "Hello World"})
	if err != nil {
		log.Fatalf("could not echo: %v", err)
	}

	// 3. for循环获取服务端推送的消息
	for {
		// 通过 Recv() 不断获取服务端send()推送的消息
		resp, err := stream.Recv()
		// 4. err==io.EOF则表示服务端关闭stream了 退出
		if err == io.EOF {
			log.Println("server closed")
			break
		}
		if err != nil {
			log.Printf("Recv error:%v", err)
			continue
		}
		log.Printf("Recv data:%v", resp.GetMessage())
	}
}

Test

Server

lixd@17x:~/17x/projects/grpc-go-example/features/stream/server$ go run main.go 
2021/01/23 19:06:18 Serving gRPC on 0.0.0.0:50051
2021/01/23 19:06:27 Recved Hello World

Client

lixd@17x:~/17x/projects/grpc-go-example/features/stream/client$ go run main.go 
2021/01/23 19:06:27 Recv data:Hello World
2021/01/23 19:06:27 Recv data:Hello World
2021/01/23 19:06:27 server closed

5. ClientStream

和 ServerStream 相反,客户端可以发送多个数据。

Server

注意点:循环调用 stream.Recv() 获取数据,err == io.EOF 则表示数据已经全部获取了,最后通过 stream.SendAndClose() 返回响应。

// ClientStreamingEcho 客户端流
/*
1. for循环中通过stream.Recv()不断接收client传来的数据
2. err == io.EOF表示客户端已经发送完毕关闭连接了,此时在等待服务端处理完并返回消息
3. stream.SendAndClose() 发送消息并关闭连接(虽然在客户端流里服务器这边并不需要关闭 但是方法还是叫的这个名字,内部也只会调用Send())
*/
func (e *Echo) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
	// 1.for循环接收客户端发送的消息
	for {
		// 2. 通过 Recv() 不断获取客户端 send()推送的消息
		req, err := stream.Recv() // Recv内部也是调用RecvMsg
		// 3. err == io.EOF表示已经获取全部数据
		if err == io.EOF {
			log.Println("client closed")
			// 4.SendAndClose 返回并关闭连接
			// 在客户端发送完毕后服务端即可返回响应
			return stream.SendAndClose(&pb.EchoResponse{Message: "ok"})
		}
		if err != nil {
			return err
		}
		log.Printf("Recved %v", req.GetMessage())
	}
}

Client

注意点:通过多次调用 stream.Send() 像服务端推送多个数据,最后调用 stream.CloseAndRecv() 关闭stream并接收服务端响应。

// clientStream 客户端流
/*
1. 建立连接并获取client
2. 获取 stream 并通过 Send 方法不断推送数据到服务端
3. 发送完成后通过stream.CloseAndRecv() 关闭steam并接收服务端返回结果
*/
func clientStream(client pb.EchoClient) {
	// 2.获取 stream 并通过 Send 方法不断推送数据到服务端
	stream, err := client.ClientStreamingEcho(context.Background())
	if err != nil {
		log.Fatalf("Sum() error: %v", err)
	}
	for i := int64(0); i < 2; i++ {
		err := stream.Send(&pb.EchoRequest{Message: "hello world"})
		if err != nil {
			log.Printf("send error: %v", err)
			continue
		}
	}

	// 3. 发送完成后通过stream.CloseAndRecv() 关闭steam并接收服务端返回结果
	// (服务端则根据err==io.EOF来判断client是否关闭stream)
	resp, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("CloseAndRecv() error: %v", err)
	}
	log.Printf("sum: %v", resp.GetMessage())

}

Test

Server

lixd@17x:~/17x/projects/grpc-go-example/features/stream/server$ go run main.go 
2021/01/23 19:10:34 Serving gRPC on 0.0.0.0:50051
2021/01/23 19:10:42 Recved hello world
2021/01/23 19:10:42 Recved hello world
2021/01/23 19:10:42 client closed

Cliet

lixd@17x:~/17x/projects/grpc-go-example/features/stream/client$ go run main.go 
2021/01/23 19:10:42 sum: ok

6. BidirectionalStream

双向推送流则可以看做是结合了 ServerStream 和 ClientStream 二者。客户端和服务端都可以向对方推送多个数据。

注意:服务端和客户端的这两个Stream 是独立的。

Server

注意点:一般是使用两个 Goroutine,一个接收数据,一个推送数据。最后通过 return nil 表示已经完成响应。

// BidirectionalStreamingEcho 双向流服务端
/*
// 1. 建立连接 获取client
// 2. 通过client调用方法获取stream
// 3. 开两个goroutine(使用 chan 传递数据) 分别用于Recv()和Send()
// 3.1 一直Recv()到err==io.EOF(即客户端关闭stream)
// 3.2 Send()则自己控制什么时候Close 服务端stream没有close 只要跳出循环就算close了。 具体见https://github.com/grpc/grpc-go/issues/444
*/
func (e *Echo) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
	var (
		waitGroup sync.WaitGroup
		msgCh     = make(chan string)
	)
	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()

		for v := range msgCh {
			err := stream.Send(&pb.EchoResponse{Message: v})
			if err != nil {
				fmt.Println("Send error:", err)
				continue
			}
		}
	}()

	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()
		for {
			req, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Fatalf("recv error:%v", err)
			}
			fmt.Printf("Recved :%v \n", req.GetMessage())
			msgCh <- req.GetMessage()
		}
		close(msgCh)
	}()
	waitGroup.Wait()

	// 返回nil表示已经完成响应
	return nil
}

Client

注意点:和服务端类似,不过客户端推送结束后需要主动调用 stream.CloseSend() 函数来关闭Stream。

// bidirectionalStream 双向流
/*
1. 建立连接 获取client
2. 通过client获取stream
3. 开两个goroutine 分别用于Recv()和Send()
	3.1 一直Recv()到err==io.EOF(即服务端关闭stream)
	3.2 Send()则由自己控制
4. 发送完毕调用 stream.CloseSend()关闭stream 必须调用关闭 否则Server会一直尝试接收数据 一直报错...
*/
func bidirectionalStream(client pb.EchoClient) {
	var wg sync.WaitGroup
	// 2. 调用方法获取stream
	stream, err := client.BidirectionalStreamingEcho(context.Background())
	if err != nil {
		panic(err)
	}
	// 3.开两个goroutine 分别用于Recv()和Send()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			req, err := stream.Recv()
			if err == io.EOF {
				fmt.Println("Server Closed")
				break
			}
			if err != nil {
				continue
			}
			fmt.Printf("Recv Data:%v \n", req.GetMessage())
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		for i := 0; i < 2; i++ {
			err := stream.Send(&pb.EchoRequest{Message: "hello world"})
			if err != nil {
				log.Printf("send error:%v\n", err)
			}
			time.Sleep(time.Second)
		}
		// 4. 发送完毕关闭stream
		err := stream.CloseSend()
		if err != nil {
			log.Printf("Send error:%v\n", err)
			return
		}
	}()
	wg.Wait()
}

Test

Server

lixd@17x:~/17x/projects/grpc-go-example/features/stream/server$ go run main.go 
2021/01/23 19:14:56 Serving gRPC on 0.0.0.0:50051
Recved :hello world 
Recved :hello world 

Client

lixd@17x:~/17x/projects/grpc-go-example/features/stream/client$ go run main.go 
Recved:hello world 
Recved:hello world 
Server Closed

7. 小结

客户端或者服务端都有对应的 推送 或者 接收对象,我们只要 不断循环 Recv(),或者 Send() 就能接收或者推送了!

gRPC Stream 和 goroutine 配合简直完美。通过 Stream 我们可以更加灵活的实现自己的业务。如 订阅,大数据传输等。

Client发送完成后需要手动调用Close()或者CloseSend()方法关闭stream,Server端则return nil就会自动 Close

1)ServerStream

  • 服务端处理完成后return nil代表响应完成
  • 客户端通过 err == io.EOF判断服务端是否响应完成

2)ClientStream

  • 客户端发送完毕通过`CloseAndRecv关闭stream 并接收服务端响应
  • 服务端通过 err == io.EOF判断客户端是否发送完毕,完毕后使用SendAndClose关闭 stream并返回响应。

3)BidirectionalStream

  • 客户端服务端都通过stream向对方推送数据
  • 客户端推送完成后通过CloseSend关闭流,通过err == io.EOF`判断服务端是否响应完成
  • 服务端通过err == io.EOF判断客户端是否响应完成,通过return nil表示已经完成响应

通过err == io.EOF来判定是否把对方推送的数据全部获取到了。

客户端通过CloseAndRecv或者CloseSend关闭 Stream,服务端则通过SendAndClose或者直接 return nil来返回响应。

gRPC 系列相关代码见 Github

8. 参考

https://grpc.io/docs/languages/go/basics/#server-side-streaming-rpc

https://github.com/grpc/grpc-go