gRPC Server Streamingでメタ情報も一緒にレスポンスする

キャンプ場を検索・予約できるサービス(以下hinata-spot)の開発を担当している名嘉眞です。

hinata-spotではbackendにGoを、backend間の通信にgRPCを採用しています。

今回はgRPCのServer Streamingでメタデータも一緒にレスポンスする方法をブログにします。

どんな時に使うのか

あまり多くはないかもしれないですが、Server Streamingで通信したいユースケースで、かつそのユースケースで使うメタデータなどもレスポンスしたい場合です。

例えば、1つ1つが容量の大きなデータを複数レスポンスする必要があり、そのデータ総数などのメタデータも送信したい場合などになります。

より具体的な例で以下のようなユースケースで考えてみます。

  • 予約できるプラン(1プランで多くの項目の情報が設定されていると仮定)のリストを返す。

  • またそのプランの総数も必要になる。

上記のユースケースだと、proto定義は以下のようになるかも知れません。

rpc GetPlans(GetPlansRequest) returns (stream GetPlansResponse);

message GetPlansRequest {
  string check_in_date   = 1;
  string check_out_date  = 2;
}

message GetPlansResponse {
  Plan  plan       = 1;
  int64 total_size = 2;
}

message Plan {
  string id          = 1;  // ID
  string name        = 2;  // プラン名
  string description = 3;  // プラン詳細
  // そのほか多くのフィールドがあるとします
}

rpc GetPlansはServer Steamingで定義しています。レスポンスのGetPlansResponseには、Plan型とtotal_sizeが定義されています。 Plan型はstreamingで通信されるので複数回レスポンスされますが、その都度プラン総数もレスポンスすることになってしまいます。

また他のパターンだとプラン総数をレスポンスするrpcを定義するという方法も考えられます。 この場合クライアント側は2つのrpcをcallする必要があります。

rpc GetPlans(GetPlansRequest) returns (stream GetPlansResponse);

// プラン総数をレスポンスするrpc
rpc GetPlansCount(GetPlansRequest) returns (GetPlansCountResponse);

上記のような状況の場合oneofを使うと以下のように定義することがができます。

oneofには以下のように記載されています。

Oneofフィールドは、oneof内のすべてのフィールドがメモリを共有しており、同時に設定できるフィールドが1つだけであることを除けば、通常のフィールドと同じです。

つまりoneofを使って宣言したmessage型はその内部のうちどれか1つのフィールドを返すということを定義できます。 今回のユースケースでoneofを使ってみた場合が以下です。 oneof PlanInfoは、planかtotal_sizeのどちらかをレスポンスします。

rpc GetPlans(GetPlansRequest) returns (stream GetPlansResponse);

message GetPlansRequest {
  string check_in_date   = 1;
  string check_out_date  = 2;
}

message GetPlansResponse {
  oneof PlanInfo {
    Plan  plan       = 1;
    int64 total_size = 2;
  }
}

message Plan {
  string id          = 1;  // ID
  string name        = 2;  // プラン名
  string description = 3;  // プラン詳細
  // そのほか多くのフィールドがあるとします
}

gRPC Server側の実装は以下のようになります。 DBなどからプランを取得してきたあと、まずプラン総数をSendし、その後forでループしながらプランをSendします。

func (s *backendService) GetPlans(req *spot.GetPlansRequest, stream spot.BackendService_GetPlansServer) error {
    params := &usecase.ListSpotPlanParams{
        SpotID: req.GetSpotId(),
    }

  // DBなどからプランを取得してくると仮定
    plans, err := s.usecase.ListPlanUseCase.ListSpotPlan(stream.Context(), params)
    if err != nil {
        return handleError(err)
    }

  // プランの総数を先にSendする
    err = stream.Send(&spot.GetPlansResponse{
        PlanInfo: &spot.GetPlansResponse_TotalSize{
            TotalSize: int64(len(plans)),
        },
    })
    if err != nil {
        return handleError(err)
    }

    for _, plan := range plans {
        err = stream.Send(&spot.GetPlansResponse{
            PlanInfo: &spot.GetPlansResponse_Plan{
                Plan: s.convertPlan(plan), // protoで定義したPlan messageに変換してレスポンスに設定
            },
        })
        if err != nil {
            return handleError(err)
        }
    }

    return nil
}

実際のクライアント側でレスポンスを受け取るコードが以下のようになります。 まずプラン総数を取得します。

その後forでプランを取得していきます。

stream, err := backendService.GetPlans(ctx, &spot.GetPlansRequest{
  // 省略
})
if err != nil {
    return nil, err
}

resp, err := stream.Recv()
if err != nil {
    return nil, err
}

// 取得したプラン総数でスライスのcapを指定するにしている
res := make([]*model.Plan, 0, resp.GetTotalSize())

for {
    // 受信データ量が多いとgRPC転送量エラーになるので、streamで一件ずつ受け取る
    p, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        return nil, err
    }

    res = append(res, &model.Plan{
        Plan: p.GetPlan(),
    })
}

return res, nil

oneofを使うことで1回のrpc callで異なる種類のデータをそれぞれ取得することができます。 少しスマートな感じがしますね。 また、今後レスポンスにフィールド追加を行いたい場合でもoneof型で宣言したフィールドに追加することで解決できます。

最後に

gRPC Server Streamingでメタデータも一緒にレスポンスする方法について記載しました。誰かの役に立てば幸いです。

vivit では一緒に働くエンジニアを大募集しています 🎉

www.wantedly.com

少しでも興味を持って頂いた方は、是非カジュアル面談の応募をお待ちしております!