AI智能
改变未来

RabbitMQ 入门 (Go) – 3. 模拟传感器,生成数据并发布

现在,我们需要模拟传感器,生成数据,并发布到RabbitMQ。

建立传感器项目

在GOPATH src下建立文件夹sensors,使用go mod init初始化,并创建main.go。

同时别忘了安装amqp的包:go get -u github.com/streadway/amqp

我们要生成一些模拟数据,生成数据有一定的范围(位于一个最大值和最小值之间),如下图:

因此,我们需要这样几个配置参数:

  1. 传感器的名称

  2. 传感器数据的更新频率

  3. 模拟生成数据的最大值

  4. 模拟生成数据的最小值

  5. 与前一次生成数据的差值的最大值(变化幅度的最大值)

设置命令行参数并读取

在这个项目里,我们需要通过命令行参数来传递配置,并在Go程序里面进行解析和读取。我们可以使用os.Args来搞这些命令行参数,但是更好的办法是使用flag这个包(其内部实现使用的也是os.Args)。

我们先看代码:

  1. 第5-9行,我们声明了5个命令行参数。都是使用flag包下相应的函数实现的。

  2. 这几个命令行参数分别表示传感器名称、模拟数据的更新频率、模拟数据的最大值、最小值以及变化幅度的最大值。

  3. 这些命令行参数的类型分别是string,uint,float64,float64,float64。

  4. 这些函数的参数都类似:

  5. 第一个参数是命令行参数的名称

  6. 第二个参数是命令行参数的默认值

  7. 第三个参数是参数的描述/帮助

  • 在main函数里,我们调用 flat.Parse()函数,就可以将命令行的参数值解析到5-9行声明变量里面。

  • 我们测试一下,命令行输入go run . –help,其结果如下:

    生成模拟数据

    要生成模拟传感器的数据,需要使用到math/rand和time这两个包。

    先看代码:

    1. 第17行,我们需要一个*rand.Rand类型来生成随机数,它又需要一个源,这里使用time.Now().UnixNano()生成源,这样做的好处是因为这个时间纳秒数永远不会重复。

    2. 第19行,声明value,它表示传感器的数值,在这先生成一个初始值。

    3. 第20行,是额定值,在这里也就是最大值最小值的中间平均值。

    4. 第25行,把更新频率(每秒更新的次数)转化为了两次更新之间的时间间隔(毫秒),并解析成time.Duration这个类型。

    5. 第26行,time.Tick函数会返回一个time的Channel,该函数会按照提供的时间间隔不断触发,并向这个Channel发送当前时间。

    6. 第28行,使用for range来处理sigad8nal这个Channel,每次Channel中有数据传递过来,我们就使用calcValue这个函数来生成新的模拟数据。

    7. 第29行,把生成的最新数据打印一下即可。

    calcValue函数

    生成模拟数据的逻辑是如果数据偏离额定值,那么尽量让下次生成的值向额定值靠拢。

    这部分可根据自己的特定需求来实现,不必和我的相同。

    先看代码:

    1. 第35行,声明了maxStep和minStep两个变量,表示本次更新相比上次所能够发生的最大变化和最小变化幅度。

    2. 第36 – 42 行,区分当前值大于额定值或小于额定值两种情况,按不同的逻辑得出maxStep和minStep

    3. 第44行,使用maxStep 和minStep以及随机数生成新的value数据。

    运行sensors项目

    使用go run .运行,命令行参数使用默认值即可:

    一切正常的话,它就会每秒钟生成5次数据。

    如何运行多个传感器

    生产环境中,通常会接收来自多个传感器的数据。

    这里,我们让每个传感器都设置自己的路由Key,所以RabbitMQ将会为每个Key创建一个Queue:

    但是这也会引起问题,就是之前章节里面的那个协调程序如何发现这些传感器呢?

    首先,我们可以让每个传感器使用路由Key向一个所有传感器和协调程序都知晓的路径中发送一个消息。但这只能解决问题的一半,另一半我们以后再说。

    将传感器数据发布到RabbitMQ

    创建传感器的消息类型

    这里会使用到encoding/gob包。

    看代码:

    • 在sensors包中创建model包,并建立models.go文件。

    • 2088

      在models.go的第12行,建立SensorMessage作为传感器传递消息的类型,里面包含三个字段分别是传感器名称、数值和时间戳。

    • 很显然我们不能把Go的struct类型直接扔到RabbitMQ里面,但我们项目中的各种客户端只涉及到Go语言,所以在这里我使用Go语言的gob来对消息进行编码,这样会更高效一些。如果这个项目是跨语言的我可能会使用JSON或Protocol Buffers。

    • 在model包的init函数里面,需要使用gob包的Register函数把将要编码的类型进行注册,这样依赖于这个包的其它Go程序就可以把 SensorMessage这个类型的消息对象发送过去了

    建立Queue相关的工具包

    建立tools包,并建立queuetools.go文件,其内容如下:

    代码内容与之前的项目类似,就不解释了。

    发布传感器数据到RabbitMQ

    这里还会使用到bytes包。

    回到main.go,修改代码:

    1. 前面添加了获取Channel和Queue的代码。其中第37行比较重要,因为我们不能保证在程序运行时,使用Queue名称作为路由Key的Queue存在,而使用GetQueue函数,就可以保证这个Queue会被正确的设置,并准备好被我们使用了。

    2. 第42行,使用bytes包创建了一个*bytes.Buffer,它用来来承载编码后的数据,这个Buffer可以重复利用,所以实在forrange的外部声明的。

    3. 但是每次使用Buffer都需要进行重置,也就是第53行的作用,这样以前的数据就会被移除,Buffer的指针会回到初始位置。

  • 第43行,使用gob和Buffer来创建编码器。

  • 第54行,使用编码器的Encode方法对消息进行编码。

  • 第56行,创建要发送给RabbitMQ的消息(amqp.Publishing类型),这里只需要填写Body字段即可,其它的字段根据自己的需求选填即可。

  • 第60行,使用Channel来发布消息,这里使用的是默认的Exchange,路由key就是Queue的名字,最后一个参数就是发布的消息。

  • 运行程序

    运行sensors包:

    打开控制台:

    可以看到发送频率确实是每秒5次。

    打开sensor Queue:

    目前已经有384条消息了,都没有被发送。

    随便点开一个消息查看其内容:

    可以看到Body应该是Base64编码的。因为gob编码器使用的是二进制消息格式,尽可能的高效,所以在控制台里面它没有一个有意义的表述展示。

    然后,先停止运行程序。

    传感器上线时通知协调程序

    最后我们就来处理上面那个问题:当传感器上线的时候,得让协调程序知道,并发送数据。

    因为每个传感器都创建了一个自己的Queue,所以在没有帮助的情况下,协调程序将无法有效知道这些传感器。

    这个问题实际上具体需要做两件事,我们先来做第一件事:

    多个传感器他们Queue的名称是不一样的,是动态的,所以我们需要一个大家都知道的Queue,它用来将每个新创建的传感器的Queue名称发送给协调程序。

    首先,在&nbs15b0p;queuetools.go里面添加这个Queue的名称,使用一个常量保存:

    然后,在main.go里,使用这个名称创建一个Queue,并将传感器的Queue的名称发布上去:

    再次运行sensor包

    打开控制台:

    可以看到SensorList Queue出现了。

    进入到SensorList Queue,看它的Message:

    可以看到当前这一个传感器的名字sensor就在里面。

    赞(0) 打赏
    未经允许不得转载:爱站程序员基地 » RabbitMQ 入门 (Go) – 3. 模拟传感器,生成数据并发布