常见主流消息中间件
常见主流有rabbitmq、rocketmq和kafka。
- rabbitmq
rabbitmq是基于amqp协议实现,服务端使用erlang语言编写,支持java、c、python等多种客户端。在易用性、扩展性、高可用性等方面表现都不错。
缺点:Erlang开发,阅读修改源码难度大 - rocketmq
阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域。
缺点:社区相对不活跃,更新较快 - kafka
是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者。擅长大数据的日志处理。
缺点:类似mq,功能较为简单。支持简单的mq功能,不支持批量和广播消息。
本文主要介绍rabbitmq,对于rocketmq和kafka不做过多描述。
欢迎关注个人公众号【好好学技术】交流学习
消息中间件的作用
核心点就三个点:解耦、异步、削峰。
rabbitmq核心概念介绍
- Producter生产者
创建message消息,然后投递到mq中
- Message消息
生产者生产的消费内容,有消息头和消息体,包含多个属性配置,比如RoutingKey路由键等。
- RoutingKey路由键
生产者发送消息到交换器时,需要指定一个RoutingKey,用来指定消息的路由规则。RoutingKey最大长度为255字节。
- Exchange交换器
生产者将消息发送到Exchange,交换器将消息路由到一个或者多个队列中。交换机和队列是多对多的关系。
- Queue队列
用于存储消息。rabbitmq的消息只能存储在队列中。
- Binding绑定
绑定交换器与队列。绑定时会指定一个绑定键,这样rabbitmq就知道如何正确的对消息路由到队列。
- Consumer消费者
消费队列里面的消息。
Exchange交换机
rabbitmq支持四种交换机类型。
Direct exchange、Fanout exchange、Topic exchange、Headers exchange。
- Direct exchange 定向
将队列绑定到交换机上时,要求消息与一个特定的路由键完全匹配。
比如路由键为“ikun”,那么只有标记为“ikun”的消息才会被转发。 - Fanout exchange 广播
广播类型,常做发布订阅。不需要处理路由键。收到消息后发送到所有绑定的队列上。
- Topic exchange 通配
路由时支持“#”或者*来的匹配规则
ikun.# 能够匹配到 ikun.ji, ikun.ji.taimei
ikun.* 能匹配到ikun.ji 但是不能匹配到ikun.ji.taimei - Headers exchange
根据消息内容中的header属性进行匹配。几乎不用。
docker创建rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management
管控台地址:http://127.0.0.1:15672 admin admin
服务端地址:http://127.0.0.1:5672
springboot整合rabbitmq
pom引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml增加配置文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
创建RabbitmqConfig配置文件
package com.fandf.test.rabbit;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author fandongfeng
* @date 2023/4/15 15:38
*/
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "ikun_exchange";
public static final String QUEUE_NAME = "ikun_queue";
/**
* 交换机
*/
@Bean
public Exchange ikunExchange() {
//return new TopicExchange(EXCHANGE_NAME, true, false);
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 队列
*/
@Bean
public Queue ikunQueue() {
//return new Queue(QUEUE_NAME, true, false, false, null);
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 交换机和队列绑定关系
*/
@Bean
public Binding ikunBinding(Queue queue, Exchange exchange) {
//return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "ikun.#", null);
return BindingBuilder.bind(queue).to(exchange).with("ikun.#").noargs();
}
}
消费者编写
package com.fandf.test.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author fandongfeng
* @date 2023/4/15 15:42
*/
@Component
@RabbitListener(queues = "ikun_queue")
public class OrderMQListener {
/**
* RabbitHandler 会自动匹配 消息类型(消息自动确认)
* @param msg 消息内容
* @param message mq消息
*/
@RabbitHandler
public void consumer(String msg, Message message) {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msg="+msg);
System.out.println("msgTag="+msgTag);
System.out.println("message="+ message);
System.out.println("监听到消息:消息内容:"+message.getBody());
}
}
编写生产者并测试
package com.fandf.test.redis;
import com.fandf.test.rabbit.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @author fandongfeng
* @date 2023/4/15 15:46
*/
@SpringBootTest
public class ProductMQTest {
@Resource
RabbitTemplate rabbitTemplate;
@Test
public void test() throws InterruptedException {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");
Thread.sleep(10000);
}
}
执行输出
msg=鸡你太美
msgTag=2
message=(Body:'鸡你太美' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ikun_exchange, receivedRoutingKey=ikun.mei, deliveryTag=2, consumerTag=amq.ctag-0DOQpOmnVs_Ffnf-Os9RLQ, consumerQueue=ikun_queue])
监听到消息:消息内容:[B@60587e5a