gRPC实践案例

类似Google这种大公司产出的产品,一般就两种情况,一种是面向小白用户的,如此让G粉簇拥而来找Bug优化产品思路,等到时机成熟再推出正式版或者取消产品,AngularJS就是这一类。还有一种就是Google自己的需求而总结出来的产品,优点什么的我就不吹了,gRPC就是这一类的。

序言

今年年初的时候我就一直想做一款基于RPC实现组件化搭建网站的一款产品。目的就是为了让各种语言的程序员能以最低沟通、学习代价来进行快速、稳定开发产品。而当初做的时候为了速度摸坑,用了Nodejs来进行开发,做出了GQ这款产品,自己边用边总结,说真的坑是真的多,JSON是不够用的,还有bytes、流数据等等问题。几个月下来,发现这个东西是个史诗大坑,因为要考虑到各种语言的兼容、使用难易、不同类型组件(数据库组件、流文件处理组件等)通用接口等等问题,迟迟没有拿出一套跨语言的规范来。几乎要放弃。这项目被我搁置。 不巧,gRPC出来了。当初心想有搞头,但是当时文档不够健全,让那些爱折腾的人先去探探路吧。 现在我作为第二批吃螃蟹的人,上手一试,心中暗叹:厉害了我的Goggle

案例简介

一套基于路由注册的分发服务。这里使用Nodejs来快速上手。 服务端:注册HTTP端口,以及gRPC基础服务,通过基础服务,可以注册HTTP请求的处理权。 子服务:注册基础服务,实现对HTTP请求的处理。 流程如下:

HTTP请求--->服务端--/a/b-->子服务1
         |--/a/c-->子服务2
         +--/a/d-->子服务3

实现流程

PS:我这里不赘述XX为什么要这以做之类的话语,直接上代码,并简单解释代码中的重点。这个案例没有难点。 gRPC的安装省略。 使用Nodejs版本v6.9.1。用到一些ES6的语法,不完全兼容v4.*

proto文件

syntax = "proto3";

option objc_class_prefix = "Gaubee";

package httpserver;

// HttpServer服务的定义
service HttpServer {
    rpc BindRoute(RouteInfo) returns(RequestInfo) {}
    rpc ReturnResponse(ResponInfo) returns(ResponReply) {}
}

message RouteInfo {
    enum Method {
        GET = 0;
        POST = 1;
        PUT = 2;
        DELETE = 3;
    }
    // 注册的路由pathname
    string path = 1;
    Method method = 2;
    // 是否直接解析query到query中
    bool is_parse_query = 3;
    // 是否解析带有Body的请求
    bool is_parse_body = 4;
}
message RequestInfo {
    // 请求的ID,在做响应的时候需要带上
    string require_id = 1;
    // 请求链接中的数据
    map<string, string> query = 2;
    // 请求数据包中的数据
    map<string, string> body = 3;
}
message ResponInfo{
    string require_id = 1;
    map<string, string> response_head = 2;
    string response_data = 3;
    int32 statusCode = 4;
}
// 返回整个请求的时间信息
message ResponReply{
    // 收到请求,解析请求query、body花费的时间
    float route_parsed_time = 1;
    // 传输请求数据花费的时间
    float route_send_time = 2;
    // 处理请求至服务端收到数据花费的时间
    float responsed_time = 3;
    // 返回请求结果给客户花费的时间
    float responsed_send_time = 4;
}

以上的proto文件描述了两个接口:

require_id是注册基础服务后生成的一次性编号,每一次注册都会生成一个,在收到HTTP请求是,匹配到对应的路由,就要通过这个require_id来发送请求、接受处理结果。

服务端

"use strict";
// PROTO文件路径,按需换成自己的文件路径
const PROTO_PATH = __dirname + '/../pingpong/http-server.proto';
const grpc = require('grpc');
const http = require("http");
const url = require("url");
const querystring = require("querystring");

// 动态加载PROTO文件,生成接口
const httpserver = grpc.load(PROTO_PATH).httpserver;

/** 路由映射表
 *  结构为:{ [METHOD:GET|POST|PUT|DELETE] : { [PATHNAME] : Array<String>[require_id,...] } }
 */
const route_res_list = {};

/** require_id对应的路由注册信息与回调函数
 *  结构为:{ args : 路由注册信息, callback: 回调函数,用于通知子服务来处理HTTP请求 }
 */
const handleCache = new Map();
/** require_id对应的HTTP请求
 *  结构为:{ req : HTTP.Request, res: HTTP.Response }
 */
const reqresCache = new Map();
// 初始化路由映射表的结构
for (let method in httpserver.RouteInfo.Method) {
    route_res_list[method] = {};
}

/** rgRPC服务:绑定路由
 *
 */
function BindRoute(call, callback) {
    const args = call.request;
    const routes = route_res_list[args.method];
    var handles = routes[args.path];
    if (!handles) {
        handles = routes[args.path] = new Set();
    }
    // 生成require_id
    const require_id = Math.random().toString(36).substr(2);

    // 加入路由映射表
    handles.add(require_id);

    // 这里不调用callback,而是等待到收到HTTP请求的时候再执行
    handleCache.set(require_id, {
        args: args,
        callback
    });
}

/** rgRPC服务:响应HTTP请求
 *
 */
function ReturnResponse(call, callback) {
    const args = call.request;
    const require_id = args.require_id;
    // 取出缓存中的req、res对象
    const {
        req,
        res
    } = reqresCache.get(require_id);
    if (!res) {
        return callback(`Ref Error:require_id:${require_id} no ref.`);
    }
    // 返回数据
    res.writeHead(args.statusCode, args.response_head);
    res.end(args.response_data);
    // 清除缓存
    reqresCache.delete(require_id);

    // 返回整个流程的消耗时间,这里为了简化代码没有加统计时间的代码
    callback(null, {
        route_parsed_time: 0,
        route_send_time: 0,
        responsed_time: 0,
        responsed_send_time: 0,
    });
}

function main() {
    const grpc_server = new grpc.Server();
    grpc_server.addProtoService(httpserver.HttpServer.service, {
        bindRoute: BindRoute,
        returnResponse: ReturnResponse,
    });
    // 注册gRPC服务
    grpc_server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure());
    grpc_server.start();
}

main();
// 注册HTTP服务
const server = http.createServer((req, res) => {
    const routes = route_res_list[req.method];
    if (routes) {
        const url_info = url.parse(req.url);
        const handles = routes[url_info.pathname];
        if (handles && handles.size) {
            const {
                value: require_id,
                has_key
            } = handles.values().next();
            if (has_key) {
                console.log("no handle");
                return
            }
            const handle = handleCache.get(require_id);
            // 通知客户端处理HTTP请求,并带上HTTP请求的一些数据
            handle.callback(null, {
                require_id,
                // TODO:headers
                query: handle.args.is_parse_query ? Object.assign({}, querystring.parse(url_info.query)) : {},
                // TODO:body应该改用流数据传输
                body: (handle.args.method !== "GET" && handle.args.is_parse_body) ? {} : {}
            });
            // 缓存req、res
            reqresCache.set(require_id, {
                req,
                res
            });
            // 清除缓存,此require_id已经报废,只用于最后的响应res的处理
            handleCache.delete(require_id);
            handles.delete(require_id);
        }
    }
});
server.listen(1337, "0.0.0.0", () => {});

子服务

const PROTO_PATH = __dirname + '/../pingpong/http-server.proto';
const grpc = require('grpc');
const http = require("http");
const httpserver = grpc.load(PROTO_PATH).httpserver;
const address = 'localhost:50051'

function main() {
    var client = new httpserver.HttpServer(address, grpc.credentials.createInsecure());
    // 注册路由
    client.bindRoute({
        path: "/QAQ",
        method: httpserver.RouteInfo.Method.GET,
        is_parse_query: true,
        is_parse_body: false,
    }, function(err, response) {
        // 收到路由请求
        console.log(err, response);
        // 返回路由处理结果
        client.returnResponse({
            require_id: response.require_id,
            statusCode: 200,
            response_head: {},
            response_data: JSON.stringify(response.query)
        }, function(err, response) {
            // 收到性能统计
            console.log(err, response);
        })

    });

    // 模拟发起请求
    setTimeout(function() {
        var req = http.get("http://localhost:1337/QAQ?a=a", (res, socket, head) => {
            var datas = []
            res.on("data", (chunk) => {
                datas.push(chunk);
            });
            res.on("end", () => {
                // 打印结果
                console.log("request res:", Buffer.concat(datas).toString());
            });
        });
    }, 200)
}

main();

总结

以上代码流程只是简单的示例,有一点不好的就是路由的注册变成了一次性,这不能说完全不对。但更优的做法应该改为流数据,这就不需要调用callback来结束服务,而是可以通过流不停地发起请求,流的实践参考官方代码:route_guide