rabbitMq学习笔记(6)远程过程调用RPC

在第二篇文章中介绍了如何利用工作队列将耗时任务分发到多个消费者工作程序中。

考虑下另一种情况:如果需要在一个远程机器上执行一个函数然后等待它的返回结果应该怎样?这个过程称之为远程过程调用(RPC:Remote Procedure Call).

本篇将介绍如何利用RabbitMQ实现一个包含客户端和可扩展服务端的RPC系统,仍然跟之前的一样,利用模拟计算来替代真实的耗时任务,这里使用计算斐波那契数列函数。

回调队列(Callback queue)

RPC系统的模式是客户端发送请求给服务器,服务器接收处理后回复一条响应消息。RabbitMQ对此提供了很好的支持,通过在请求中指定callback的回调队列地址来实现接收服务器的响应消息:

Message properties

AMQP 0-9-1协议中共定义了14个消息属性,其中大部分是不常用的,常用的有以下几个:

  • persistent: 标记消息是持久化(true)或者临时的(false),该属性在第二篇文章中有介绍;
  • content_type: 用来描述mime-type的编码,如JSON类型:application/json;
  • reply_to: 用于标记回调队列的名称;
  • correlation_id: 用来表示request和response的关联关系;

Correlation Id

RPC server对Client请求的响应同样需要通过消息队列来传递,可以对每一次请求创建一个回调队列,但这种方式效率很低,更好的方式是:对于每一个客户端只创建一个回调队列。

但这样会带来一个问题:回调队列接收到一个response之后,如何确定其对应的request?这就需要 correlataion_id来标识。客户端在request中添加一个唯一的correlation_id,在接收到服务器返回的response时,根据该值来确定与之匹配的request并处理。如果未能找到与之匹配的correlation_id,说明该response并不属于当前client的请求,为了安全起见,将其忽略即可。

我们可能会问:为什么在没有找到与之匹配的correlation_id时是将其忽略而不是失败报错?这是考虑到服务端的竞争条件:假设RPC server在发送response后宕机了,而此时却没能对当前request发出确认消息(ack).如果这种情况出现,该请求还在队列中会被再次派发。因此当前Request会在服务端处理两次,也会给客户端发送两次Response,故而,client要能处理重复的response,而server端对于Request需要实现幂等。

总结

RPC的工作过程如下:

  • 当Client启动时,会创建一个匿名的、独有的回调队列;

  • 对每一个RPC Request,Client都会设置两个参数:用于标识回调队列的reply_to和用于唯一标识的correlation_id;

  • Request被发送到rpc_queue队列。

  • RPC服务器等待rpc_queue的消息,一旦消息到达,处理任务后将响应结果消息发送到reply_to指定的队列;

  • Client等待callback队列的消息,一旦消息到达,查找与correlation_id匹配的request,然后返回给应用程序。

代码:

首先看下斐波那契数列(Fibonacci)函数:

这里假设输入都是正整数,且不指望它对大整数有效,因为这个方式可能是效率最差的了。

rpc_server.go文件:

Github地址

服务端的代码简单明了:

首先建立RabbitMQ的连接、创建通道和定义队列;

其次如果是多服务器进程,可以通过prefetch值得设置实现的负载均衡;

最后通过Channel.Consume监听队列消息,然后通过goroutine来实现对消息的处理和发送response。

rpc_client.go文件:

Github地址

运行
首先运行RPC server:

客户端计算斐波那契数列:

目前为止设计的RPC系统,不仅仅是能提供RPC服务,还具备其他优点:

当然,这里的Demo过于简单,并没有考虑实际应用中复杂而重要的诸多问题,如:

发表评论

电子邮件地址不会被公开。 必填项已用*标注