在上一篇文章中,我们实现了一个可以广播消息给多个接收者的日志系统。在本篇中,将介绍如何对消息进行过滤,从而只处理我们感兴趣的消息。如只把一些严重的错误信息写入磁盘,但对所有类型的消息都打印到屏幕。
Binding
前面例子中我们使用了如下绑定:
1 2 3 4 5 6 7 |
err = ch.QueueBind( q.Name, //queue name "", //routing key "logs", //exchange false, nil) |
绑定是交换器(exchange)和队列之间的关系,我们可以简单的理解为:队列对其绑定的交换器的消息感兴趣。
绑定函数Bind()可以指定routing_key参数,为了避免跟之前的Channel.Publish的参数混淆,我们称之为binding_key。下面就是使用了绑定参数的例子:
1 2 3 4 5 6 7 |
err = ch.QueueBind( q.Name, //queue name "black", //routing key "logs", //exchange false, nil) |
绑定参数的作用取决于exchange的类型,前面例子中使用的fanout类型的exchange是会忽略掉这个值的。
Direct exchange
前面文章中的日志系统会将所有消息广播分发到所有的消费者处理程序。现在我们想要扩展成为根据消息的严重程度来过滤分发,如只将严重的error级别的日志写入磁盘,而不写入info和warn类型的日志消息以节省磁盘空间。
前面使用的fanout exchange,只是对消息一味的广播转发,可扩展性差,无法满足我们的需求。所以,我们使用Direct exchange进行替代。Direct exchange的路由算法很简单:就是将exchange的binding_key和消息的routing_key进行比较,如果完全匹配这说明是需要分发的队列。
如下图配置:
在图中,direct exchange X有两个队列与之绑定。队列Q1的binding_key是orange, 而队列Q2有两个binding_key: black,green.
由此,当发送routing_key为orange的消息时会被路由到Q1,而带有black或green的routing_key的消息则会被分发到Q2,其他类型的消息都会被忽略。
多重绑定
Direct exchange会将消息广播至所有匹配的绑定队列,因此很容易实现对同一个binding_key需要分发到多个队列的情况。如图,带有routing_key的消息会被分发到Q1和Q2两个队列。
发送日志
考虑下如何实现?首先需要使用direct类型的exchange替换掉fanout类型,然后在发送消息是用routing_key来表示日志级别,而接收消息的脚本需要指定接收哪些级别的消息。先来看看如何发送日志:
首先创建exchange:
1 2 3 4 5 6 7 8 9 10 |
err = ch.ExchangeDeclare( "logs_direct", //name "direct", //type true, //durable false, //auto-deleted false, //internal false, //no-wait nil, //arguments ) |
然后准备发送消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
err = ch.ExchangeDeclare( "logs_direct", //name "direct", //type true, //durable false, //auto-deleted false, //internal false, //no-wait nil, //arguments ) failOnError(err, "Failed declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_direct", //exchange severityFrom(os.Args), //routing key false, //mandatory false, //immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), } ) |
为了简单,我们假设消息的级别为”info”, “warning”, “error”三者中的一个.
订阅
接收消息的程序跟之前的大体相同,只是需要为每一种级别的日志消息新建一个绑定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
q, err := ch.QueueDeclare( "", //name false, //durable false, //delete when unused true, //exclusive false, //no-wait nil, //arguemnts ) failOnError(err, "Failed to declare queue.") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warnint] [error]", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", //exchange false, nil) failOnError(err, "Failed to bind a queue") } |
整个文件
emit_log_direct.go文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string){ if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open an channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_direct", //name "direct", //type true, false, false, false, nil, ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), //routing key false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] sent %s", body) } func bodyFrom(args []string) string{ var s string if(len(args) < 3) || os.Args[2] == "" { s = "hello" }else{ s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if len(args) < 2 || args[1] == "" { s = "info" }else { s = os.Args[1] } return s } |
GitHub地址:emit_log.direct.go
receive_logs_direct.go文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
package main import( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string){ if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_direct", "direct", true, false, false, false, nil, ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", //name false, false, true, false, nil, ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]") os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) err = ch.QueueBind( q.Name, //queue name s, //routing key "logs_direct", //exchange false, nil, ) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, //name "", //consumer true, false, false, false, nil, ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func(){ for d:= range msgs{ log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To Exit press Ctrl+c") <-forever } |
GitHub地址:receive_logs.direct.go
运行结果
将”warning”和”error”级别的消息都写入磁盘,只需运行:
go run receive_logs_direct.go warning error>logs_from_rabbit.log
将所有消息都打印到屏幕:
go run receive_logs_direct.go warning error info
而发送消息:
go run emit_log_direct.go error “this is a log message”
在下一篇的文章中,我们将介绍如何实现基于模式的监听。