gRPC实践案例

类似Google这种大公司产出的产品,一般就两种情况,一种是面向小白用户的,如此让G粉簇拥而来找Bug优化产品思路,等到时机成熟再推出正式版或者取消产品,AngularJS就是这一类。还有一种就是Google自己的需求而总结出来的产品,优点什么的我就不吹了,gRPC就是这一类的。

序言

今年年初的时候我就一直想做一款基于RPC实现组件化搭建网站的一款产品。目的就是为了让各种语言的程序员能以最低沟通、学习代价来进行快速、稳定开发产品。而当初做的时候为了速度摸坑,用了Nodejs来进行开发,做出了GQ这款产品,自己边用边总结,说真的坑是真的多,JSON是不够用的,还有bytes、流数据等等问题。几个月下来,发现这个东西是个史诗大坑,因为要考虑到各种语言的兼容、使用难易、不同类型组件(数据库组件、流文件处理组件等)通用接口等等问题,迟迟没有拿出一套跨语言的规范来。几乎要放弃。这项目被我搁置。 不巧,gRPC出来了。当初心想有搞头,但是当时文档不够健全,让那些爱折腾的人先去探探路吧。 现在我作为第二批吃螃蟹的人,上手一试,心中暗叹:厉害了我的Goggle

案例简介

一套基于路由注册的分发服务。这里使用Nodejs来快速上手。 服务端:注册HTTP端口,以及gRPC基础服务,通过基础服务,可以注册HTTP请求的处理权。 子服务:注册基础服务,实现对HTTP请求的处理。 流程如下:

HTTP请求--->服务端--/a/b-->子服务1
         |--/a/c-->子服务2
         +--/a/d-->子服务3

实现流程

PS:我这里不赘述XX为什么要这以做之类的话语,直接上代码,并简单解释代码中的重点。这个案例没有难点。 gRPC的安装省略。 使用Nodejs版本v6.9.1。用到一些ES6的语法,不完全兼容v4.*

proto文件

syntax = "proto3";

option objc_class_prefix = "Gaubee";

package httpserver;

// HttpServer服务的定义
service HttpServer {
    rpc BindRoute(RouteInfo) returns(RequestInfo) {}
    rpc ReturnResponse(ResponInfo) returns(ResponReply) {}
}

message RouteInfo {
    enum Method {
        GET = 0;
        POST = 1;
        PUT = 2;
        DELETE = 3;
    }
    // 注册的路由pathname
    string path = 1;
    Method method = 2;
    // 是否直接解析query到query中
    bool is_parse_query = 3;
    // 是否解析带有Body的请求
    bool is_parse_body = 4;
}
message RequestInfo {
    // 请求的ID,在做响应的时候需要带上
    string require_id = 1;
    // 请求链接中的数据
    map<string, string> query = 2;
    // 请求数据包中的数据
    map<string, string> body = 3;
}
message ResponInfo{
    string require_id = 1;
    map<string, string> response_head = 2;
    string response_data = 3;
    int32 statusCode = 4;
}
// 返回整个请求的时间信息
message ResponReply{
    // 收到请求,解析请求query、body花费的时间
    float route_parsed_time = 1;
    // 传输请求数据花费的时间
    float route_send_time = 2;
    // 处理请求至服务端收到数据花费的时间
    float responsed_time = 3;
    // 返回请求结果给客户花费的时间
    float responsed_send_time = 4;
}

以上的proto文件描述了两个接口:

  • BindRoute:实现路由绑定把要绑定的路由写道path中,这里为了简单,我只是进行了字符串相等匹配。BindRoute的回调函数不会马上触发。要等到服务端有HTTP请求的时候,匹配到对应的路由,最后再执行返回。
  • ReturnResponse:BindRoute收到回调后,意味着子服务开始处理HTTP请求,在处理完成后,需要调用此函数来返回处理结果。

require_id是注册基础服务后生成的一次性编号,每一次注册都会生成一个,在收到HTTP请求是,匹配到对应的路由,就要通过这个require_id来发送请求、接受处理结果。

服务端

"use strict";
// PROTO文件路径,按需换成自己的文件路径
const PROTO_PATH = __dirname + "/../pingpong/http-server.proto";
const grpc = require("grpc");
const http = require("http");
const url = require("url");
const querystring = require("querystring");

// 动态加载PROTO文件,生成接口
const httpserver = grpc.load(PROTO_PATH).httpserver;

/** 路由映射表
 *  结构为:{ [METHOD:GET|POST|PUT|DELETE] : { [PATHNAME] : Array<String>[require_id,...] } }
 */
const route_res_list = {};

/** require_id对应的路由注册信息与回调函数
 *  结构为:{ args : 路由注册信息, callback: 回调函数,用于通知子服务来处理HTTP请求 }
 */
const handleCache = new Map();
/** require_id对应的HTTP请求
 *  结构为:{ req : HTTP.Request, res: HTTP.Response }
 */
const reqresCache = new Map();
// 初始化路由映射表的结构
for (let method in httpserver.RouteInfo.Method) {
  route_res_list[method] = {};
}

/** rgRPC服务:绑定路由
 *
 */
function BindRoute(call, callback) {
  const args = call.request;
  const routes = route_res_list[args.method];
  var handles = routes[args.path];
  if (!handles) {
    handles = routes[args.path] = new Set();
  }
  // 生成require_id
  const require_id = Math.random().toString(36).substr(2);

  // 加入路由映射表
  handles.add(require_id);

  // 这里不调用callback,而是等待到收到HTTP请求的时候再执行
  handleCache.set(require_id, {
    args: args,
    callback,
  });
}

/** rgRPC服务:响应HTTP请求
 *
 */
function ReturnResponse(call, callback) {
  const args = call.request;
  const require_id = args.require_id;
  // 取出缓存中的req、res对象
  const { req, res } = reqresCache.get(require_id);
  if (!res) {
    return callback(`Ref Error:require_id:${require_id} no ref.`);
  }
  // 返回数据
  res.writeHead(args.statusCode, args.response_head);
  res.end(args.response_data);
  // 清除缓存
  reqresCache.delete(require_id);

  // 返回整个流程的消耗时间,这里为了简化代码没有加统计时间的代码
  callback(null, {
    route_parsed_time: 0,
    route_send_time: 0,
    responsed_time: 0,
    responsed_send_time: 0,
  });
}

function main() {
  const grpc_server = new grpc.Server();
  grpc_server.addProtoService(httpserver.HttpServer.service, {
    bindRoute: BindRoute,
    returnResponse: ReturnResponse,
  });
  // 注册gRPC服务
  grpc_server.bind("0.0.0.0:50051", grpc.ServerCredentials.createInsecure());
  grpc_server.start();
}

main();
// 注册HTTP服务
const server = http.createServer((req, res) => {
  const routes = route_res_list[req.method];
  if (routes) {
    const url_info = url.parse(req.url);
    const handles = routes[url_info.pathname];
    if (handles && handles.size) {
      const { value: require_id, has_key } = handles.values().next();
      if (has_key) {
        console.log("no handle");
        return;
      }
      const handle = handleCache.get(require_id);
      // 通知客户端处理HTTP请求,并带上HTTP请求的一些数据
      handle.callback(null, {
        require_id,
        // TODO:headers
        query: handle.args.is_parse_query
          ? Object.assign({}, querystring.parse(url_info.query))
          : {},
        // TODO:body应该改用流数据传输
        body:
          handle.args.method !== "GET" && handle.args.is_parse_body ? {} : {},
      });
      // 缓存req、res
      reqresCache.set(require_id, {
        req,
        res,
      });
      // 清除缓存,此require_id已经报废,只用于最后的响应res的处理
      handleCache.delete(require_id);
      handles.delete(require_id);
    }
  }
});
server.listen(1337, "0.0.0.0", () => {});

子服务

const PROTO_PATH = __dirname + "/../pingpong/http-server.proto";
const grpc = require("grpc");
const http = require("http");
const httpserver = grpc.load(PROTO_PATH).httpserver;
const address = "localhost:50051";

function main() {
  var client = new httpserver.HttpServer(
    address,
    grpc.credentials.createInsecure(),
  );
  // 注册路由
  client.bindRoute(
    {
      path: "/QAQ",
      method: httpserver.RouteInfo.Method.GET,
      is_parse_query: true,
      is_parse_body: false,
    },
    function (err, response) {
      // 收到路由请求
      console.log(err, response);
      // 返回路由处理结果
      client.returnResponse(
        {
          require_id: response.require_id,
          statusCode: 200,
          response_head: {},
          response_data: JSON.stringify(response.query),
        },
        function (err, response) {
          // 收到性能统计
          console.log(err, response);
        },
      );
    },
  );

  // 模拟发起请求
  setTimeout(function () {
    var req = http.get("http://localhost:1337/QAQ?a=a", (res, socket, head) => {
      var datas = [];
      res.on("data", (chunk) => {
        datas.push(chunk);
      });
      res.on("end", () => {
        // 打印结果
        console.log("request res:", Buffer.concat(datas).toString());
      });
    });
  }, 200);
}

main();

总结

以上代码流程只是简单的示例,有一点不好的就是路由的注册变成了一次性,这不能说完全不对。但更优的做法应该改为流数据,这就不需要调用callback来结束服务,而是可以通过流不停地发起请求,流的实践参考官方代码:route_guide