消息中间件(七)——RabbitMQ的工作模式(Publish/Subscribe)
RabbitMQ有以下几种工作模式 : 1、Work queues 2、Publish/Subscribe 3、Routing 4、Topics 5、Header 6、RPC
1、Work Queues模式
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试: 1、使用 入门程序,启动多个消费者。 2、生产者发送多个消息。
特点:
1、一条消息只会被一个消费者接收;
2、RabbitMQ采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
2、Publish/Subscribe模式
Publish/Subscribe即发布订阅模式,它有如下特点:
1、每个消费者监听自己的队列
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
2.1 代码实现
代码还是编写在 入门程序的工程中
场景:当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。
2.1.1 生产者代码
声明exchange_fanout_inform交换机
声明两个队列,并且绑定到此交换机,绑定时不需要指定routingkey
发送消息时不需要指定routingkey
package com.zdw.prodcure;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Create By zdw on 2019/9/17
* 这是发布订阅模式Publish/Subscribe的生成者
*/
public class PubSubProdcure {
private static final String EXCHANGE_NAME="exchange_fanout_inform";//定义交换机名称
private static final String QUEUE_MSG="queue_msg_inform";//短信队列名称
private static final String QUEUE_EMAIL="queue_email_inform";//邮件队列名称
public static void main(String[] args) {
Connection connection = null;//定义连接对象
Channel channel = null;//定义通道对象
try {
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
connection = connectionFactory.newConnection();//得到与RabbitMQ服务的TCP连接
channel = connection.createChannel();//从连接中得到与交换机的通道,一个连接可以创建多个通道,一个通道代表一个会话任务
/**
* 声明两个队列,String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* 参数明细:
* 1、String queue:队列名称
* 2、boolean durable:是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive :是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_MSG,true,false,false,null);
channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);
/**
* 声明交换机:
* 参数:String exchange, BuiltinExchangeType type
* 1、exchange:交换机名称
* 2、type:交换机类型,主要有以下几种类型:
* fanout:对应的是发布订阅模式
* direct:对应的是routing模式
* topic:对应的是ropic模式
* headers 对应的是headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
/**
* 队列绑定到交换机
* 参数:String queue, String exchange, String routingKey
* 1、队列名称 2、交换机名称 3、路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中设置为空字符串
*/
channel.queueBind(QUEUE_MSG,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,"");
/**
* 发送消息:String exchange, String routingKey, BasicProperties props, byte[] body
* 参数明细:
* 1、exchange 交换机 如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey 路由key 交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props 消息的属性
* 4、body 消息主体
* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
* 默认的交换机,routingKey等于队列名称
*/
for(int i=0;i<5;i++) {
//定义发送的消息内容
String message = "send inform message to user:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ssSSS").format(new Date());
//------------------发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("Send message to MQ:" + message);
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
代码中声明了一个交换机和两个队列,两个队列都绑定到了交换机上面,生产者发送消息到RabbitMQ。
运行上面的程序,可以通过浏览器访问: http://localhost:15672 查看详情:
可以看到多了一个交换机,交换机的类型是fanout。
可以看到又多了两个队列,同时每个队列中都有待消费的5条消息。
2.1.2 消费者代码
我们上面的生产者发送消息的时候,绑定了一个交换机两个队列,那么意味着就会有两个消费者,每个消费者绑定不同的队列,只消费改队列中的消息。
2.1.2.1 短信消费者
package com.zdw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Create By zdw on 2019/9/18
* 短信消费者
*/
public class PubSubConsumerMsg {
private static final String EXCHANGE_NAME="exchange_fanout_inform";//定义交换机名称,一定要与生产者中相同
private static final String QUEUE_MSG="queue_msg_inform";//短信队列名称,一定要与生产者中相同
public static void main(String[] args) {
Connection connection = null;//定义连接对象
Channel channel = null;//定义通道对象
try {
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
connection = connectionFactory.newConnection();//获取与RabbitMQ的tcp连接
channel = connection.createChannel();//得到连接通道,一个连接可以创建多个通道,每个通道相当于一个会话任务
channel.queueDeclare(QUEUE_MSG,true,false,false,null);//声明队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//声明交换机
channel.queueBind(QUEUE_MSG,EXCHANGE_NAME,"");//绑定队列到交换机
//定义消费消息的方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 消费者接收消息后执行该方法
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时(channel.basicConsume)设置
* @param envelope 信封,通过envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
* 消息和重传标志(收到消息失败后是否需要重新发送)
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
long msgId = envelope.getDeliveryTag();//消息id
String msg = new String(body,"utf-8");
System.out.println(exchange+"//"+routingKey+"//"+msgId+"//"+msg);
}
};
//监听队列
/** 参数:String queue, boolean autoAck, Consumer callback
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_MSG,true,defaultConsumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
2.1.2.2 邮件消费者
package com.zdw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Create By zdw on 2019/9/18
* 邮件消费者
*/
public class PubSubConsumerEmail {
private static final String EXCHANGE_NAME="exchange_fanout_inform";//定义交换机名称,一定要与生产者中相同
private static final String QUEUE_EMAIL="queue_email_inform";//邮件队列名称,一定要与生产者中相同
public static void main(String[] args) {
Connection connection = null;//定义连接对象
Channel channel = null;//定义通道对象
try {
//创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
connection = connectionFactory.newConnection();//获取与RabbitMQ的tcp连接
channel = connection.createChannel();//得到连接通道,一个连接可以创建多个通道,每个通道相当于一个会话任务
channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//声明队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//声明交换机
channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,"");//绑定队列到交换机
//定义消费消息的方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 消费者接收消息后执行该方法
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时(channel.basicConsume)设置
* @param envelope 信封,通过envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
* 消息和重传标志(收到消息失败后是否需要重新发送)
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
long msgId = envelope.getDeliveryTag();//消息id
String msg = new String(body,"utf-8");
System.out.println(exchange+"//"+routingKey+"//"+msgId+"//"+msg);
}
};
//监听队列
/** 参数:String queue, boolean autoAck, Consumer callback
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_EMAIL,true,defaultConsumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
运行了两个消费者之后,我们可以看到控制台打印的消息,并且也可以在后台管理页面看到,队列当中的消息已经被消费了。
Publish/Subscribe工作模式的特点:生产者发送若干条消息,每条消息都转发到各个队列,每个消费者都接收到了消息。
2.2 publish/subscribe与work queues比较
2.2.1 区别
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。
2.2.2 共同点
两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
代码下载: https://download.csdn.net/download/zengdongwen/11784368
mango_233: 超棒!完美解决我的问题
Qiu-Forever: 你好 博主 消费者不重复消费 存在redis 如果redis宕机了 怎么办呢?
白马非马·: 您好,这个项目包能发一份吗,非常感谢,2500498086@qq.com
白马非马·: 您好,这个项目包能发一份吗,非常感谢,2500498086@qq.com
holewritle: 苦sile