SpringBoot整合RabbitMQ实现消息的消息消费

什么是RabbitMQ?

是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。

组成部分

  • Broker:消息服务进程包含Exchange和Queue两个部分
    • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
      • Direct Exchange(point-to-point):直接交换器,生产者将消息丢给它后,它从与自己绑定的那些Queue里选择一个,然后直接将消息丢过去,其他的Queue就接不到这个消息了。 
      • Fanout Exchange(multicast):广播交换机,生产者将消息发送给所有的Queue。
      • Topic Exchange(publish-subscribe): 主题交换器,生产者将消息丢给它后,它就给与自己绑定的那些个关心这个消息Queue全部发送一份消息。 也就是发布-订阅模式。
    • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
    • RoutingKey路由key:用来控制交换器如何将消息发送给绑定的队列。
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

工作原理

生产者发送消息:

  1. 生产者和Broker进程建立TCP连接
  2. 生产者和Broker进程建立通道
  3. 生产者通过Channel信道将消息发送给Broker,由Exchange将消息进行转发
  4. Exchange将消息转发给指定的Queue

消费者消费消息:

  1. 消费者和Broker进程建立TCP连接
  2. 消费者和Broker进程建立通道
  3. 消费者监听指定的Queue
  4. 当有消息到达Queue时Broker默认将消息推送给消费者

安装

配置RabbitMQ 的系统环境,安装erlang

配置环境变量

ERLANG_HOME  = "安装路径"

path = “%ERLANG_HOME%/bin”

输入erl 检查是否安装成功如下图:

 

安装RabbitMQ

 

安装好以后,配置环境变量

RABBITMQ_HOME = "安装路径"
path = “%RABBITMQ_HOME%/sbin”

运行 cmd 命令 输入 rabbitmq-server 启动成功,如下图:

 

访问 http://localhost:15672

输入用户名和密码,默认都是guest

好了接下来我们开始整合!!!

案例代码

搭建一个SpringBoot项目选择好rabbitmq

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.SpringBoot</groupId>
        <artifactId>SpringBoot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <groupId>com.demo</groupId>
    <artifactId>SpringBoot_RabbitMQ</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

</project>

配置 application.yml

 

server:
  port: 8080
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /

创建MQ配置类

 

package com.demo.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Program: SpringBoot
 * @ClassName RabbitMQConfig
 * @Author: liutao
 * @Description: 消息队列配置类
 * @Create: 2023-07-09 19:26
 * @Version 1.0
 **/

@Configuration
public class RabbitMQConfig {
    /**
     * 消息队列名称
     **/
    public static final String QUEEN_NAME = "queue";
    /**
     * 交换机名称
     **/
    public static final String EXCHANGE_NAME = "exchange";
    /**
     * 路由key
     **/
    public static final String ROUTING_KEY = "key";

    /**
     * @MethodName: queue
     * @description: 消息队列
     * @UpdateTime: 2023/7/9 20:20
     * @Return: org.springframework.amqp.core.Queue
     **/
    @Bean
    public Queue queue() {
        return new Queue(QUEEN_NAME);
    }

    /***
     * @MethodName: exchange
     * @description: 交换机
     * @UpdateTime: 2023/7/9 20:19
     * @Return: org.springframework.amqp.core.DirectExchange
     **/

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /**
     * @MethodName: binding
     * @description: 将消息队列 绑定到交换机
     * @UpdateTime: 2023/7/9 20:18
     * @Return: org.springframework.amqp.core.Binding
     **/
    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with(ROUTING_KEY);
    }
}

创建生产者类

 

package com.demo.mq;

import com.demo.mq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Program: SpringBoot
 * @ClassName Producer
 * @Author: liutao
 * @Description: 消息生产者
 * @Create: 2023-07-09 19:51
 * @Version 1.0
 **/

@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String message) {
        try {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);
        }catch (Exception e) {
            throw new RuntimeException("消息发布异常");
        }
    }
}

创建消费者类

 

package com.demo.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Program: SpringBoot
 * @ClassName Consumer
 * @Author: liutao
 * @Description: 消息消费者
 * @Create: 2023-07-09 19:41
 * @Version 1.0
 **/
@Slf4j
@Component
public class Consumer {
    @RabbitListener(queues = "queue")
    public void receiveMessage(String message) {
        // 处理接收到的消息
        log.info("Received message:" + message);
    }
}

web接口

 

package com.demo.mq.controller;

import com.demo.mq.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Program: SpringBoot
 * @ClassName MqController
 * @Author: liutao
 * @Description: mq调用接口
 * @Create: 2023-07-09 19:58
 * @Version 1.0
 **/

@RestController
public class MqController {
    @Autowired
    private Producer producer;
    @GetMapping("/send")
    public String send(String message){
        producer.sendMessage(message);
        return "ok";
    }
}

测试效果

启动服务 ,连接MQ成功

访问 http://localhost:8080/send?messgae="内容"

后台

一样我们也可以在mq管理面板去测试 找到 Queues 点击queue 进去

 

后台

结尾

到这里我们今天的SpringBoot就学习完了!!see you!!

如果我的内容对你有用,欢迎点赞,关注 !!涛哥博客

阅读剩余
THE END