Rabbitmq入门

Posted on 2023-04-13

常见主流消息中间件

常见主流有rabbitmq、rocketmq和kafka。

  • rabbitmq

    rabbitmq是基于amqp协议实现,服务端使用erlang语言编写,支持java、c、python等多种客户端。在易用性、扩展性、高可用性等方面表现都不错。
    缺点:Erlang开发,阅读修改源码难度大

  • rocketmq

    阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域。
    缺点:社区相对不活跃,更新较快

  • kafka

    是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者。擅长大数据的日志处理。
    缺点:类似mq,功能较为简单。支持简单的mq功能,不支持批量和广播消息。

本文主要介绍rabbitmq,对于rocketmq和kafka不做过多描述。

欢迎关注个人公众号【好好学技术】交流学习

消息中间件的作用

核心点就三个点:解耦、异步、削峰。

rabbitmq核心概念介绍

image.png

  • Producter生产者

    创建message消息,然后投递到mq中

  • Message消息

    生产者生产的消费内容,有消息头和消息体,包含多个属性配置,比如RoutingKey路由键等。

  • RoutingKey路由键

    生产者发送消息到交换器时,需要指定一个RoutingKey,用来指定消息的路由规则。RoutingKey最大长度为255字节。

  • Exchange交换器

    生产者将消息发送到Exchange,交换器将消息路由到一个或者多个队列中。交换机和队列是多对多的关系。

  • Queue队列

    用于存储消息。rabbitmq的消息只能存储在队列中。

  • Binding绑定

    绑定交换器与队列。绑定时会指定一个绑定键,这样rabbitmq就知道如何正确的对消息路由到队列。

  • Consumer消费者

    消费队列里面的消息。

Exchange交换机

rabbitmq支持四种交换机类型。
Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

  • Direct exchange 定向

    将队列绑定到交换机上时,要求消息与一个特定的路由键完全匹配。
    比如路由键为“ikun”,那么只有标记为“ikun”的消息才会被转发。

  • Fanout exchange 广播

    广播类型,常做发布订阅。不需要处理路由键。收到消息后发送到所有绑定的队列上。

  • Topic exchange 通配

    路由时支持“#”或者*来的匹配规则
    ikun.# 能够匹配到 ikun.ji, ikun.ji.taimei
    ikun.* 能匹配到ikun.ji 但是不能匹配到ikun.ji.taimei

  • Headers exchange

    根据消息内容中的header属性进行匹配。几乎不用。

docker创建rabbitmq

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management

管控台地址:http://127.0.0.1:15672 admin admin
服务端地址:http://127.0.0.1:5672

springboot整合rabbitmq

pom引入

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

yml增加配置文件

spring:  
  rabbitmq:  
    host: 127.0.0.1  
    port: 5672  
    virtual-host: /  
    username: admin  
    password: admin

创建RabbitmqConfig配置文件

package com.fandf.test.rabbit;  
  
import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
/**  
* @author fandongfeng  
* @date 2023/4/15 15:38  
*/  
@Configuration  
public class RabbitMQConfig {  
  
    public static final String EXCHANGE_NAME = "ikun_exchange";  
    public static final String QUEUE_NAME = "ikun_queue";  
  
    /**  
    * 交换机  
    */  
    @Bean  
    public Exchange ikunExchange() {  
        //return new TopicExchange(EXCHANGE_NAME, true, false);  
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();  
    }  
    /**  
    * 队列  
    */  
    @Bean  
    public Queue ikunQueue() {  
        //return new Queue(QUEUE_NAME, true, false, false, null);  
        return QueueBuilder.durable(QUEUE_NAME).build();  
    }  

    /**  
    * 交换机和队列绑定关系  
    */  
    @Bean  
    public Binding ikunBinding(Queue queue, Exchange exchange) {  
        //return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "ikun.#", null);  
        return BindingBuilder.bind(queue).to(exchange).with("ikun.#").noargs();  
    }  
  
}

消费者编写


package com.fandf.test.rabbit;  
  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
/**  
* @author fandongfeng  
* @date 2023/4/15 15:42  
*/  
@Component  
@RabbitListener(queues = "ikun_queue")  
public class OrderMQListener {  
  
    /**  
    * RabbitHandler 会自动匹配 消息类型(消息自动确认)  
    * @param msg 消息内容  
    * @param message mq消息  
    */  
    @RabbitHandler  
    public void consumer(String msg, Message message) {  
        long msgTag = message.getMessageProperties().getDeliveryTag();  
        System.out.println("msg="+msg);  
        System.out.println("msgTag="+msgTag);  
        System.out.println("message="+ message);  
        System.out.println("监听到消息:消息内容:"+message.getBody());  
    }  
  
}

编写生产者并测试


package com.fandf.test.redis;  
  
import com.fandf.test.rabbit.RabbitMQConfig;  
import org.junit.jupiter.api.Test;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.boot.test.context.SpringBootTest;  
  
import javax.annotation.Resource;  
  
/**  
* @author fandongfeng  
* @date 2023/4/15 15:46  
*/  
@SpringBootTest  
public class ProductMQTest {  
  
    @Resource  
    RabbitTemplate rabbitTemplate;  

    @Test  
    public void test() throws InterruptedException {  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");  
        Thread.sleep(10000);  
    }  
  
}

执行输出

msg=鸡你太美
msgTag=2
message=(Body:'鸡你太美' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ikun_exchange, receivedRoutingKey=ikun.mei, deliveryTag=2, consumerTag=amq.ctag-0DOQpOmnVs_Ffnf-Os9RLQ, consumerQueue=ikun_queue])
监听到消息消息内容:[B@60587e5a