AI智能
改变未来

springboot研究五:springboot整合rabbitmq

rabbitmq是当下非常流行的消息队列,本文主要介绍springboot中如何配置使用rabbitmq。

文中代码基于springboot2.1.6,源代码见文末地址。

1.为了自己玩方便,可以用docker安装rabbitmq,见专栏内文章

《docker安装rabbitmq》

2.相关配置

spring.rabbitmq.host=192.168.59.128spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=admin#这个如果不配置,就会默认找\"/\"spring.rabbitmq.virtual-host=my_vhost#指定心跳超时,单位秒,0为不指定;默认60sspring.rabbitmq.requested-heartbeat=20#是否启用【发布确认】spring.rabbitmq.publisher-confirms=true#是否启用【发布返回】spring.rabbitmq.publisher-returns=true#连接超时,单位毫秒,0表示无穷大,不超时spring.rabbitmq.connection-timeout=10

3.pom依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

4.rabbitmq有4种exchange

Exchange type Default pre-declared names
Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)

a.direct exchange使用routing key进行消息传输,如下图,routing key其实就是queue和exchange的绑定。适用于多工作者协同工作的场景。

绑定代码如下:代码中queue名称和routing key名称都是\”direct\”

@Configurationpublic class DirectRabbitConfig {        @Bean    public Queue direct() {        return new Queue(\"direct\");    }    @Bean    public DirectExchange directExchange() {        return new DirectExchange(\"directExchange\");    }    @Bean    public Binding directBindingExchange(Queue direct, DirectExchange directExchange) {        return BindingBuilder.bind(direct).to(directExchange).with(\"direct\");    }}

sender如下:

@Servicepublic class DirectSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private AmqpTemplate rabbitTemplate;    public void sendString(String message) {        logger.info(\"direct sender : \" + message);        rabbitTemplate.convertAndSend(\"directExchange\", \"direct\", message);    }    public void sendObject(Object message) {        String messageStr = JSONObject.toJSONString(message);        logger.info(messageStr);        rabbitTemplate.convertAndSend(\"directExchange\", \"direct\", messageStr);    }}

 

receiver:

@RabbitHandler@RabbitListener(queues = {\"direct\"})public void processDirect(Message message) {    logger.info(\"Receiver direct: {}\", new String(message.getBody()));}

 

b.fanout exchange就是广播模式,把消息路有给所有的绑定队列,可以适用于群聊天的场景。

配置代码如下:其中有3个队列绑定一个fanout exchange

@Configurationpublic class FanoutRabbitConfig {    @Bean    public Queue queueA(){        return new Queue(\"fanout.a\");    }    @Bean    public Queue queueB(){        return new Queue(\"fanout.b\");    }    @Bean    public Queue queueC(){        return new Queue(\"fanout.c\");    }    @Bean    public FanoutExchange fanoutExchange() {        return new FanoutExchange(\"fanoutExchange\");    }    @Bean    public Binding bindingExchangeA(Queue queueA, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queueA).to(fanoutExchange);    }    @Bean    public Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queueB).to(fanoutExchange);    }    @Bean    public Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queueC).to(fanoutExchange);    }}

 

sender:

@Servicepublic class FanoutSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private AmqpTemplate rabbitTemplate;    public void send(String message) {        logger.info(\"fanout sender : {}\", message);        rabbitTemplate.convertAndSend(\"fanoutExchange\",\"\", message);    }}

receiver:

    @RabbitHandler    @RabbitListener(queues = {\"fanout.a\", \"fanout.b\", \"fanout.c\"})    public void processFanout1(Message message) {        logger.info(\"Receiver fanout: {}\", new String(message.getBody()));    }

c.topic exchange通过routing key和通配符来路由消息,适用于发布订阅场景。

配置代码:

@Configurationpublic class TopicRabbitConfig {    @Bean    public Queue queueMessage() {        return new Queue(\"topic.message\");    }    @Bean    public Queue queueMessage2() {        return new Queue(\"topic.message2\");    }    /**     * 将队列绑定到Topic交换器     * @return     */    @Bean    public TopicExchange exchange() {        return new TopicExchange(\"topicExchange\");    }    /**     * 将队列绑定到Topic交换器     * @param queueMessage     * @param exchange     * @return     */    @Bean    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {        return BindingBuilder.bind(queueMessage).to(exchange).with(\"topic.message\");    }    /**     * 将队列绑定到Topic交换器 采用#的方式     * @param exchange     * @param queueMessage2     * @return     */    @Bean    Binding bindingExchangeMessage2(TopicExchange exchange, Queue queueMessage2) {        return BindingBuilder.bind(queueMessage2).to(exchange).with(\"topic.#\");    }}

sender:

@Servicepublic class TopicSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private AmqpTemplate rabbitTemplate;    public void send1(String message) {        logger.info(\"topic sender1 : \" + message);        rabbitTemplate.convertAndSend(\"topicExchange\", \"topic.message\", message);    }    public void send2(String message) {        logger.info(\"topic sender2 : \" + message);        rabbitTemplate.convertAndSend(\"topicExchange\", \"topic.message2\", message);    }}

接受

@RabbitHandler    @RabbitListener(queues = {\"topic.message\"})    public void processTopic(Message message) {        logger.info(\"Receiver topic: {}\", new String(message.getBody()));    }    @RabbitHandler    @RabbitListener(queues = {\"topic.message2\"})    public void processTopic2(Message message) {        logger.info(\"Receiver topic2: {}\", new String(message.getBody()));}

 

d.header exchange忽略routing key参数,用header来取代

配置

@Configurationpublic class HeadersRabbitConfig {    @Bean    public Queue headerQueue() {        return new Queue(\"headerQueue\");    }    @Bean    public Queue headerQueue2() {        return new Queue(\"headerQueue2\");    }    @Bean    public HeadersExchange headerExchange() {          return new HeadersExchange(\"headerExchange\");    }    @Bean    public HeadersExchange headerExchange2() {          return new HeadersExchange(\"headerExchange2\");    }     @Bean    public Binding bindingExchange(Queue headerQueue, HeadersExchange headerExchange) {        Map<String,Object> headerValues = new HashMap<>(3);        headerValues.put(\"param1\", \"value1\");        headerValues.put(\"param2\", \"value2\");        return BindingBuilder.bind(headerQueue).to(headerExchange).whereAll(headerValues).match();    }    @Bean    public Binding bindingExchange2(Queue headerQueue2, HeadersExchange headerExchange2) {        Map<String,Object> header = new HashMap<>(3);        header.put(\"param1\", \"value1\");        header.put(\"param2\", \"value2\");        return BindingBuilder.bind(headerQueue2).to(headerExchange2).whereAny(header).match();    }}

发送:

@Servicepublic class HeadersSenderService {    private Logger logger = LoggerFactory.getLogger(getClass());    @Resource    private AmqpTemplate rabbitTemplate;    public void headerSend(Map<String, Object> head, String msg){        logger.info(\"header send message: \"+msg);        rabbitTemplate.convertAndSend(\"headerExchange\", \"headerQueue\", getMessage(head, msg));    }    public void headerSend2(Map<String, Object> head, String msg){        logger.info(\"header1 send message: \"+msg);        rabbitTemplate.convertAndSend(\"headerExchange2\", \"headerQueue2\", getMessage(head, msg));    }    private Message getMessage(Map<String, Object> head, Object msg){        MessageProperties messageProperties = new MessageProperties();        for (Map.Entry<String, Object> entry : head.entrySet()) {            messageProperties.setHeader(entry.getKey(), entry.getValue());        }        MessageConverter messageConverter = new SimpleMessageConverter();        return messageConverter.toMessage(msg, messageProperties);    }}

接收:

@RabbitHandler    @RabbitListener(queues = {\"headerQueue\"})    public void processHeaders(Message message) {        logger.info(\"Receiver header: {}\", new String(message.getBody()));    }    @RabbitHandler    @RabbitListener(queues = {\"headerQueue2\"})    public void processHeaders1(Message message) {        logger.info(\"Receiver header2: {}\", new String(message.getBody()));    }

5.测试,测试代码写在RabbitMqController中,启动Application即可进行url测试。见源码。

说明:

a.topic exchange,浏览器输入http://localhost:8082/mq/topic后,topic.#的routing key收到了2条消息,topic.message的routing key收到了1条,可以看出通配符的作用

b.headers exchange:浏览器输入http://localhost:8082/mq/headers,发送了4条消息,但是第1条没有收到。因为headerExchange绑定时使用了whereAll,headerExchange2绑定时使用了whereAny。

 

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » springboot研究五:springboot整合rabbitmq