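RabbitMQ is one of the most popular message queues today. This article shows how to configure and use RabbitMQ in Spring Boot.
The code is based on Spring Boot 2.1.6; the source code is available at the address given at the end of the article.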
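1. For local experimentation, the easiest way to get RabbitMQ is to install it with Docker; see the article "Installing RabbitMQ with Docker" (《docker安装rabbitmq》) in this column.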
2. Configuration

    spring.rabbitmq.host=192.168.59.128
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    # If not set, the default virtual host "/" is used
    spring.rabbitmq.virtual-host=my_vhost
    # Requested heartbeat timeout in seconds; 0 means none; default is 60s
    spring.rabbitmq.requested-heartbeat=20
    # Whether to enable publisher confirms
    spring.rabbitmq.publisher-confirms=true
    # Whether to enable publisher returns
    spring.rabbitmq.publisher-returns=true
    # Connection timeout in milliseconds; 0 means wait forever
    spring.rabbitmq.connection-timeout=10
3. pom dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
4. RabbitMQ has 4 exchange types
Exchange type | Default pre-declared names |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
a. A direct exchange delivers messages by routing key (see the figure below). A queue is bound to the exchange with a key, and a message is delivered to every queue whose binding key exactly equals the message's routing key. It suits scenarios where several workers share the load.
The binding code follows. Both the queue name and the routing key are "direct".

    @Configuration
    public class DirectRabbitConfig {

        @Bean
        public Queue direct() {
            return new Queue("direct");
        }

        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("directExchange");
        }

        @Bean
        public Binding directBindingExchange(Queue direct, DirectExchange directExchange) {
            return BindingBuilder.bind(direct).to(directExchange).with("direct");
        }
    }
The sender:

    @Service
    public class DirectSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Resource
        private AmqpTemplate rabbitTemplate;

        public void sendString(String message) {
            logger.info("direct sender : " + message);
            rabbitTemplate.convertAndSend("directExchange", "direct", message);
        }

        public void sendObject(Object message) {
            // Serialize the object to a JSON string before sending
            String messageStr = JSONObject.toJSONString(message);
            logger.info(messageStr);
            rabbitTemplate.convertAndSend("directExchange", "direct", messageStr);
        }
    }
The receiver:

    @RabbitHandler
    @RabbitListener(queues = {"direct"})
    public void processDirect(Message message) {
        logger.info("Receiver direct: {}", new String(message.getBody()));
    }
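The exact-match rule of a direct exchange can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ's implementation; the class name DirectRoutingSketch and its methods are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of direct-exchange routing (hypothetical, for illustration only). */
final class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    /** Binds a queue to this exchange under the given binding key. */
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues whose binding key equals the routing key exactly. */
    List<String> route(String routingKey) {
        List<String> queues = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        return new ArrayList<>(queues);
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("direct", "direct");          // same names as in the article
        System.out.println(exchange.route("direct")); // the "direct" queue is selected
        System.out.println(exchange.route("other"));  // no binding matches
    }
}
```

A routing key that matches no binding selects no queue, which is the situation the publisher-returns setting above is meant to report.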
b. A fanout exchange is broadcast mode: it routes each message to every queue bound to it. It suits scenarios such as group chat.
The configuration follows. Three queues are bound to one fanout exchange.

    @Configuration
    public class FanoutRabbitConfig {

        @Bean
        public Queue queueA() {
            return new Queue("fanout.a");
        }

        @Bean
        public Queue queueB() {
            return new Queue("fanout.b");
        }

        @Bean
        public Queue queueC() {
            return new Queue("fanout.c");
        }

        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        @Bean
        public Binding bindingExchangeA(Queue queueA, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueA).to(fanoutExchange);
        }

        @Bean
        public Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueB).to(fanoutExchange);
        }

        @Bean
        public Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueC).to(fanoutExchange);
        }
    }
The sender:

    @Service
    public class FanoutSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Autowired
        private AmqpTemplate rabbitTemplate;

        public void send(String message) {
            logger.info("fanout sender : {}", message);
            // A fanout exchange ignores the routing key, so an empty string is passed
            rabbitTemplate.convertAndSend("fanoutExchange", "", message);
        }
    }
The receiver:

    @RabbitHandler
    @RabbitListener(queues = {"fanout.a", "fanout.b", "fanout.c"})
    public void processFanout1(Message message) {
        logger.info("Receiver fanout: {}", new String(message.getBody()));
    }
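Broadcast routing can be sketched the same way. This is a toy in-memory model under the assumption that the exchange simply copies each message to all bound queues; FanoutRoutingSketch is a hypothetical name, not part of Spring AMQP or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of fanout-exchange routing (hypothetical, for illustration only). */
final class FanoutRoutingSketch {

    private final List<String> boundQueues = new ArrayList<>();

    /** Binds a queue; a fanout binding carries no key. */
    void bind(String queueName) {
        boundQueues.add(queueName);
    }

    /** The routing key is accepted but ignored: every bound queue is selected. */
    List<String> route(String routingKey) {
        return new ArrayList<>(boundQueues);
    }

    public static void main(String[] args) {
        FanoutRoutingSketch exchange = new FanoutRoutingSketch();
        exchange.bind("fanout.a");
        exchange.bind("fanout.b");
        exchange.bind("fanout.c");
        // Both calls select all three queues, whatever the routing key is.
        System.out.println(exchange.route(""));
        System.out.println(exchange.route("ignored.key"));
    }
}
```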
c. A topic exchange routes messages by matching the routing key against binding patterns that may contain wildcards. It suits publish/subscribe scenarios.
The configuration:

    @Configuration
    public class TopicRabbitConfig {

        @Bean
        public Queue queueMessage() {
            return new Queue("topic.message");
        }

        @Bean
        public Queue queueMessage2() {
            return new Queue("topic.message2");
        }

        /**
         * Declares the topic exchange.
         */
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }

        /**
         * Binds the queue to the topic exchange with an exact key.
         */
        @Bean
        public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }

        /**
         * Binds the queue to the topic exchange using the # wildcard.
         */
        @Bean
        Binding bindingExchangeMessage2(TopicExchange exchange, Queue queueMessage2) {
            return BindingBuilder.bind(queueMessage2).to(exchange).with("topic.#");
        }
    }
The sender:

    @Service
    public class TopicSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Resource
        private AmqpTemplate rabbitTemplate;

        public void send1(String message) {
            logger.info("topic sender1 : " + message);
            rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
        }

        public void send2(String message) {
            logger.info("topic sender2 : " + message);
            rabbitTemplate.convertAndSend("topicExchange", "topic.message2", message);
        }
    }
The receiver:

    @RabbitHandler
    @RabbitListener(queues = {"topic.message"})
    public void processTopic(Message message) {
        logger.info("Receiver topic: {}", new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = {"topic.message2"})
    public void processTopic2(Message message) {
        logger.info("Receiver topic2: {}", new String(message.getBody()));
    }
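To make the wildcard rules concrete, here is a minimal matcher sketch. It assumes the usual AMQP topic semantics: keys are dot-separated words, * stands for exactly one word, and # stands for zero or more words. TopicPatternSketch is a hypothetical helper written for this article, not the broker's actual algorithm.

```java
/** Toy matcher for topic binding patterns (hypothetical, for illustration only). */
final class TopicPatternSketch {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.#", "topic.message"));        // true
        System.out.println(matches("topic.#", "topic.message2"));       // true
        System.out.println(matches("topic.message", "topic.message2")); // false
    }
}
```

With the bindings from the configuration above, a message sent with topic.message matches both bindings, while one sent with topic.message2 matches only topic.#.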
d. A headers exchange ignores the routing key and routes on the message headers instead.
The configuration:

    @Configuration
    public class HeadersRabbitConfig {

        @Bean
        public Queue headerQueue() {
            return new Queue("headerQueue");
        }

        @Bean
        public Queue headerQueue2() {
            return new Queue("headerQueue2");
        }

        @Bean
        public HeadersExchange headerExchange() {
            return new HeadersExchange("headerExchange");
        }

        @Bean
        public HeadersExchange headerExchange2() {
            return new HeadersExchange("headerExchange2");
        }

        @Bean
        public Binding bindingExchange(Queue headerQueue, HeadersExchange headerExchange) {
            Map<String, Object> headerValues = new HashMap<>(3);
            headerValues.put("param1", "value1");
            headerValues.put("param2", "value2");
            // whereAll: every listed header must match
            return BindingBuilder.bind(headerQueue).to(headerExchange).whereAll(headerValues).match();
        }

        @Bean
        public Binding bindingExchange2(Queue headerQueue2, HeadersExchange headerExchange2) {
            Map<String, Object> header = new HashMap<>(3);
            header.put("param1", "value1");
            header.put("param2", "value2");
            // whereAny: one matching header is enough
            return BindingBuilder.bind(headerQueue2).to(headerExchange2).whereAny(header).match();
        }
    }
The sender:

    @Service
    public class HeadersSenderService {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Resource
        private AmqpTemplate rabbitTemplate;

        public void headerSend(Map<String, Object> head, String msg) {
            logger.info("header send message: " + msg);
            rabbitTemplate.convertAndSend("headerExchange", "headerQueue", getMessage(head, msg));
        }

        public void headerSend2(Map<String, Object> head, String msg) {
            logger.info("header1 send message: " + msg);
            rabbitTemplate.convertAndSend("headerExchange2", "headerQueue2", getMessage(head, msg));
        }

        // Builds a Message whose properties carry the given headers
        private Message getMessage(Map<String, Object> head, Object msg) {
            MessageProperties messageProperties = new MessageProperties();
            for (Map.Entry<String, Object> entry : head.entrySet()) {
                messageProperties.setHeader(entry.getKey(), entry.getValue());
            }
            MessageConverter messageConverter = new SimpleMessageConverter();
            return messageConverter.toMessage(msg, messageProperties);
        }
    }
The receiver:

    @RabbitHandler
    @RabbitListener(queues = {"headerQueue"})
    public void processHeaders(Message message) {
        logger.info("Receiver header: {}", new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = {"headerQueue2"})
    public void processHeaders1(Message message) {
        logger.info("Receiver header2: {}", new String(message.getBody()));
    }
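The difference between whereAll and whereAny comes down to two predicates over the binding headers and the message headers. The sketch below models them in plain Java; HeadersMatchSketch and its method names are hypothetical, and the comparison by equals is a simplifying assumption about how header values are matched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of headers-exchange matching (hypothetical, for illustration only). */
final class HeadersMatchSketch {

    /** "all" semantics: every binding header must be present with an equal value. */
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            if (!headers.containsKey(entry.getKey())
                    || !Objects.equals(entry.getValue(), headers.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** "any" semantics: one binding header present with an equal value is enough. */
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            if (headers.containsKey(entry.getKey())
                    && Objects.equals(entry.getValue(), headers.get(entry.getKey()))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("param1", "value1");
        binding.put("param2", "value2");

        Map<String, Object> partial = new HashMap<>();
        partial.put("param1", "value1"); // only one of the two headers

        System.out.println(matchesAll(binding, partial)); // false
        System.out.println(matchesAny(binding, partial)); // true
    }
}
```

A message that carries only one of the two headers is therefore dropped by a whereAll binding but accepted by a whereAny binding.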
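5. Testing. The test code lives in RabbitMqController; start Application and exercise it through the URLs below. See the source code for details.
Notes:
a. Topic exchange: after opening http://localhost:8082/mq/topic in a browser, the queue bound with the routing key topic.# receives 2 messages and the queue bound with topic.message receives 1, which shows the wildcard at work.
b. Headers exchange: opening http://localhost:8082/mq/headers in a browser sends 4 messages, but the first one is never received. This is because headerExchange is bound with whereAll, which requires every listed header to match, while headerExchange2 is bound with whereAny, which needs only one matching header.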