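1. Overview
As the old saying goes: stay flexible and think things through. Sometimes a small change of direction is all it takes to solve a problem.
Back to the topic. Last time we covered setting up a RabbitMQ 3.9.7 mirrored cluster; today we look at integrating that mirrored cluster with Spring Boot 2.5.5.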
2. Scenario
Server A IP: 192.168.1.22
Server B IP: 192.168.1.8
Server C IP: 192.168.1.144
A RabbitMQ mirrored cluster is already running on these three servers. For how to set it up, see my previous article.
3. Integrating with Spring Boot
3.1 Adding the dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 Producer service configuration
spring:
  rabbitmq:
    addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 16000
    # Enable publisher confirms
    publisher-confirm-type: correlated
    # Enable returned messages
    publisher-returns: true
    template:
      mandatory: true
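The `addresses` property is a comma-separated list of `host:port` pairs, which gives the connection factory several cluster nodes it can connect to. As a rough illustration of how that string breaks down, here is a minimal sketch in plain Java. `BrokerAddresses` and `Node` are hypothetical names made up for this example; this is not the parsing code Spring itself uses. The fallback to 5672 assumes the standard AMQP port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Spring or the RabbitMQ client.
final class BrokerAddresses {

    // Standard AMQP port, used here when an entry has no explicit port (an assumption of this sketch).
    static final int DEFAULT_AMQP_PORT = 5672;

    // One host/port pair taken from the comma-separated list.
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    // Splits "host1:port1,host2:port2,..." into individual nodes, skipping empty entries.
    static List<Node> parse(String addresses) {
        List<Node> nodes = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                nodes.add(new Node(trimmed, DEFAULT_AMQP_PORT));
            } else {
                String host = trimmed.substring(0, colon);
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                nodes.add(new Node(host, port));
            }
        }
        return nodes;
    }
}
```

Calling `BrokerAddresses.parse(...)` with the value from the configuration above yields three nodes, one per server in the cluster.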
3.3 Producer service code
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Confirm callback
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // correlationData: unique identifier of the message (null if none was supplied when sending)
            // ack: whether the broker accepted the message
            // cause: reason for the failure, if any
            String id = (correlationData != null) ? correlationData.getId() : null;
            System.out.println("correlationData:" + id);
            System.out.println("ack:" + ack);
            System.out.println("cause:" + cause);
        }
    };

    /**
     * Sends a message
     * @param messageBody message body
     * @param headers additional headers
     * @param id unique identifier used to correlate the publisher confirm
     * @throws Exception
     */
    public void sendMessage(String messageBody, Map<String, Object> headers, String id) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(messageBody, messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);

        String exchangeName = "exchange-hello";
        String routingKey = "test.123";
        CorrelationData correlationData = new CorrelationData(id);

        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {
            /**
             * Hook invoked after the message has been converted, before it is sent
             * @param message
             * @return
             * @throws AmqpException
             */
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                return message;
            }
        }, correlationData);
    }
}
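The confirm callback above only prints its arguments. In a real service you would probably want to remember which messages are still unconfirmed so that a negative confirm can trigger a retry. Below is a minimal sketch of that bookkeeping in plain Java, with no Spring types. `PendingConfirms` and its methods are hypothetical names invented for this example, and keeping the messages in memory is an assumption; a production system might persist them instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping class for illustration; not part of Spring AMQP.
final class PendingConfirms {

    // Correlation id -> message body, for messages sent but not yet confirmed.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call right before sending, with the same id that is passed to CorrelationData.
    void track(String id, String body) {
        pending.put(id, body);
    }

    // Mirrors the (id, ack, cause) information a confirm callback receives.
    // Returns the body that should be resent on a negative confirm, or null otherwise.
    String onConfirm(String id, boolean ack, String cause) {
        if (id == null) {
            return null; // nothing to correlate
        }
        if (ack) {
            pending.remove(id); // broker accepted the message, stop tracking it
            return null;
        }
        // Negative confirm: keep the entry so the caller can retry it.
        return pending.get(id);
    }

    // Number of messages still waiting for a positive confirm.
    int size() {
        return pending.size();
    }
}
```

One way to wire this in would be to call `track(id, messageBody)` inside `sendMessage` and `onConfirm(...)` from the confirm callback, using `correlationData.getId()` as the id.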
3.4 Consumer service configuration
spring:
  rabbitmq:
    addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 16000
    listener:
      simple:
        # Use manual acknowledgement
        acknowledge-mode: manual
        concurrency: 5
        prefetch: 1
        max-concurrency: 10
3.5 Consumer service code
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-hello", durable = "true"),
            exchange = @Exchange(value = "exchange-hello", durable = "true", type = "topic"),
            key = "test.*"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("Message received: " + message.getPayload());
        // With manual acknowledgement, the delivery tag identifies the message to ack
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // false = acknowledge only this delivery, not all earlier ones
        channel.basicAck(deliveryTag, false);
    }
}
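The queue above is bound to a topic exchange with the key `test.*`, and the producer publishes with the routing key `test.123`. In a topic exchange, `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below shows that matching rule in plain Java. `TopicPattern` is a hypothetical class written for this article; the broker does the real matching, and this is not its implementation.

```java
// Hypothetical illustration of topic-exchange matching; the broker performs the real routing.
final class TopicPattern {

    // Returns true if routingKey would match bindingKey under topic-exchange rules.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: only a match if the routing key is exhausted too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // Pattern still expects a word, but the routing key has none left.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            // '*' consumes exactly one word; a literal must be equal.
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }
}
```

With this rule, `test.123` matches `test.*`, while `test.a.b` does not, because `*` cannot span two words. A binding key of `test.#` would accept both.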
3.6 REST test code
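import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/mq")
public class RabbitmqController {

    @Autowired
    private Producer producer;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam String messageBody, @RequestParam String id) throws Exception {
        Map<String, Object> headers = new HashMap<>();
        producer.sendMessage(messageBody, headers, id);
        return "success";
    }
}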
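To exercise the endpoint, you call `/mq/sendMessage` with the two query parameters `messageBody` and `id`. The sketch below builds such a URL in plain Java and URL-encodes the parameter values. `SendMessageUrl` is a hypothetical helper, and the base URL `http://localhost:8080` used in the usage note is an assumption (Spring Boot's default port); adjust it to wherever the producer service actually runs.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;

// Hypothetical helper for building the test request URL; not part of the services above.
final class SendMessageUrl {

    // Builds the GET URL for the /mq/sendMessage endpoint with encoded query parameters.
    static URI build(String baseUrl, String messageBody, String id) {
        return URI.create(baseUrl + "/mq/sendMessage"
                + "?messageBody=" + encode(messageBody)
                + "&id=" + encode(id));
    }

    // Form-style encoding: spaces become '+', reserved characters are percent-encoded.
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

For example, `SendMessageUrl.build("http://localhost:8080", "hello world", "1")` gives a URL you can paste into a browser or pass to any HTTP client. If everything is wired up, the endpoint returns `success` and the consumer prints the message.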
4. Summary
Today we walked through integrating a RabbitMQ 3.9.7 mirrored cluster with Spring Boot 2.5.5. I hope it proves useful in your work.