一、安装
RabbitMQ 的架构模型可以分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。
整体的流程非常简单,生产者将消息发送到服务端,消费者从服务端获取对应的消息。
生产者在发送消息前需要先确认发送给哪个虚拟主机的哪个交换器,再由交换器通过路由键将消息转发给与之绑定的队列。
最后,消费者到指定的队列中获取自己的消息进行消费。
安装地址:RabbitMQ官网下载地址
RabbitMQ 依赖于 erlang(Erlang 是一种多用途编程语言,主要用于开发并发和分布式系统),需要先行下载erlang。
Erlang安装下载:Erlang官网下载地址
安装完成后,windows默认是在C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.7\sbin
目录下启动
1rabbitmgctl status //查看当前状态2rabbitmq-plugins enable rabbitmq_management //开启Web插件3rabbitmg-server start //启动服务4rabbitmq-server stop //停止服务5rabbitmq-server restart//重启服务
在浏览器访问地址:localhost:15672即可访问,默认用户名和密码都是guest
二、基础概念
1、客户端
生产者和消费者都属于客户端。
- 生产者:消息的发送方,将要发送的消息封装成一定的格式,发送给服务端。消息包括消息体和标签。
- 消费者:消息的接收方,负责消费消息体。
2、服务端
虚拟主机、交换机、队列都属于服务端。
- 虚拟主机:用来对交换器和队列进行逻辑隔离,在同一个虚拟主机下,交换器和队列的名称不能重复。有点类似 Java 中的包,同一个包下,不能有相同名称的类或者接口。
- 交换器:负责接收生产者发来的消息,并根据规则分配给对应的队列,不生产消息,只是消息的搬运工。
- 队列:负责存储消息,生产者发送的消息会放在这里,消费者从这里取。
3、连接和信道
连接和信道是两个不同的概念,连接的英文叫 connection,信道叫 channel。
连接里包含了多条信道,连接用的是 TCP 连接,因为 AMQP 就是用 TCP 实现的。
为什么不直接使用连接,而要在连接的基础上新建信道呢?
因为 TCP 连接是比较昂贵的,新建需要三次握手,销毁需要四次挥手,所以如果每个线程在想 RabbitMQ 服务端发送/接收消息的时候都新建一个 TCP 连接,就会非常的消耗资源,于是就有了信道。
信道是线程私有的,连接是线程共享的。
信道+连接的模式,既保证了线程之间的私密性,又减少了系统开销。
4、业务场景
消息队列的主要功能有三种:
- 异步处理:比如说在做电商业务的时候,提交订单的动作可能涉及到创建订单、扣除库存、增加用户积分、发送订单邮件等。它们并不是一个串行的操作,可以把发送订单邮件和增加用户积分交给消息队列去做。
- 系统解耦:消息队列可以作为不同系统之间的桥梁,且不受系统技术栈的约束。
- 缓冲削峰:消息队列可以将大量的请求放到队列中,然后再按照一定的顺序规则交给业务服务器处理。
5、工作模式
1、头路由模式---headers
-
headers 模式/headers 头路由模式使用比较少
-
headers 交换机是一种比较复杂且少见的交换机,不同于 direct 和 topic,它不关心 路由 key 是否匹配,而只关心 header 中的 key-value 对是否匹配(这里的匹配为精确匹配, 包含键和值都必须匹配), 有点类似于 http 中的请求头。
-
headers 头路由模型中,消息是根据 prop 即请求头中 key-value 来匹配的。
-
绑定的队列(也可以理解成消费方) 指定的 headers 中必须包含一个”x-match”的键
-
键”x-match”的值有 2 个:all 和 any。
all:表示绑定的队列/消费方 指定的所有 key-value 都必须在消息 header 中出现并匹配
any:表示绑定的队列/消费方 指定的 key-value 至少有一个在消息 header 中出现并匹配即可
配置队列和交换机:
1@Configuration2public class RabbitMQConfig {3 private static final String QUEUE01 = "queue_header01";4 private static final String QUEUE02 = "queue_header02";5 private static final String EXCHANGE = "headersExchange";6
7 @Bean8 public Queue queue_header01(){9 return new Queue(QUEUE01);10 }11
12 @Bean13 public Queue queue_header02(){14 return new Queue(QUEUE02);15 }26 collapsed lines
16
17 //创建交换机--HeadersExchange18 @Bean19 public HeadersExchange headersExchange() {20 return new HeadersExchange(EXCHANGE);21 }22
23 //队列交换机绑定,声明要匹配的k-v,指定匹配方式:all/any24 @Bean25 public Binding binding_header01() {26 Map<String, Object> map = new HashMap<>();27 map.put("color", "red");28 map.put("speed", "slow");29 return BindingBuilder.bind(queue_header01()).to(headersExchange())30 .whereAny(map).match(); //以any方式匹配31 }32
33 @Bean34 public Binding binding_header02() {35 Map<String, Object> map = new HashMap<>();36 map.put("color", "red");37 map.put("speed", "fast");38 return BindingBuilder.bind(queue_header02()).to(headersExchange())39 .whereAll(map).match(); //以all方式匹配40 }41}
在消息生产者中编写方法:
1 //发送消息到headers交换机,同时指定k-v2 public void sendToHeader01(String msg) {3 log.info("发送消息==>{}",msg);4 //创建消息属性5 MessageProperties properties = new MessageProperties();6 properties.setHeader("color", "red");7 properties.setHeader("speed", "fast");8 //创建Message对象9 Message message = new Message(msg.getBytes(), properties);10 template.convertAndSend("headersExchange", "", message);11 }12
13 public void sendToHeader02(String msg) {14 log.info("发送消息==>{}",msg);15 //创建消息属性7 collapsed lines
16 MessageProperties properties = new MessageProperties();17 properties.setHeader("color", "red");18 properties.setHeader("speed", "normal");19 //创建Message对象20 Message message = new Message(msg.getBytes(), properties);21 template.convertAndSend("headersExchange", "", message);22 }
配置消费者:
1 /* 接收消息 */2 @RabbitListener(queues = "queue_header01")3 public void receive01(Message message) {4 log.info("[CONSUMER] received from queue_header01 --> " + new String(message.getBody()));5 }6
7 /* 接收消息 */8 @RabbitListener(queues = "queue_header02")9 public void receive02(Message message) {10 log.info("[CONSUMER] received from queue_header02 --> " + new String(message.getBody()));11 }
控制层:
1 @RequestMapping("/header01")2 @ResponseBody3 public void header01() {4 //让两个队列都收到消息5 mqSender.sendToHeader01("hello queue1 queue2");6 }7
8 @RequestMapping("/header02")9 @ResponseBody10 public void header02() {11 //只让第一个队列收到消息12 mqSender.sendToHeader02("hello queue1");13 }
2、广播模式---fanout
在广播模式下,即使生产者只生产了一条消息,它对应的所有消费者都能全部接收。就是把交换机里的消息发送给所有绑定了该交换机的队列,忽略routingKey
(路由)。
3、路由模式---Direct
路由模式是在使用交换机的同时,生产者指定路由发送数据, 消费者绑定路由接受数据。
与广播模式不同的是,广播模式只要是绑定了交换机的队列都会收到生产者向交换机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会声明发送给交换机下的哪个路由,并且只有当消费者的队列绑定了交换机并且声明了路由,才会收到数据。
4、主题模式---Topic
direct 模式会造成路由 RoutingKey 太多, 而实际开发中往往是按照某个规则来进行路由匹配的, RabbitMQ 提供了 Topic 模式/主题模式来适应这种需求
Topic 模式是 direct 模式上的一种扩展/叠加, 扩展/叠加了模糊路由 RoutingKey 的模式, 可以理解为是模糊的路由匹配模式
- ’*’ :只能匹配一个单词
- ’#’ :可以匹配多个单词(或者零个)
1@Configuration2public class RabbitConfig {3 private static final String QUEUE01 = "queue_topic01";4 private static final String QUEUE02 = "queue_topic02";5 private static final String EXCHANGE = "topicExchange";6
7 private static final String ROUTING_KEY01 = "#.queue.#";8 private static final String ROUTING_KEY01 = "*.queue.#";9
10 @Bean11 public Queue queue_topic01() {12 return new Queue(QUEUE01);13 }14
15 @Bean20 collapsed lines
16 public Queue queue_topic02() {17 return new Queue(QUEUE02);18 }19
20 @Bean21 public TopicExchange exchange() {22 return new TopicExchange(EXCHANGE);23 }24
25 //绑定交换机同时指定路由26 @Bean27 public Binding binding_topic01() {28 return BindingBuilder.bind(queue_topic01()).to(topicExchange()).with(ROUTINGKEY01);29 }30
31 @Bean32 public Binding binding_topic02() {33 return BindingBuilder.bind(queue_topic02()).to(topicExchange()).with(ROUTINGKEY02);34 }35}
三、SpringBoot集成
添加maven依赖:
1<dependency>2 <groupId>org.springframework.boot</groupId>3 <artifactId>spring-boot-starter-amqp</artifactId>4</dependency>
application.yml
配置
1rabbitmq:2 host: localhost3 port: 56724 # 虚拟主机5 virtual-host: /6 password: guest7 username: guest8 listener:9 simple:10 # 消费者最小数量11 concurrency: 112 # 消费者最大数量13 max-concurrency: 1014 # 限制消费者每次只能处理一条消息,处理完才能处理下一条消息15 prefetch: 117 collapsed lines
16 # 启动时是否默认启动容器17 auto-startup: true18 # 消息被拒绝后,重新进入消息队列19 default-requeue-rejected: true20
21 template:22 retry:23 # 启用重试机制,默认false24 enabled: true25 # 最大重试次数26 max-attempts: 327 # 初始的重试时间间隔--第一次消息处理失败后重试时间间隔28 initial-interval: 1000ms29 # 最大时间间隔,默认10s30 max-interval: 10000ms31 # 重试时间间隔的乘数--上一次重试时间✖倍数32 multiplier: 1
配置类:
1import org.springframework.amqp.core.Queue;2import org.springframework.context.annotation.Bean;3import org.springframework.context.annotation.Configuration;4
5/**6 * @author 左齐亮7 * @version 1.08 */9@Configuration10public class RabbitMQConfig {11 /*定义队列名称*/12 private static final String QUEUE = "queue";13
14 /**15 * 配置队列45 collapsed lines
16 * durable: true 表示队列是否持久化,默认 true17 * 默认情况下默认放到内存,重启mq后就丢失了18 * 持久化后保存到 Erlang 自带的 Mnesia 数据库中,当 rabbitmq 重启之后会读取该数据库19 */20 @Bean21 public Queue queue() {22 return new Queue(QUEUE, true);23 }24
25
26 // ---fanout---27 private static final String QUEUE1 = "queue_fanout01";28 private static final String QUEUE2 = "queue_fanout02";29 private static final String EXCHANGE = "fanoutExchange";30
31 //创建队列32 @Bean33 public Queue queue1() {34 return new Queue(QUEUE1, true);35 }36
37 //创建队列38 @Bean39 public Queue queue2() {40 return new Queue(QUEUE2, true);41 }42
43 //创建交换机--FanoutExchange44 @Bean45 public FanoutExchange exchange(){46 return new FanoutExchange(EXCHANGE);47 }48
49 //进行绑定--QUEUE1绑定到交换机50 @Bean51 public Binding binding01() {52 return BindingBuilder.bind(queue1()).to(exchange());53 }54
55 //进行绑定--QUEUE2绑定到交换机56 @Bean57 public Binding binding02() {58 return BindingBuilder.bind(queue2()).to(exchange());59 }60}
如果没有将队列和交换机绑定就是使用默认的交换机——amqp.direct
【路由模式】
创建生产者:
1@Service2@Slf4j3public class MQSender {4 @Resource5 private RabbitTemplate template;6
7 /* 发送消息 */8 public void send(Object msg){9 log.info("[PRODUCER] start send message --> " + msg);10 template.convertAndSend("queue", msg);11 }12
13 //发送消息到指定交换机14 //注意由于是fanout模式,需要忽略路由,在convertAndSend时传入第二个是路由参数置空即可15 public void sendFanout(Object msg){5 collapsed lines
16 log.info("发送消息==>{}",msg);17 template.convertAndSend("fanoutExchange", "", msg);18 }19
20}
创建消费者:
1@Service2@Slf4j3public class MQConsumer {4
5 /* 接收消息 */6 @RabbitListener(queues = "queue")7 public void consume(Object msg) {8 log.info("[CONSUMER] received from queue --> " + msg);9 }10
11 //监听队列QUEUE1---接收消息12 @RabbitListener(queues = "queue_fanout01")13 public void receive1(Object msg) {14 log.info("从队列QUEUE1接收消息==>{}", msg);15 }7 collapsed lines
16
17 //监听队列QUEUE2---接收消息18 @RabbitListener(queues = "queue_fanout02")19 public void receive2(Object msg) {20 log.info("从队列QUEUE2接收消息==>{}", msg);21 }22}
创建Controller:
1@Controller2public class RabbitMQHandler {3 @Resource4 private MQSender mqSender;5
6 @RequestMapping("/mq")7 @ResponseBody8 public void mq(){9 String msg = "hello mq";10 mqSender.send(msg);11 }12
13 @RequestMapping("/mq/fanout")14 @ResponseBody15 public void fanout(){4 collapsed lines
16 String msg = "hello fanout";17 mqSender.sendFanout(msg);18 }19}