Contents

gRPC(Go)教程(九)---配置retry自动重试

本文主要记录了如何使用 gRPC 中的 自动重试功能。

1. 概述

gRPC 系列相关代码见 Github

gRPC 中已经内置了 retry 功能,可以直接使用,不需要我们手动来实现,非常方便。

2. Demo

Server

为了测试 retry 功能,服务端做了一点调整。

记录客户端的请求次数,只有满足条件的那一次(这里就是请求次数模4等于0的那一次)才返回成功,其他时候都返回失败。

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "github.com/lixd/grpc-go-example/features/proto/echo"
)

var port = flag.Int("port", 50052, "port number")

type failingServer struct {
	pb.UnimplementedEchoServer
	mu sync.Mutex

	reqCounter uint
	reqModulo  uint
}

// maybeFailRequest 手动模拟请求失败 一共请求n次,前n-1次都返回失败,最后一次返回成功。
func (s *failingServer) maybeFailRequest() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.reqCounter++
	if (s.reqModulo > 0) && (s.reqCounter%s.reqModulo == 0) {
		return nil
	}

	return status.Errorf(codes.Unavailable, "maybeFailRequest: failing it")
}

func (s *failingServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	if err := s.maybeFailRequest(); err != nil {
		log.Println("request failed count:", s.reqCounter)
		return nil, err
	}

	log.Println("request succeeded count:", s.reqCounter)
	return &pb.EchoResponse{Message: req.Message}, nil
}

func main() {
	flag.Parse()

	address := fmt.Sprintf(":%v", *port)
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	fmt.Println("listen on address", address)

	s := grpc.NewServer()

	// 指定第4次请求才返回成功,用于测试 gRPC 的 retry 功能。
	failingservice := &failingServer{
		reqModulo: 4,
	}

	pb.RegisterEchoServer(s, failingservice)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

Client

客户端则是建立连接的时候通过grpc.WithDefaultServiceConfig()配置好 retry 功能。

package main

import (
	"context"
	"flag"
	"log"
	"time"

	pb "github.com/lixd/grpc-go-example/features/proto/echo"
	"google.golang.org/grpc"
)

var (
	addr = flag.String("addr", "localhost:50052", "the address to connect to")
	// 更多配置信息查看官方文档: https://github.com/grpc/grpc/blob/master/doc/service_config.md
	// service这里语法为<package>.<service> package就是proto文件中指定的package,service也是proto文件中指定的 Service Name。
	// method 可以不指定 即当前service下的所以方法都使用该配置。
	retryPolicy = `{
		"methodConfig": [{
		  "name": [{"service": "echo.Echo","method":"UnaryEcho"}],
		  "retryPolicy": {
			  "MaxAttempts": 4,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
		  }
		}]}`
)

func main() {
	flag.Parse()
	conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer func() {
		if e := conn.Close(); e != nil {
			log.Printf("failed to close connection: %s", e)
		}
	}()

	c := pb.NewEchoClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	reply, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "Try and Success"})
	if err != nil {
		log.Fatalf("UnaryEcho error: %v", err)
	}
	log.Printf("UnaryEcho reply: %v", reply)
}

配置信息具体含义这里先不解释,在后续章节有详细说明。

Run

先启动服务端

lixd@17x:~/17x/projects/grpc-go-example/features/retry/server$ go run main.go 
listen on address :50052
2021/02/17 17:35:29 request failed count: 1

在启动客户端

lixd@17x:~/17x/projects/grpc-go-example/features/retry/client$ go run main.go 
2021/02/17 17:35:29 UnaryEcho error: rpc error: code = Unavailable desc = maybeFailRequest: failing it
exit status 1

emmm 并没有重试。。。

开启重试

查看文档后发现是需要设置环境变量(GRPC_GO_RETRY=on)

因为是客户端的重试功能,所以是在客户端设置环境变量,服务端则不需要。

lixd@17x$ export GRPC_GO_RETRY=on
lixd@17x$ echo $GRPC_GO_RETRY
on

然后重新启动服务端和客户端

lixd@17x:~/17x/projects/grpc-go-example/features/retry/server$ go run main.go 
listen on address :50052
2021/02/17 17:37:55 request failed count: 1
2021/02/17 17:37:55 request failed count: 2
2021/02/17 17:37:55 request failed count: 3
2021/02/17 17:37:55 request succeeded count: 4
lixd@17x:~/17x/projects/grpc-go-example/features/retry/client$ go run main.go 
2021/02/17 17:37:55 UnaryEcho reply: message:"Try and Success"

现在重试功能就生效了。

前3次都失败了,且返回错误码是客户端重试策略中指定的UNAVAILABLE,所以都进行了重试,直到第4次成功了就不在重试了。

当然,第4次已经是最大尝试次数了,就算失败也不会重试了。

3. 配置

Service Config 是以 JSON 格式配置的,具体文档见 service_config.md

service_config.proto

当前支持的配置信息由 service_config.proto 文件定义,详细信息可以参考该文件,这里贴一部分仅供参考:

// Configuration for a method.
message MethodConfig {
  // The names of the methods to which this configuration applies.
  // - MethodConfig without names (empty list) will be skipped.
  // - Each name entry must be unique across the entire ServiceConfig.
  // - If the 'method' field is empty, this MethodConfig specifies the defaults
  //   for all methods for the specified service.
  // - If the 'service' field is empty, the 'method' field must be empty, and
  //   this MethodConfig specifies the default for all methods (it's the default
  //   config).
  //
  // When determining which MethodConfig to use for a given RPC, the most
  // specific match wins. For example, let's say that the service config
  // contains the following MethodConfig entries:
  //
  // method_config { name { } ... }
  // method_config { name { service: "MyService" } ... }
  // method_config { name { service: "MyService" method: "Foo" } ... }
  //
  // MyService/Foo will use the third entry, because it exactly matches the
  // service and method name. MyService/Bar will use the second entry, because
  // it provides the default for all methods of MyService. AnotherService/Baz
  // will use the first entry, because it doesn't match the other two.
  //
  // In JSON representation, value "", value `null`, and not present are the
  // same. The following are the same Name:
  // - { "service": "s" }
  // - { "service": "s", "method": null }
  // - { "service": "s", "method": "" }
  message Name {
    string service = 1;  // Required. Includes proto package name.
    string method = 2;
  }
  repeated Name name = 1;

  // Whether RPCs sent to this method should wait until the connection is
  // ready by default. If false, the RPC will abort immediately if there is
  // a transient failure connecting to the server. Otherwise, gRPC will
  // attempt to connect until the deadline is exceeded.
  //
  // The value specified via the gRPC client API will override the value
  // set here. However, note that setting the value in the client API will
  // also affect transient errors encountered during name resolution, which
  // cannot be caught by the value here, since the service config is
  // obtained by the gRPC client via name resolution.
  google.protobuf.BoolValue wait_for_ready = 2;

  // The default timeout in seconds for RPCs sent to this method. This can be
  // overridden in code. If no reply is received in the specified amount of
  // time, the request is aborted and a DEADLINE_EXCEEDED error status
  // is returned to the caller.
  //
  // The actual deadline used will be the minimum of the value specified here
  // and the value set by the application via the gRPC client API.  If either
  // one is not set, then the other will be used.  If neither is set, then the
  // request has no deadline.
  google.protobuf.Duration timeout = 3;

  // The maximum allowed payload size for an individual request or object in a
  // stream (client->server) in bytes. The size which is measured is the
  // serialized payload after per-message compression (but before stream
  // compression) in bytes. This applies both to streaming and non-streaming
  // requests.
  //
  // The actual value used is the minimum of the value specified here and the
  // value set by the application via the gRPC client API.  If either one is
  // not set, then the other will be used.  If neither is set, then the
  // built-in default is used.
  //
  // If a client attempts to send an object larger than this value, it will not
  // be sent and the client will see a ClientError.
  // Note that 0 is a valid value, meaning that the request message
  // must be empty.
  google.protobuf.UInt32Value max_request_message_bytes = 4;

  // The maximum allowed payload size for an individual response or object in a
  // stream (server->client) in bytes. The size which is measured is the
  // serialized payload after per-message compression (but before stream
  // compression) in bytes. This applies both to streaming and non-streaming
  // requests.
  //
  // The actual value used is the minimum of the value specified here and the
  // value set by the application via the gRPC client API.  If either one is
  // not set, then the other will be used.  If neither is set, then the
  // built-in default is used.
  //
  // If a server attempts to send an object larger than this value, it will not
  // be sent, and a ServerError will be sent to the client instead.
  // Note that 0 is a valid value, meaning that the response message
  // must be empty.
  google.protobuf.UInt32Value max_response_message_bytes = 5;

  // The retry policy for outgoing RPCs.
  message RetryPolicy {
    // The maximum number of RPC attempts, including the original attempt.
    //
    // This field is required and must be greater than 1.
    // Any value greater than 5 will be treated as if it were 5.
    uint32 max_attempts = 1;

    // Exponential backoff parameters. The initial retry attempt will occur at
    // random(0, initial_backoff). In general, the nth attempt will occur at
    // random(0,
    //   min(initial_backoff*backoff_multiplier**(n-1), max_backoff)).
    // Required. Must be greater than zero.
    google.protobuf.Duration initial_backoff = 2;
    // Required. Must be greater than zero.
    google.protobuf.Duration max_backoff = 3;
    float backoff_multiplier = 4;  // Required. Must be greater than zero.

    // The set of status codes which may be retried.
    //
    // This field is required and must be non-empty.
    repeated google.rpc.Code retryable_status_codes = 5;
  }
    // The hedging policy for outgoing RPCs. Hedged RPCs may execute more than
  // once on the server, so only idempotent methods should specify a hedging
  // policy.
  message HedgingPolicy {
    // The hedging policy will send up to max_requests RPCs.
    // This number represents the total number of all attempts, including
    // the original attempt.
    //
    // This field is required and must be greater than 1.
    // Any value greater than 5 will be treated as if it were 5.
    uint32 max_attempts = 1;

    // The first RPC will be sent immediately, but the max_requests-1 subsequent
    // hedged RPCs will be sent at intervals of every hedging_delay. Set this
    // to 0 to immediately send all max_requests RPCs.
    google.protobuf.Duration hedging_delay = 2;

    // The set of status codes which indicate other hedged RPCs may still
    // succeed. If a non-fatal status code is returned by the server, hedged
    // RPCs will continue. Otherwise, outstanding requests will be canceled and
    // the error returned to the client application layer.
    //
    // This field is optional.
    repeated google.rpc.Code non_fatal_status_codes = 3;
  }

  // Only one of retry_policy or hedging_policy may be set. If neither is set,
  // RPCs will not be retried or hedged.
  oneof retry_or_hedging_policy {
    RetryPolicy retry_policy = 6;
    HedgingPolicy hedging_policy = 7;
  }
}

注释写的还是很详细的,转换成 JSON 如下:

{
		"methodConfig": [{
		  "name": [{"service": "echo.Echo","method":"UnaryEcho"}],
          "wait_for_ready": false,
          "timeout": 1000ms,
          "max_request_message_bytes": 1024,
          "max_response_message_bytes": 1024,
		  "retryPolicy": {
			  "maxAttempts": 4,
			  "initialBackoff": ".01s",
			  "maxBackoff": ".01s",
			  "backoffMultiplier": 1.0,
			  "retryableStatusCodes": [ "UNAVAILABLE" ]
		  },
		  "hedgingPolicy":{
              "maxAttempts":4,
              "hedgingDelay":"0.1s",
              "nonFatalStatusCodes": [ "" ]
          }}]
}

首先是 Name,通过 service + method 指定当前配置要应用到哪些服务和方法。

"name": [{"service": "echo.Echo","method":"UnaryEcho"}],

然后后续几个都不是重点:

          "wait_for_ready": false,
          "timeout": 1000ms,
          "max_request_message_bytes": 1024,
          "max_response_message_bytes": 1024,

重点是下面这两个配置,重试策略和对冲策略:

		  "retryPolicy": {
			  "maxAttempts": 4,
			  "initialBackoff": ".01s",
			  "maxBackoff": ".01s",
			  "backoffMultiplier": 1.0,
			  "retryableStatusCodes": [ "UNAVAILABLE" ]
		  },
		  "hedgingPolicy":{
              "maxAttempts":4,
              "hedgingDelay":"0.1s",
              "nonFatalStatusCodes": [ "UNAVAILABLE" ]
          }

gRPC 的重试策略有两种分别是 重试(retryPolicy)对冲(hedging),一个RPC方法只能配置一种重试策略。

对冲是指在不等待响应的情况主动发送单次调用的多个请求,如果一个方法使用对冲策略,那么首先会像正常的 RPC 调用一样发送第一次请求,如果 hedgingDelay 时间内没有响应,那么直接发送第二次请求,以此类推,直到发送了 maxAttempts 次。

对冲在超过指定时间没有响应就会直接发起请求,而重试则必须要服务端响应后才会发起请求。

注意: 使用对冲的时候,请求可能会访问到不同的后端(如果设置了负载均衡),那么就要求方法在多次执行下是安全,并且符合预期的

retry config

demo 中的配置信息如下:

{
		"methodConfig": [{
		  "name": [{"service": "echo.Echo","method":"UnaryEcho"}],
		  "retryPolicy": {
			  "MaxAttempts": 4,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
		  }}]
}
  • name 指定下面的配置信息作用的 RPC 服务或方法
    • service:通过服务名匹配,语法为<package>.<service> package就是proto文件中指定的package,service也是proto文件中指定的 Service Name。
    • method:匹配具体某个方法,proto文件中定义的方法名。

主要关注 retryPolicy,重试策略

  • MaxAttempts:最大尝试次数
  • InitialBackoff:默认退避时间
  • MaxBackoff:最大退避时间
  • BackoffMultiplier:退避时间增加倍率
  • RetryableStatusCodes:服务端返回什么错误码才重试

重试机制一般会搭配退避算法一起使用。

即假设第一次请求失败后,等1秒(随便取的一个数)再次请求,又失败后就等2秒在请求,一直重试直达超过指定重试次数或者等待时间就不在重试。

如果不使用退避算法,失败后就一直重试只会增加服务器的压力。如果是因为服务器压力大,导致的请求失败,那么根据退避算法等待一定时间后再次请求可能就能成功。反之直接请求可能会因为压力过大导致服务崩溃。

  • 第一次重试间隔是 random(0, initialBackoff)
  • 第 n 次的重试间隔为 random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff))

4. 小结

gRPC 中内置了 retry 功能,使用比较简单。

  • 1)客户端建立连接时通过grpc.WithDefaultServiceConfig(retryPolicy)指定重试策略
  • 2)环境变量中开启重试:export GRPC_GO_RETRY=on