到目前为止,我一直专注于如何让消息进出消息代理,也就是RabbitMQ。
实际上,我们可以继续使用RabbitMQ和它的 Exchanges 来连接这个应用程序的其他部分,但是我想探索一个稍微不同的模型:我想使用协调器来跟踪哪些类型的消费者得到消息通知。
这样的话,我断开了传感器数据生成器和数据使用者之间的连接。
同时为了处理这些数据通信,我决定使用事件(event)来通知用户系统中正在发生的事情,并让他们决定是否要处理数据。
其原理大致如下:
-
在协调器内部,我们有构建好的QueueListener。
-
我还需要构建另外一个类型,我叫它EventAggregator。
-
来自RabbitMQ的消息,它将通过一个异步的goroutine进入QueueListener
-
goroutine将把消息传输到一个事件对象(event object)中,并通过事件聚合对象(event aggregation object)进行广播。
-
该对象将维护任何对事件感兴趣的使用者的注册表,并向其发送事件对象的副本。
-
这使我们能够通过将数据转储到下游的Queue 来为这些事件注册其他应用程序,但它也可以让使用者能够在协调器内部进行设置,例如日志系统。
-
最后,如果使用者最终要通过Queue将数据发送到另一个应用程序,则可以对其进行预处理,以添加有用的附加数据,而最终使用者不必知道这些附加信息是如何到达那里的。
编写代码
创建EventAggregator
在coordinator目录下添加eventaggregator.go,代码如下:
-
第28行,建立EventData struct,目前它的字段碰巧和SensorMessage是一样的,但是两个struct的职责不同,所以我们不复用SensorMessage,而是单独建立EventData,以便它们以后可以独立的进化;
-
第5行,建立了EventAggregator struct,也就是事件聚合,它只有一个listeners字段,是一个map,它的key是事件的名称,它的值是回调函数的集合。当事件发生的时候,EventAggregator就轮流调用为该事件注册的回调函数;
-
第9行,就是EventAggregator的构造函数;
-
第16行,AddListener方法,使用者通过该方法可以向EventAggregator注册回调函数;
-
第20行,PublishEvent方法用来发布事件。它接收事件名称和事件的数据作为参数。这里需要判断EventAggregator里是否已经注册了该事件,如果注册了,那么遍历其对应的回调函数,并使用事件数据进行调用。
-
调用回调函数时,使用的不是EventData的指针,而是EventData的副本,这可以保证使用者不会把事件数据搞乱,影响其它使用者
取消订阅的功能我就不做了。
把EventAggregator连接到QueueListener
打开queuelistener.go,添加代码:
-
第19行,在QueueListener struct里面添加字段ea,类型是*EventAggregator;
-
第25行,在QueueListener的构造函数里为ea自读赋初始值。
在AddListener方法里,原来只是把原始数据打印到控制台。现在添加如下代码:
-
创建一个EventData,其字段内容目前和传感器的消息内容一样;
-
使用 QueueListener上的EventAggregator发布事件:
-
事件的名称是MessageReceived_传感器名称
-
第二个参数就是事件数据
发现早已运行的传感器
最后我们要做的就是如何让协调器发现在协调器上线前就已经在运行的传感器。
目前我们的做法是这样的:首先协调器先运行,然后传感器在上线的时候立即把它们的数据Queue发送过去,使用的是Fanout Exchange,这样多个协调器都可以被通知到。
但是,如果传感器先运行,协调器后运行,那么协调器就无法知道传感器的存在,为了解决这个问题,我这样做:
-
我在消息代理中也就是RabbitMQ里,建立一个新的Exchange,它是一个Fanout Exchange,它和其它信息流的方向正好相反。
-
在这里,协调器将会向这个Fanout Exchange发出一个“发现”请求,这个信息将会发送给所有的传感器。
-
传感器接收到这个“发现”请求信息后,将会响应,将它们的数据Queue的名称发送给我们以前建立的那个Fanout Exchange(中间黄色的)。
-
这里会出现一些冗余的信息,但协调器里有过滤机制,所以就这样吧。
我们首先测试一下先运行传感器项目,再运行协调器项目的效果:
可以看到,协调器运行起来以后,没有接收到该传感器的数据。
修改queuetools
我们要解决的就是这个问题,下面看代码,首先看queuetools.go:
这里改动不多,就是把要新建立的Fanout Exchange的名称作为常量存在这里。
注意之前在这里定义的SensorListQueue已经不需要了,可以删掉。
修改queuelistener
然后看queuelistener.go,在这里为QueueListener添加一个DiscoverSensors方法:
该方法中首先我使用了ExchangeDeclare方法来声明这个新的Exchange,并进行设置。
虽然项目中还没用过这个方法,但是里面大多数参数的作用你应该能够猜得出来:
-
name:Exchange的名称
-
kind:Exchange的类型,可以是direct、topic、header或者fanout,这里使用fanout
-
durable:表示这个Exchange是否可持久
-
autoDelete:表示在没有绑定的情况下是否删除Exchange
-
internal:这个参数我们还没见过,如果想拒绝外部的发布请求,就把这个设为true。这可以在高级场景中使用,在高级场景中,Exchange绑定在一起,在消息代理中形成更复杂的拓扑。
-
noWait和args就不介绍了。
现在,协调器可以向这个Exchange发布消息了。而我们只需要向它发送一个消息即可,并没有什么具体的内容要发送,所以我发布了一个空的Publishing,这就可以告诉浏览器我在寻找它们了。
修改传感器
下面我们让传感器(sensor.go)对上面发布的“发现”请求进行响应,不过首先,需要重构一下。
把main函数里面当传感器上面时,发布数据Queue名称那部分代码提取出来放在单独的一个函数里面:
然后在main函数相应的位置进行调用:
-
第39行,对重构的函数进行调用。
-
第41行,创建一个Queue
-
第42行,使用QueueBind方法将这个Queue和SensorDiscovery Exchange
-
第48行,创建goroutine运行一个将要新建的函数listenForDiscoveryRequests。通过使用goroutine,无论当请求什么时候进来,这部分逻辑都将可用,而且不会阻塞系统的其余部分。这里需要传入Queue的名称和Channel。
然后看一下listenForDiscoveryRequests函数:
这里使用Channel的Consume方法对Channel进行设置以便能接收“发现”请求。
然后用for range来接收“发现”请求。这里忽略消息本身即可,因为该消息就是一个触发而已。当消息进来时,调用刚刚重构出来的publishQueueName函数即可。
在queuelistener里调用发现方法
在queuelistener.go的ListenForNewSource方法里,在如下位置调用DiscoverSensors方法:
为什么在这里调用?因为这是可以保证协调器正在监听传感器路由的消息的第一个地方。
运行测试
先运行一个传感器,然后在运行协调器:
传感器这里我使用了freq参数,让其每两秒钟生成一个数据。
可以看到,在这种情况下协调器也可以发现已经运行的传感器并接收数据了。
你可以运行多个传感器和多个协调器,应该也会好用的。
这也是一种非常简单的分布式应用吧。