在第二篇文章中介绍了如何利用工作队列将耗时任务分发到多个消费者工作程序中。
考虑下另一种情况:如果需要在一个远程机器上执行一个函数然后等待它的返回结果应该怎样?这个过程称之为远程过程调用(RPC:Remote Procedure Call).
本篇将介绍如何利用RabbitMQ实现一个包含客户端和可扩展服务端的RPC系统,仍然跟之前的一样,利用模拟计算来替代真实的耗时任务,这里使用计算斐波那契数列函数。
回调队列(Callback queue)
RPC系统的模式是客户端发送请求给服务器,服务器接收处理后回复一条响应消息。RabbitMQ对此提供了很好的支持,通过在请求中指定callback的回调队列地址来实现接收服务器的响应消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
q, err := ch.QueueDeclare( "", //name false, //durable false, //delete when usused true, //exclusive false, //nowait nil, //argments ) err = ch.Publish( "", //exchange "rpc_queue" //routing key false, //mandatory false, //immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, Body: []byte(strconv.Itoa(n)), } ) |
Message properties
AMQP 0-9-1协议中共定义了14个消息属性,其中大部分是不常用的,常用的有以下几个:
- persistent: 标记消息是持久化(true)或者临时的(false),该属性在第二篇文章中有介绍;
- content_type: 用来描述mime-type的编码,如JSON类型:application/json;
- reply_to: 用于标记回调队列的名称;
- correlation_id: 用来表示request和response的关联关系;
Correlation Id
RPC server对Client请求的响应同样需要通过消息队列来传递,可以对每一次请求创建一个回调队列,但这种方式效率很低,更好的方式是:对于每一个客户端只创建一个回调队列。
但这样会带来一个问题:回调队列接收到一个response之后,如何确定其对应的request?这就需要 correlataion_id来标识。客户端在request中添加一个唯一的correlation_id,在接收到服务器返回的response时,根据该值来确定与之匹配的request并处理。如果未能找到与之匹配的correlation_id,说明该response并不属于当前client的请求,为了安全起见,将其忽略即可。
我们可能会问:为什么在没有找到与之匹配的correlation_id时是将其忽略而不是失败报错?这是考虑到服务端的竞争条件:假设RPC server在发送response后宕机了,而此时却没能对当前request发出确认消息(ack).如果这种情况出现,该请求还在队列中会被再次派发。因此当前Request会在服务端处理两次,也会给客户端发送两次Response,故而,client要能处理重复的response,而server端对于Request需要实现幂等。
总结
RPC的工作过程如下:
- 当Client启动时,会创建一个匿名的、独有的回调队列;
-
对每一个RPC Request,Client都会设置两个参数:用于标识回调队列的reply_to和用于唯一标识的correlation_id;
-
Request被发送到rpc_queue队列。
-
RPC服务器等待rpc_queue的消息,一旦消息到达,处理任务后将响应结果消息发送到reply_to指定的队列;
-
Client等待callback队列的消息,一旦消息到达,查找与correlation_id匹配的request,然后返回给应用程序。
代码:
首先看下斐波那契数列(Fibonacci)函数:
1 2 3 4 5 6 7 8 9 10 |
func fib(n int) int { if n== 0 { return 0 }else if n==1 { return 1 }else { return fib(n-1) + fib(n-2) } } |
这里假设输入都是正整数,且不指望它对大整数有效,因为这个方式可能是效率最差的了。
rpc_server.go文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
package main import ( "fmt" "log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func fib(n int) int { if n== 0 { return 0 }else if n==1 { return 1 }else { return fib(n-1) + fib(n-2) } } func main(){ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "rpc_queue", //name false, //durables false, //delete when unused false, //exclusive false, //no wait nil, //args ) failOnError(err, "Failed to declare a queue") err = ch.Qos( 1, // prefetch count 0, // prefetch size false, //global ) failOnError(err, "Failed to set Qos") msgs, err := ch.Consume( q.Name, //queue "", //exchange false, // auto-ack false, //exclusive false, //no-local false, //no-wait nil, //args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to an integer") log.Printf(" [.] fib(%d)", n) response := fib(n) err = ch.Publish( "", //exchange d.ReplyTo, //routing key false, //mandatory false, //immediate amqp.Publishing{ ContentType : "text/plain", CorrelationId: d.CorrelationId, Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") d.Ack(false) } }() log.Printf(" [*] Awaiting RPC reqeusts") <-forever } |
服务端的代码简单明了:
首先建立RabbitMQ的连接、创建通道和定义队列;
其次如果是多服务器进程,可以通过prefetch值得设置实现的负载均衡;
最后通过Channel.Consume监听队列消息,然后通过goroutine来实现对消息的处理和发送response。
rpc_client.go文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
package main import ( "fmt" "log" "strconv" "os" "math/rand" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func randomString(l int) string { bytes := make([]byte, l) for i:=0; i<l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max - min) } func bodyFrom(args []string) int { var s string if(len(args) < 2 || os.Args[1]==""){ s = "30" }else{ s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n } func fibonacciRPC(n int) (res int, err error) { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "", //name false, //durables false, //delete when unused true, //exclusive false, //no wait nil, //args ) failOnError(err, "Failed to declare a queue") msgs , err := ch.Consume( q.Name, //queue "", //consumer true, //auto-ack false, //exclusive false, //no-lock false, //nowait nil, ) failOnError(err, "Faield to register a consumer") corrId := randomString(32) err = ch.Publish( "", //exchange "rpc_queue", //routing key false, //mandatory false, //immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") for d:= range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main(){ rand.Seed(time.Now().UTC().UnixNano()) n:= bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } |
运行
首先运行RPC server:
1 2 3 |
go run rpc_server.go # => [x] Awaiting RPC requests |
客户端计算斐波那契数列:
1 2 3 |
go run rpc_client.go 30 # => [x] Requesting fib(30) |
目前为止设计的RPC系统,不仅仅是能提供RPC服务,还具备其他优点:
1 2 3 4 |
* 如果单台RPC服务器性能缓慢,可以很容易的进行扩展,只需在新窗口运行一个rpc_server.go脚本即可; * 在客户端,RPC模式要求对消息进行一次发送和接收操作,因此只需要一次网络往返即可完成一次RPC请求; |
当然,这里的Demo过于简单,并没有考虑实际应用中复杂而重要的诸多问题,如:
1 2 3 4 5 |
* 客户端如何处理服务端掉线的情况? * 客户端如何处理服务端超时情况? * 如果服务端故障导致异常,是否需要将异常转发给客户端处理? * 在对消息处理前未对消息的合法性进行检查,如边界值、类型信息等; |