类似Google这种大公司产出的产品,一般就两种情况,一种是面向小白用户的,如此让G粉簇拥而来找Bug优化产品思路,等到时机成熟再推出正式版或者取消产品,AngularJS就是这一类。还有一种就是Google自己的需求而总结出来的产品,优点什么的我就不吹了,gRPC就是这一类的。
序言
今年年初的时候我就一直想做一款基于RPC实现组件化搭建网站的一款产品。目的就是为了让各种语言的程序员能以最低沟通、学习代价来进行快速、稳定开发产品。而当初做的时候为了速度摸坑,用了Nodejs来进行开发,做出了GQ这款产品,自己边用边总结,说真的坑是真的多,JSON是不够用的,还有bytes、流数据等等问题。几个月下来,发现这个东西是个史诗大坑,因为要考虑到各种语言的兼容、使用难易、不同类型组件(数据库组件、流文件处理组件等)通用接口等等问题,迟迟没有拿出一套跨语言的规范来。几乎要放弃。这项目被我搁置。
不巧,gRPC出来了。当初心想有搞头,但是当时文档不够健全,让那些爱折腾的人先去探探路吧。
现在我作为第二批吃螃蟹的人,上手一试,心中暗叹:厉害了我的Goggle
案例简介
一套基于路由注册的分发服务。这里使用Nodejs来快速上手。
服务端:注册HTTP端口,以及gRPC基础服务,通过基础服务,可以注册HTTP请求的处理权。
子服务:注册基础服务,实现对HTTP请求的处理。
流程如下:
HTTP请求--->服务端--/a/b-->子服务1
|--/a/c-->子服务2
+--/a/d-->子服务3
实现流程
PS:我这里不赘述XX为什么要这以做之类的话语,直接上代码,并简单解释代码中的重点。这个案例没有难点。
gRPC的安装省略。
使用Nodejs版本v6.9.1。用到一些ES6的语法,不完全兼容v4.*
proto文件
syntax = "proto3";
option objc_class_prefix = "Gaubee";
package httpserver;
// HttpServer服务的定义
service HttpServer {
rpc BindRoute(RouteInfo) returns(RequestInfo) {}
rpc ReturnResponse(ResponInfo) returns(ResponReply) {}
}
message RouteInfo {
enum Method {
GET = 0;
POST = 1;
PUT = 2;
DELETE = 3;
}
// 注册的路由pathname
string path = 1;
Method method = 2;
// 是否直接解析query到query中
bool is_parse_query = 3;
// 是否解析带有Body的请求
bool is_parse_body = 4;
}
message RequestInfo {
// 请求的ID,在做响应的时候需要带上
string require_id = 1;
// 请求链接中的数据
map<string, string> query = 2;
// 请求数据包中的数据
map<string, string> body = 3;
}
message ResponInfo{
string require_id = 1;
map<string, string> response_head = 2;
string response_data = 3;
int32 statusCode = 4;
}
// 返回整个请求的时间信息
message ResponReply{
// 收到请求,解析请求query、body花费的时间
float route_parsed_time = 1;
// 传输请求数据花费的时间
float route_send_time = 2;
// 处理请求至服务端收到数据花费的时间
float responsed_time = 3;
// 返回请求结果给客户花费的时间
float responsed_send_time = 4;
}
以上的proto文件描述了两个接口:
- BindRoute:实现路由绑定把要绑定的路由写道path中,这里为了简单,我只是进行了字符串相等匹配。BindRoute的回调函数不会马上触发。要等到服务端有HTTP请求的时候,匹配到对应的路由,最后再执行返回。
- ReturnResponse:BindRoute收到回调后,意味着子服务开始处理HTTP请求,在处理完成后,需要调用此函数来返回处理结果。
require_id是注册基础服务后生成的一次性编号,每一次注册都会生成一个,在收到HTTP请求是,匹配到对应的路由,就要通过这个require_id来发送请求、接受处理结果。
服务端
"use strict";
// PROTO文件路径,按需换成自己的文件路径
const PROTO_PATH = __dirname + '/../pingpong/http-server.proto';
const grpc = require('grpc');
const http = require("http");
const url = require("url");
const querystring = require("querystring");
// 动态加载PROTO文件,生成接口
const httpserver = grpc.load(PROTO_PATH).httpserver;
/** 路由映射表
* 结构为:{ [METHOD:GET|POST|PUT|DELETE] : { [PATHNAME] : Array<String>[require_id,...] } }
*/
const route_res_list = {};
/** require_id对应的路由注册信息与回调函数
* 结构为:{ args : 路由注册信息, callback: 回调函数,用于通知子服务来处理HTTP请求 }
*/
const handleCache = new Map();
/** require_id对应的HTTP请求
* 结构为:{ req : HTTP.Request, res: HTTP.Response }
*/
const reqresCache = new Map();
// 初始化路由映射表的结构
for (let method in httpserver.RouteInfo.Method) {
route_res_list[method] = {};
}
/** rgRPC服务:绑定路由
*
*/
function BindRoute(call, callback) {
const args = call.request;
const routes = route_res_list[args.method];
var handles = routes[args.path];
if (!handles) {
handles = routes[args.path] = new Set();
}
// 生成require_id
const require_id = Math.random().toString(36).substr(2);
// 加入路由映射表
handles.add(require_id);
// 这里不调用callback,而是等待到收到HTTP请求的时候再执行
handleCache.set(require_id, {
args: args,
callback
});
}
/** rgRPC服务:响应HTTP请求
*
*/
function ReturnResponse(call, callback) {
const args = call.request;
const require_id = args.require_id;
// 取出缓存中的req、res对象
const {
req,
res
} = reqresCache.get(require_id);
if (!res) {
return callback(`Ref Error:require_id:${require_id} no ref.`);
}
// 返回数据
res.writeHead(args.statusCode, args.response_head);
res.end(args.response_data);
// 清除缓存
reqresCache.delete(require_id);
// 返回整个流程的消耗时间,这里为了简化代码没有加统计时间的代码
callback(null, {
route_parsed_time: 0,
route_send_time: 0,
responsed_time: 0,
responsed_send_time: 0,
});
}
function main() {
const grpc_server = new grpc.Server();
grpc_server.addProtoService(httpserver.HttpServer.service, {
bindRoute: BindRoute,
returnResponse: ReturnResponse,
});
// 注册gRPC服务
grpc_server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure());
grpc_server.start();
}
main();
// 注册HTTP服务
const server = http.createServer((req, res) => {
const routes = route_res_list[req.method];
if (routes) {
const url_info = url.parse(req.url);
const handles = routes[url_info.pathname];
if (handles && handles.size) {
const {
value: require_id,
has_key
} = handles.values().next();
if (has_key) {
console.log("no handle");
return
}
const handle = handleCache.get(require_id);
// 通知客户端处理HTTP请求,并带上HTTP请求的一些数据
handle.callback(null, {
require_id,
// TODO:headers
query: handle.args.is_parse_query ? Object.assign({}, querystring.parse(url_info.query)) : {},
// TODO:body应该改用流数据传输
body: (handle.args.method !== "GET" && handle.args.is_parse_body) ? {} : {}
});
// 缓存req、res
reqresCache.set(require_id, {
req,
res
});
// 清除缓存,此require_id已经报废,只用于最后的响应res的处理
handleCache.delete(require_id);
handles.delete(require_id);
}
}
});
server.listen(1337, "0.0.0.0", () => {});
子服务
const PROTO_PATH = __dirname + '/../pingpong/http-server.proto';
const grpc = require('grpc');
const http = require("http");
const httpserver = grpc.load(PROTO_PATH).httpserver;
const address = 'localhost:50051'
function main() {
var client = new httpserver.HttpServer(address, grpc.credentials.createInsecure());
// 注册路由
client.bindRoute({
path: "/QAQ",
method: httpserver.RouteInfo.Method.GET,
is_parse_query: true,
is_parse_body: false,
}, function(err, response) {
// 收到路由请求
console.log(err, response);
// 返回路由处理结果
client.returnResponse({
require_id: response.require_id,
statusCode: 200,
response_head: {},
response_data: JSON.stringify(response.query)
}, function(err, response) {
// 收到性能统计
console.log(err, response);
})
});
// 模拟发起请求
setTimeout(function() {
var req = http.get("http://localhost:1337/QAQ?a=a", (res, socket, head) => {
var datas = []
res.on("data", (chunk) => {
datas.push(chunk);
});
res.on("end", () => {
// 打印结果
console.log("request res:", Buffer.concat(datas).toString());
});
});
}, 200)
}
main();
总结
以上代码流程只是简单的示例,有一点不好的就是路由的注册变成了一次性,这不能说完全不对。但更优的做法应该改为流数据,这就不需要调用callback来结束服务,而是可以通过流不停地发起请求,流的实践参考官方代码:route_guide。