AI智能
改变未来

RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合

1. 概述

老话说的好:做人要懂得变通,善于思考,有时稍微转个弯,也许问题就解决了。

言归正传,之前我们聊了 RabbitMQ 3.9.7 镜像模式集群的搭建,今天我们来聊聊 RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合。

2. 场景说明

服务器A IP:192.168.1.22

服务器B IP:192.168.1.8

服务器C IP:192.168.1.144

此三台服务器上已搭建好了 RabbitMQ镜像模式集群,镜像模式集群的搭建,可参见我的上一篇文章。

3. 与Springboot的整合

3.1 引入依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.5</version><relativePath/> <!-- lookup parent from repository --></parent>

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3.2 生产服务配置

spring:rabbitmq:addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672username: guestpassword: guestvirtual-host: /connection-timeout: 16000# 启用消息确认模式publisher-confirm-type: correlated# 启用 return 消息模式publisher-returns: truetemplate:mandatory: true

3.3 生产服务代码

import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import java.util.Map;@Componentpublic class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 确认回调*/final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// correlationData 唯一标识// ack mq是否收到消息// cause 失败原因System.out.println(\"correlationData:\" + correlationData.getId());System.out.println(\"ack:\" + ack);System.out.println(\"cause:\" + cause);}};/*** 发送消息* @param messageBody   消息体* @param headers       附加属性* @throws Exception*/public void sendMessage(String messageBody, Map<String, Object> headers, String id) throws Exception {MessageHeaders messageHeaders = new MessageHeaders(headers);Message<String> message = MessageBuilder.createMessage(messageBody, messageHeaders);rabbitTemplate.setConfirmCallback(confirmCallback);String exchangeName = \"exchange-hello\";String routingKey = \"test.123\";CorrelationData correlationData = new CorrelationData(id);rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {/*** 发送消息后做的事情* @param message* @return* @throws AmqpException*/@Overridepublic org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {return message;}}, correlationData);}}

3.4 消费服务配置

spring:rabbitmq:addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672username: guestpassword: guestvirtual-host: /connection-timeout: 16000listener:simple:# 设置为手工ACKacknowledge-mode: manualconcurrency: 5prefetch: 1max-concurrency: 10

3.5 消费服务代码

import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;@Componentpublic class Consumer {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = \"queue-hello\", durable = \"true\"),exchange = @Exchange(value = \"exchange-hello\" , durable = \"true\", type = \"topic\"),key = \"test.*\"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.out.println(\"收到消息:\" + message.getPayload());Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}}

3.6 Rest 测试代码

@RestController@RequestMapping(\"/mq\")public class RabbitmqController {@Autowiredprivate Producer producer;@GetMapping(\"/sendMessage\")public String sendMessage(@RequestParam String messageBody, @RequestParam String id) throws Exception {Map<String, Object> headers = new HashMap<>();producer.sendMessage(messageBody, headers, id);return \"success\";}}

4. 综述

今天聊了一下 RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合,希望可以对大家的工作有所帮助。

欢迎帮忙点赞、评论、转发、加关注 :)

关注追风人聊Java,每天更新Java干货。

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合