消息中间件


消息中间件

RabbitMq ActiveMQ RocketMQ kafka

几个消息中间件的对比

安装RabbitMQ

(33条消息) RabbitMQ安装教程(超详细)_凡尘-追梦者的博客-CSDN博客

安装成功截图

为什么要使用消息队列

为什么使用消息队列? (gitee.io)

解耦

异步

削峰

缺点

系统可用性降低

系统复杂度提高

一致性问题

RabbitMQ demo

1、rabbitmq创建vhost

在 RabbitMQ 中,可以通过管理界面或者命令行方式创建 vhost(虚拟主机)。下面分别介绍两种方式:

  1. 通过 RabbitMQ 管理界面创建 vhost:

    • 打开浏览器,访问 RabbitMQ 管理界面(默认地址:http://localhost:15672/)。
    • 使用管理员账号登录(默认用户名:guest,密码:guest)。
    • 在左侧导航栏中选择 “Admin”(管理员菜单)。
    • 在 “Add a new virtual host”(添加新的虚拟主机)部分,输入要创建的 vhost 名称,然后点击 “Add virtual host” 按钮即可。
  2. 通过命令行方式创建 vhost:

    使用 RabbitMQ 的 rabbitmqctl 命令可以进行管理操作。在命令行中运行以下命令来创建 vhost:

    bashCopy code# 以管理员权限登录 RabbitMQ
    sudo rabbitmqctl add_vhost <vhost_name>

    注意将 <vhost_name> 替换为你要创建的 vhost 名称。

无论哪种方式创建 vhost,都可以在 RabbitMQ 中为不同应用程序或服务创建隔离的消息队列环境。确保为每个应用程序分配不同的 vhost,以便更好地管理和维护消息队列。

2、生产者和消费者demo

在参数设置的时候虚拟机是不需要”/“

ProducerHelloworld.java

public static void main(String[] args) throws IOException, TimeoutException {
    // 1、创建一个连接工厂
    ConnectionFactory factory=new ConnectionFactory();
    // 2、设置参数
    factory.setHost("101.200.40.242");//主机ip
    factory.setPort(5672);//端口号
    factory.setVirtualHost("lyz");//设置虚拟机(可以理解为分组)
    factory.setUsername("root");
    factory.setPassword("root");
    //3、根据配置信息 创建连接
    Connection connection = factory.newConnection();
    //4、创建channel
    Channel channel = connection.createChannel();
    //5、创建队列
    /*
     * 参数:
     *             1. queue:队列名称
     *             2. durable:是否持久化,true  当mq重启之后,还在
     *             3. exclusive:
     *                 * 是否独占。只能有一个消费者监听这队列
     *                 * 当Connection关闭时,是否删除队列
     *                 *
     *             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
     *             5. arguments:参数。
     */
    channel.queueDeclare("queueHelloworld",true,false,false,null);

    // 6、发送消息
    String body="hello 07201111";
    /*
    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    参数:
        1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
        2. routingKey:路由名称
        3. props:配置信息
        4. body:发送消息数据
     */
    channel.basicPublish("","queueHelloworld",null,body.getBytes());

    //7.释放资源
    channel.close();
    connection.close();
}

ConsumerHelloworld.java

public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置参数
    factory.setHost("101.200.40.242");
    factory.setPort(5672);
    factory.setUsername("root");
    factory.setPassword("root");
    factory.setVirtualHost("lyz");
    // 创建连接 Connection
    Connection connection = factory.newConnection();
    // 创建channel
    Channel channel = connection.createChannel();
    //创建队列
    channel.queueDeclare("queueHelloworld",true,false,false,null);
    //接受消息
    DefaultConsumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            System.out.println("consumerTag = " + consumerTag);
            System.out.println("envelope.getExchange() = " + envelope.getExchange());
            System.out.println("envelope.getRoutingKey() = " + envelope.getRoutingKey());
            System.out.println("properties = " + properties);
            System.out.println("new String(body) = " + new String(body));
        }
    };
    //参数3 使用回调对象的对调函数 展示消息
    channel.basicConsume("queueHelloworld",true,consumer);


}

Springboot整合RabbitMQ

简单测试

配置类

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME="boot_topic_exchange";
    public static final String QUEUE_NAME="boot_queue";
    @Bean
    public Queue queue(){
        return new Queue("queue",true);
    }
}

发送请求

@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("queue",message);
    }
}

接收消息

@Service
@Slf4j
public class MQReceiver {

    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接受消息:"+msg);
    }
}

fanout广播模式

这个模式将消息广播给所有与交换机绑定的队列。路由键在这里不起作用,只需要绑定队列到交换机即可,所有绑定的队列都会收到相同的消息。

配置类

@Configuration
public class RabbitMQConfig {
    private static final  String QUEUE1="queue_fanout01";
    private static final  String QUEUE2="queue_fanout02";
    private static final  String EXCHANGE="fanoutExchange";
    @Bean
    public Queue queue(){
        return new Queue("queue",true);
    }
    @Bean
    public Queue queue1(){
        return new Queue(QUEUE1);
    }
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE2);
    }
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE);
    }
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}

发送信息

@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("fanoutExchange","",message);
    }
}

接受消息

@Service
@Slf4j
public class MQReceiver {

    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接受消息:"+msg);
    }
    @RabbitListener(queues = "queue_fanout01")
    public void receive01(Object msg){
        log.info("QUEUE01_接受消息:"+msg);
    }
    @RabbitListener(queues = "queue_fanout02")
    public void receive02(Object msg){
        log.info("QUEUE_02接受消息:"+msg);
    }
}

测试

image-20230810103704342

direct模式

路由模式

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。

也就是让消费者有选择性的接收消息。
路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。

配置类

@Configuration
public class RabbitMQConfig {
    private static final String QUEUE1="queue_direct01";
    private static final String QUEUE2="queue_direct02";
    private static final String EXCHANGE="directExchange";
    private static final String ROUNTINGKEY01="queue.red";
    private static final String ROUNTINGKEY02="queue.green";
    @Bean
    public Queue queue1(){
        return new Queue(QUEUE1);
    }
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE2);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(EXCHANGE);
    }
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUNTINGKEY01);
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue2()).to(directExchange()).with(ROUNTINGKEY02);
    }
}

消息发送者

@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("fanoutExchange","",message);
    }

    public void send01(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("directExchange","queue.red",message);
    }
    public void send02(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("directExchange","queue.green",message);
    }
}

消息接收者

@Service
@Slf4j
public class MQReceiver {
    @RabbitListener(queues = "queue_direct01")
    public void receive01(Object msg){
        log.info("QUEUE01_接受消息:"+msg);
    }
    @RabbitListener(queues = "queue_direct02")
    public void receive02(Object msg){
        log.info("QUEUE_02接受消息:"+msg);
    }
}

测试

image-20230810104720459

Topic模式

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
  与路由模式相似,但是,主题模式是一种模糊的匹配方式

配置类

@Configuration
public class RabbitMQConfig {
	private static final String QUEUE01="queue_topic01";
    private static final String QUEUE02="queue_topic02";
    private static final String EXCHANGE="topicExchange";
    private static final String ROUNTINGKEY01="#.queue.#";
    private static final String ROUNTINGKEY02="*.queue.#";
    @Bean
    public Queue queue1(){
        return new Queue(QUEUE01);
    }
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE02);
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(EXCHANGE);
    }
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue1()).to(topicExchange()).with(ROUNTINGKEY01);
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue2()).to(topicExchange()).with(ROUNTINGKEY02);
    }
}

生产者

@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send03(Object msg){
        log.info("发送消息queue01接受:"+msg);
        rabbitTemplate.convertAndSend("topicExchange","queue.red.message",msg);
    }
    public void send04(Object msg){
        log.info("发送消息queue01和queue02接受:"+msg);
        rabbitTemplate.convertAndSend("topicExchange","message.queue.green",msg);
    }
}

消费者

@Service
@Slf4j
public class MQReceiver {
    @RabbitListener(queues="queue_topic01")
    public void receive05(Object msg){
        log.info("queue01接受消息"+msg);
    }
    @RabbitListener(queues="queue_topic02")
    public void receive06(Object msg){
        log.info("queue02接受消息"+msg);
    }
}

测试

@Test
public void testTopic(){
    mqSender.send03("发消息喽03");
}
@Test
public void testTopic02(){
    mqSender.send04("发消息喽04");
}

image-20230810111949491

测试2

@Test
public void testTopic(){
    mqSender.send03("发消息喽03");
    mqSender.send04("发消息喽04");
}

image-20230810112330918

发现再不同时发送的时候,两个接受队列都可以接收到消息04;

但是为什么在同一个方法中,两个接受队列不能同时接受消息04;


文章作者: 毛豆不逗比
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 毛豆不逗比 !
  目录
{% include '_third-party/exturl.swig' %} {% include '_third-party/bookmark.swig' %} {% include '_third-party/copy-code.swig' %} + {% include '_custom/custom.swig' %}