一篇文章带你从入门到精通:RabbitMQ(rabbitmq ui)快来看

随心笔谈11个月前发布 admin
73 0



# 设置用户分配操作权限,admin 用户的权限为 administrator
rabbitmqctl set_user_tags admin administrator

5.为用户添加资源权限

因为 admin 已经是超级管理员权限了,所以其实不分配资源权限也可以,会默认去做。

# 命令格式为: set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 这里即为 admin 用户开启 配置文件和读写的权限
rabbitmqctl set_permissions -p / admin “.*””.*””.*”

6.访问 Linux IP:15672 ,例如,输入刚才设置好的用户名密码 admin

如图:访问成功

2.1.2.1 命令小结

1.添加用户:

2.修改密码:

3.删除用户:

4.用户列表:

5.设置用户角色:

6.删除用户所有角色:

7.为用户添加资源权限:

使用:输入 rabbitmqctl ,则会提示可能使用的命令,然后 使用 rabbitmqctl hepl <命令> 可以查看具体命令的使用方法和参数。

2.1.3 简单介绍 Web 界面管理

Connections(连接):此处用来管理与
RabbitMQ 建立连接后的生产者和消费者
Channels(通道):连接建立后,会形成通道,消息的投递获取依赖通道。
Exchanges(交换机):用来实现消息的路由
Queues(队列):存放消息的队列,消息等待被消费,消费后被移除队列。
Admin(管理):用于对管理用户,以及对应权限进行设置,如下图所示

Tags 就是用来指定用户的角色

administrator(超级管理员):登录控制台、查看所有信息、操作用户、操作策略
monitoring(监控者): 登录控制台、查看所有信息
policymaker(策略制定者): 登录控制台、指定策略
managment(普通管理员): 登录控制台

在 Docker 中安装 RabbitMQ 不需要自己去考虑版本,环境等的各种冲突不兼容问题,是非常便捷的,我演示的这台虚拟机是一个 CentOS 7.9 裸机,所以我们从更新 yum,到安装 Docker 和 安装 RabbitMQ 按步骤都讲一下

2.2.1 配置 yum

1.更新 yum 到最新版

# 更新 yum
yum update
# 检查yum依赖的几个包 yum-utils 提供 yum-config-manager 功能, 后面两个是 devicemapper 用到的
yum install -y yum-utils device-mapper-persistent-data lvm2

2.设置 yum 源为阿里云

yum-config-manager –add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

2.2.2 安装 docker
2.2.2.1 步骤

1.使用 yum 安装 docker

docker-ce 是社区版的意思,ee为企业版

yum install docker-ce -y

2.通过查看版本,检查安装是否成功

docker -v

sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-‘EOF’
{
“registry-mirrors”: [“https://<你的ID>.mirror.aliyuncs.com”]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

国内从 DockerHub 拉取镜像有时会遇到困难,此时可以配置镜像加速器。Docker 官方和国内很多云服务商都提供了国内加速器服务,例如:

科大镜像:https://docker.mirrors.ustc.edu.cn/
网易:https://hub-mirror.c.163.com/
阿里云:https://<你的ID>.mirror.aliyuncs.com
七牛云加速器:https://reg-mirror.qiniu.com

当配置某一个加速器地址之后,若发现拉取不到镜像,请切换到另一个加速器地址。国内各大云服务商均提供了 Docker 镜像加速服务,建议根据运行 Docker 的云平台选择对应的镜像加速服务。

阿里云镜像获取地址:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors,登陆后,左侧菜单选中镜像加速器就可以看到你的专属地址了

2.2.2.2 Docker 常见命令
2.2.2.2.1 管理命令

就启动,停止,重启这些简单的命令使用 service 也是可以的,systemctl 功能稍微强大一些

# 启动 docker
systemctl docker start
# 停止 docker
systemctl docker stop
# 重启 docker
systemctl docker restart
# 查看 docker 状态
systemctl status docker
# 开机自启
systemctl enable docker
systemctl unenable docker

2.2.2.2.2 镜像命令

# 导入镜像文件
docker load < xxx.tar.gz
# 查看安装的镜像
docker images
# 删除镜像
docker rmi 镜像名

2.2.3 安装 RabbitMQ (任选其一)

注:直接用 2.2.3.2 一句话安装 会更好一些

2.2.3.1 一步一步安装获取

1.RabbitMQ 的镜像

docker pull rabbitmq:management

2.创建并运行容器(具体参数在 3 中介绍)

docker run -id –name 容器名 -p 15672:15672 -p 5672:5672 rabbitmq:management

2.2.3.2 一句话安装

上面的安装方式,就是先获取到 RabbitMQ 镜像后再开始安装,这里是没有问题的,创建时会有一个问题,因为我们要安装 management 也就是它的 web 管理,如果不做一些处理,默认装好的是没有用户的,所以还需要像前面一样自己进去配置,而 Docker Hub 已经给出了我们配置的示例,即使用代表配置,使 和 配置用户名和密码

更多请查看 Docker Hub 官方给予例子中的 Setting default user and password 章节https://registry.hub.docker.com/_/rabbitmq/

1.执行安装

docker run -di –name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p
5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

2.通过容器状态,查看是否运行成功

# 查看容器运行状态docker ps -a# 启动docker start 容器名# 停止docker stop 容器名# 退出命令行,不停止exit# 进入到node容器(如果开启了 -t 的情况)docker exec -it 容器名 bash

2.2.3.2.1 参数介绍

下面分别讲解一下这些参数的说明:

:表示运行容器。
:表示为容器保留交互的方式(命令行),即分配一个伪终端。所以常常会见到 这样的搭配。
:为容器起个名字。
:表示目录映射关系(前者是宿主机目录,后者是映射到宿主机上的目录),可以使用多个 做多个目录或文件映射。注意:推荐做目录映射,在宿主机上做修改,然后共享到容器上。
:表示创建一个守护式容器在后台运行(这样创建容器后不会自动登录容器,如果只加 两个参数,创建后就会自动进去容器),即后端挂起运行。
:表示端口映射,前者是宿主机端口,后者是容器内的映射端口。可以使用多个 做多个端口映射,只有做了端口映射,才能被外界访问。

给大家举个例子:

# 查看容器运行状态
docker ps -a
# 启动
docker start 容器名
# 停止
docker stop 容器名
# 退出命令行,不停止
exit
# 进入到node容器(如果开启了 -t 的情况)
docker exec -it 容器名 bash

因为使用了 -t 这个参数,所以可以分配到一个伪终端,通过 docker exec -it 容器名 bash 进入命令行

-v 目录映射后,进入容器后,也会有一个一模一样的 demo 文件夹,例如在其中可以执行 python 程序
2.2.3.2.1 端口介绍

4369 :erlang发现端口

5672:client端通信端口

15672:管理界面ui端口

25672:server间内部通信端口

61613:不带TLS和带TLS的STOMP客户端

1883:不启用和启用TLS的MQTT客户端

比较关键的就是 5672 和 15672

更多端口详情可以访问官网文档https://www.rabbitmq.com/networking.html

注:如果要通过远程连接,例如访问 web 管理页面的 15672 端口,Java 客户端连接的 5672 端口, 一定要进行一个开放操作,否则都连接不到。

以下为基于 CentOS 7.9 开放 15672 端口的例子

# 查询 15672 是否开放,一般默认都是 no
firewall-cmd –query-port=15672/tcp
# 开放指定端口 15672
firewall-cmd –add-port=15672/tcp –permanent
# 重新载入
firewall-cmd –reload
# 再次查询,结果就是 yes 了
firewall-cmd –query-port=15672/tcp

]以下是关闭防火墙的命令

systemctl disable firewalld
systemctl stop firewalld

安装结束后,就要进入主题,即用 Java 或者 Springboot 代码来实现 RabbitMQ的几种方式,但是想要很好的理解这几种路由交换方式,就需要对它的协议和架构模型有所了解。

3.1.1 什么是协议?

协议,网络协议的简称,网络协议是通信计算机双方必须共同遵从的一组约定。如怎么样建立连接、怎么样互相识别等。只有遵守这个约定,计算机之间才能相互通信交流。它的三要素是:语法、语义、时序。

为了使数据在网络上从源到达目的,网络通信的参与方必须遵循相同的规则,这套规则称为协议(protocol),它最终体现为在网络上传输的数据包的格式。

3.1.1.1 网络协议的三要素

1.语法:数据与控制信息的结构和格式,以及数据出现的顺序。

2.语义:解释控制信息每个部分的意义,以及规定了需要发出何种控制信息以及完成的动作做出何种响应。

3.时序:对事件发生顺序的详细说明。

人们形象地把这三个要素描述为:做什么,怎么做,做的顺序。

举个例子 HTTP 协议

语法:HTTP 规定了请求报文和响应报文的格式

语义:客户端主动发起请求称为请求,服务端随之返回数据,称为响应

时序: 一个请求对应一个响应,而且先有请求后有响应

3.1.1.1.1 面试题:为什么消息中间件不直接使用 HTTP 协议

对于一个消息中间件来说,其主要责任就是负责数据传递,存储,分发,高性能和简洁才是我们所追求的,而 HTTP 请求报文头和响应报文头是比较复杂的,包含了Cookie,数据的加密解密,窗台吗,响应码等附加的功能,我们并不需要这么复杂的功能。

同时大部分情况下 HTTP 大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行

3.1.2 RabbitMQ 的 AMQP 协议

RabbitMQ 的使用的协议是 AMQP(advanced message queuing protocol),它在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。

AMQP 更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的 Provider(Producer) 天然性就是跨平台的。

相比较其它消息协议,其特性为:

1.分布式事务支持

2.消息的持久化支持

3.高性能和高可靠的消息处理优势

3.1.3 架构模型

想要学习后面的几种消息具体的发送模式,这个模型图就必须理解清楚,因为这几种方式就是对这个模型不同程度的选择和缩减

:消息的生产者(发送消息的程序)。
:应用程序与Broker之间的网络连接。
:信道,即信息传输的通道,可以建立多个 Channel,每个 Channel 代表一个会话任务。

信道是建立在 TCP 连接内的虚拟连接,信息的读写都通过信道传输,因为对于操纵系统而言,建立和销毁 TCP 是非常昂贵的,所以引入了信道的概念,以复用一条 TCP 连接。

:标识消息队列服务器实体,例如这里就是 RabbitMQ Server。
:虚拟主机,一个 Broker 中可以设置多个 Virtual Host,用作不同用户的权限隔离。

Broker 可以理解为整个数据库服务,而 Virtual Host 就是其中每个数据库的感觉,不同项目可以对应不同的数据库,其中有着项目所属的业务表等等。
每个 Virtual Host 中,可以有若干个 Exchange 和 Queue。

:交换机,用来接收生产者发送的消息,然后将这些消息根据路由键发送到队列。
:Exchange 和 Queue 之间的虚拟连接,Binding 中可以包括多个 Routing key。
:路由规则,虚拟机用它来确认如何路由一个特定消息。
:消息队列,它是消息的容器,用来保存消息,每一条消息都能传入一个或者多个队列中,等待消费者消费,即取出这个消息。
:消息的消费者(接收消息的程序)。

官网介绍几种模型:https://www.rabbitmq.com/getstarted.html

截止目前为止,官网一共提供了 7 中模型的介绍,我们主要介绍前五种基本的模式,也有人将 Direct 和 Topic模式都归入 Routing 模式,也可以看做四大种。

4.1.1 创建 Java 项目

首先创建好一个不使用骨架的 Maven 项目,然后引入 RabbitMQ 依赖,还有单元测试依赖即可

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>

4.1.2 创建虚拟主机(可选)

在这里,我们创建了一个新的 Virtual Hosts,用来为这个Java项目服务,大家还可以创建一个新的用户,然后对其开启这个 Virtual Hosts 的访问权限(即将虚拟主机与用户绑定)。我们这里还是用 admin(我之前创建的一个管理员权限用户) 来演示。

注:这部分不去做也可以,直接用 / 和 admin 用户也行

4.1.3 创建连接工具类

由于我们后面要演示多种例子,而每一次获取连接和释放连接、关闭资源等操作代码都是一致的,为了防止代码冗余,优化代码,更易理解,提取出一个工具类,这样大家将重心放在不同实现方式的对比上就行了。

RabbitMqUtil 工具类

public class RabbitMqUtil {

private static String host=””;

private static int port=0;

private static String virtualHost=””;

private static String username=””;

private static String password=””;
// 使用静态代码块为Properties对象赋值
static {
try {
//实例化对象
Properties properties=new Properties();
//获取properties文件的流对象
InputStream in=RabbitMqUtil.class.getClassLoader().getResourceAsStream(“rabbitmq.properties”);
properties.load(in);
// 分别获取 value
host=properties.getProperty(“host”);
port=Integer.parseInt(properties.getProperty(“port”));
virtualHost=properties.getProperty(“virtualHost”);
username=properties.getProperty(“username”);
password=properties.getProperty(“password”);
} catch (Exception e) {
e.printStackTrace();
}
}

public static Connection getConnection() {
try {
// 创建连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
// 设置连接 rabbitmq 主机
connectionFactory.setHost(host);
// 设置端口号
connectionFactory.setPort(port);
// 设置连接的虚拟主机(数据库的感觉)
connectionFactory.setVirtualHost(virtualHost);
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// 返回一个新连接
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

public static void close(Channel channel, Connection connection) {
try {
if (channel !=null) {
channel.close();
}
if (connection !=null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

properties

host=192.168.122.1
port=5672
virtualHost=/rabbitmq_maven_01
username=admin
password=adminv

说明:

队列名,消息等等字符串内容,更推荐定义成变量传入,我文中都是直接写在参数中的,这种魔法值的写法,并不是很优美。
生产者中使用了 Junit 单元测试,但是消费者中却在 main 函数中编写,这是因为,我们希望消费者处于一个持续运行等待的状态,如果使用 Junit 会导致,程序在执行一次后结束掉.

除了在 main 函数中编写,还可以考虑使用 sleep 等待或者 while(true) 让程序不要直接终止掉。

4.2.1 简单队列模式(Hello Word)

:消息的生产者(发送消息的程序)。
:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
:消息的消费者(接收消息的程序)。

4.2.1.1 如何理解

由图所示,简单队列模式,一个生产者,经过一个队列,对应一个消费者。可以看做是点对点的一种传输方式,相较与 3.1.3 中的模型图,最主要的特点就是看不到 Exchange(交换机) 和 routekey(路由键) ,正是因为这种模式简单,所以并不会涉及到复杂的条件分发等等,因此也不需要用户去显式的考虑交换机和路由键的问题。

但是要注意,这种模式并不是生产者直接对接队列,而是用了默认的交换机,默认的交换机会把消息发送到和 routekey 名称相同的队列中去,这也是我们在后面代码中在 routekey 位置填写了队列名称的原因

4.2.1.2 代码实现
4.2.1.2.1 生产者代码

public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel=connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare(“queue1”,false,false,false,null);
// 发布消息
channel.basicPublish(“”,”queue1″,null,”This is rabbitmq message 001 !”.getBytes());
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel,connection);
}
}public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel=connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare(“queue1”,false,false,false,null);
// 发布消息
channel.basicPublish(“”,”queue1″,null,”This is rabbitmq message 001 !”.getBytes());
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel,connection);
}
}

1.通过工具类获取连接

2.获取连接通道:根据 3.1.3 的模型图可知,生产者需要在获取到连接后,再获取信道,才能去访问后面的交换机队列等。

3.通道绑定消息队列:绑定队列前,应该绑定交换机,但是此模式中隐蔽了交换机的概念,背后使用了默认的交换机,所以直接绑定队列。

queueDeclare 方法解释

参数1:queue(队列名称),如果队列不存在,则自动创建。
参数2:durable(队列是否持久化),持久化可以保证服务器重启后此队列仍然存在。
参数3:exclusive(排他队列)即是否独占队列,如果此项为 true,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。
参数4:autoDelete(自动删除),最后一个消费者将消息消费完毕后,自动删除队列。
参数5:arguments(携带附加属性)。

4.发布消息:此处可以指定消息队列的发送方法,以及内容等,因为此模式比较简单,所以没有涉及到全部参数,后面的模式会有详细的讲解

basicPublish 方法解释

参数1:exchange(交换机名称)。
参数2:routingKey(路由key),此处填写队列名,可理解为把消息发送到和 routekey 名称相同的队列中去。
参数3:props(消息的控制状态),可以在此处控制消息的持久化。参数为:MessageProperties.PERSISTENT_TEXT_PLAIN参数4:body(消息主体),类型是一个字节数组,要转一下类型。

5.通过工具关闭channel和释放连接:先关闭通道,再释放连接。

4.2.1.2.2 消费者代码

public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException{
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel=connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare(“queue1”, false, false, false, null);
// 消费消息
channel.basicConsume(“queue1”, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“new String(body): ” + new String(body));
}
});
}
}

1.通过工具类获取连接

2.获取连接通道

3.通道绑定消息队列

4.消费消息:此处用来指定消费哪个队列的消息,以及一些机制和回调

basicConsume 方法解释

参数1:queue(队列名称),即消费哪个队列的消息 。
参数2:autoAck(自动应答)开始消息的自动确认机制,只要消费了就从队列删除消息。
参数3:callback(消费时的回调接口),callback 的类型是 Consumer 这里使用了 DefaultConsumer 就是 Consumer 的一个实现类。其中重写 handleDelivery 方法,就可以获取到消费的数据内容了,这里主要使用了其中的 body,即查看消息主体,其他三个参数暂时还没用到,有兴趣可以先打印输出一下,能先有个大概的了解。4.2.2 工作队列模式(Work Queue)

:消息的生产者(发送消息的程序)。
:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
Consumer:消息的消费者(接收消息的程序)。

此处我们假设 Consumer1、Consumer2、Consumer3 分别为完成任务速度不一样快的消费者,这会引出此模式的一个重点问题。

4.2.2.1 如何理解

工作模式由图可以看出,就是在简单队列模式的基础上,增加了多个消费者,也就是让多个消费者绑定同一个队列,共同去消费,这样能解决简单队列模式中,如果生产速速远大于消费速度,而导致的消息堆积现象。

因为消息被消费后就会消失,所以不必担心任务会重复执行。

4.2.2.2 代码实现

注:工作队列模式有两种

轮询模式:每个消费者均分消息公平分发模式(能者多劳):按能力分发,处理速度快的分发的多,处理速度慢的分发的少

我们首先演示的是轮询模式,根据它的缺点,又能引出公平分发模式

下面只描述与上面有差异的部分,在简单模式中,这些基本的方法都有介绍过

4.2.2.2.1 轮询模式-生产者代码

public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel=connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare(“work”, true, false, false, null);
for (int i=1; i <=20; i++) {
// 发布消息
channel.basicPublish(“”, “work”, null, (i + “号消息”).getBytes());
}
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}
}

流程和简单队列模式基本一致,有一些小小的改动,生产者中主要就是加了层循环,因为有多个消费者,所以多发送一些消息,可以看出一些特点和问题。

4.2.2.2.2 轮询模式-消费者代码

消费者 1

public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel=connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare(“work”, true, false, false, null);
// 消费消息
channel.basicConsume(“work”, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“消费者1号:消费-” + new String(body));
}
});
}
}

消费者 2

public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel=connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare(“work”, true, false, false, null);
// 消费消息
channel.basicConsume(“work”, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2号:消费-” + new String(body));
}
});
}

上述两个消费者都在 basicConsume中开启了自动 Ack 应答,这一点下面会详述,同时在消费者 1 中,增加了 sleep 2s 的语句,模拟消费者1处理消息速度慢,而消费者2处理消息速度快的场景。

运行结果:

Consumer1

消费者1号:消费-1号消息

消费者1号:消费-3号消息

消费者1号:消费-5号消息

消费者1号:消费-7号消息

消费者1号:消费-9号消息

消费者1号:消费-11号消息

消费者1号:消费-13号消息

消费者1号:消费-15号消息

消费者1号:消费-17号消息

消费者1号:消费-19号消息

Consumer2

消费者2号:消费-2号消息

消费者2号:消费-4号消息

消费者2号:消费-6号消息

消费者2号:消费-8号消息

消费者2号:消费-10号消息

消费者2号:消费-12号消息

消费者2号:消费-14号消息

消费者2号:消费-16号消息

消费者2号:消费-18号消息

消费者2号:消费-20号消息

观察执行过程:发现两个消费者虽然每个人最后都各自处理了一半的消息,而且是按照一人一条分配的,但是消费者2号处理速度快,一下子就全部处理完了,但是消费者1号,每一次处理都需要 2s 所以,只能缓慢的处理,而消费者2号就处于一个空闲浪费的情况了。

如何切换为公平分发模式呢?

这就和 basicConsume 中的第二个参数,开启自动确认消费有关了,它默认是 true,也就代表只要一旦拿到队列中分发给这个消费者的消息,我就会自动返回一个确认消费的标识,队列收到后就会自动删除掉队列中的消息。

但是这其中有一个很重要的问题,这种方式就是将风险交给了消费者,例如消费者收到了自己需要处理的 10 条消息,刚消费了 4 个,消费者宕机,挂掉了,后面的 6 个消息就丢失了。

如果想要修改为按能力分配的方式,有两个要点

1.设置通道一次只能消费一个消息

2.关闭消息的自动确认,手动确认消息

4.2.2.2.3 公平分发模式-生产者代码

public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel=connection.createChannel();
// 一次只发送一条消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare(“work”, true, false, false, null);
for (int i=1; i <=20; i++) {
// 发布消息
channel.basicPublish(“”, “work”, null, (i + “号消息”).getBytes());
}
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}

4.2.2.2.4 公平分发模式-消费者代码

消费者1

public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel=connection.createChannel();
// 一次只接受一条未确认的消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare(“work”, true, false, false, null);
// 消费消息
channel.basicConsume(“work”, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“消费者1号:消费-” + new String(body));
// 返回 deliveryTag 代表队列可以删除此消息了
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

消费者2

public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel=connection.createChannel();
//步骤一:一次只接受一条未确认的消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare(“work”, true, false, false, null);
// 消费消息
channel.basicConsume(“work”, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2号:消费-” + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel=connection.createChannel();
//步骤一:一次只接受一条未确认的消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare(“work”, true, false, false, null);
// 消费消息
channel.basicConsume(“work”, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2号:消费-” + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}

运行结果:

Consumer1

消费者1号:消费-1号消息

Consumer2

消费者2号:消费-2号消息

消费者2号:消费-3号消息

消费者2号:消费-4号消息

消费者2号:消费-5号消息

消费者2号:消费-6号消息

消费者2号:消费-7号消息

消费者2号:消费-8号消息

消费者2号:消费-9号消息

消费者2号:消费-10号消息

消费者2号:消费-11号消息

消费者2号:消费-12号消息

消费者2号:消费-13号消息

消费者2号:消费-14号消息

消费者2号:消费-15号消息

消费者2号:消费-16号消息

消费者2号:消费-17号消息

消费者2号:消费-18号消息

消费者2号:消费-19号消息

消费者2号:消费-20号消息

4.2.3 发布与订阅模式(Fanout 广播)

:消息的生产者(发送消息的程序)。
:交换机,负责发送消息给指定队列。
:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
:消息的消费者(接收消息的程序)。

4.2.3.1 如何理解

Fanout 直译为 “扇出” 但是大家更多的会把它叫做广播或者发布与订阅,它是一种没有路由key的模式,生产者将消息发送给交换机,交换机会把所有消息复制同步到所有与它绑定过的队列上,而每个队列只能有一个消费者拿到这条消息,如果在一个消费者连接中,创建多个通道,则会出现争抢消息的结果。

4.2.3.2 代码实现

注:下面只描述与上面有差异的部分,在简单模式中,这些基本的方法都有介绍过

4.2.3.2.1 生产者代码

public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel=connection.createChannel();
// 声明交换机
channel.exchangeDeclare(“order”, “fanout”);
for (int i=1; i <=20; i++) {
// 发布消息
channel.basicPublish(“order”, “”, null, “fanout!”.getBytes());
}
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}
}

1.声明交换机

exchangeDeclare 方法解释

参数1:exchange(交换机名称),如果交换机不存在,则自动创建
参数2:type(类型),此处选择 fanout 模式

2.发布消息:在 basicPublish 方法的第一个参数中输入上述定义好的交换机的名字,第二个参数,路由键为空

循环 20 条是为了演示消费者

4.2.3.2.2 消费者代码

public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
Channel channel=connection.createChannel();
// 声明交换机
channel.exchangeDeclare(“order”, “fanout”);
// 创建临时队列
String queue=channel.queueDeclare().getQueue();
// 绑定临时队列和交换机
channel.queueBind(queue, “order”, “”);
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者1号:消费-” + new String(body));
}
});
}
}

1.声明交换机

2.创建临时队列

3..绑定临时队列和交换机

queueBind 方法解释

参数1:queue(临时队列)
参数2:exchange(交换机)
参数3:routingKey(路由key)

消费者2:演示了一个连接中,多个通道的情况

public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel=connection.createChannel();
Channel channel2=connection.createChannel();
// 声明交换机
channel.exchangeDeclare(“order”, “fanout”);
channel2.exchangeDeclare(“order”, “fanout”);
// 创建临时队列
String queue=channel.queueDeclare().getQueue();
System.out.println(queue);
// 绑定临时队列和交换机
channel.queueBind(queue, “order”, “”);
channel2.queueBind(queue, “order”, “”);
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2号:消费-” + new String(body));
}
});
// 消费消息
channel2.basicConsume(queue, true, new DefaultConsumer(channel2) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2-2号:消费-” + new String(body));
}
});
}
}

运行结果:

消费者2号:消费-2号消息

消费者2号:消费-3号消息

消费者2号:消费-4号消息

消费者2号:消费-5号消息

消费者2号:消费-6号消息

消费者2号:消费-7号消息

消费者2号:消费-8号消息

消费者2号:消费-9号消息

消费者2号:消费-10号消息

消费者2号:消费-11号消息

消费者2号:消费-12号消息

消费者2号:消费-13号消息

消费者2号:消费-14号消息

消费者2号:消费-15号消息

消费者2号:消费-16号消息

消费者2号:消费-17号消息

消费者2号:消费-18号消息

消费者2号:消费-19号消息

消费者2号:消费-20号消息

4.2.3.2.3 为什么消费者中也声明交换机?

从上面的代码中可以看出,在 Producer 和 Conusmer 中我们都分别声明了交换机,但是消费者由图可知,并不会与交换机有直接的接触,为什么消费者中也声明交换机呢?

这是为了保证 Producer 或者 Producer 执行的时候,永远不会因为交换机还没被声明而出错,例如你只在 Producer 声明了交换机,那么你就必须先启动 Producer ,如果直接执行 Conusmer 此时交换机就还不存在,就会报错。而全部写入声明,则可以保证不论先启动谁,都会声明到交换机。

4.2.4 路由模式( Routing / Direct)

:消息的生产者(发送消息的程序)。
:交换机,负责发送消息给指定队列。
:路由key,即上图的 key1,key2 等,相当于在交换机和队列之间又加了一层限制
:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
:消息的消费者(接收消息的程序)。

4.2.4.1 如何理解

路由模式的交换机类型是 direct,与 fanout 模式相比,多了路由 key 这个概念。生产者发送携带指定 routingKey(路由key) 的消息到交换机,交换机拿着此 routingKey 去找到绑定了这个 routingKey 的队列,然后发送到此队列,一个队列可以绑定多个 routingKey 。

4.2.4.2 代码实现
4.2.4.2.1 生产者代码

public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel=connection.createChannel();
// 声明交换机
channel.exchangeDeclare(“order_direct”, “direct”);
// 指定 routingKey
String key=”info”;
// 发布消息
channel.basicPublish(“order_direct”, key, null, (“发送给指定路由” + key + “的消息”).getBytes());
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}
}

1.指定 routingKey ,即在 basicPublish 方法 的第二个参数中,指定 key 的值

4.2.4.2.2 消费者代码

消费者 1

public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
Channel channel=connection.createChannel();
// 声明交换机
channel.exchangeDeclare(“order_direct”, “direct”);
// 获取临时队列
String queue=channel.queueDeclare().getQueue();
// 绑定临时队列和交换机
channel.queueBind(queue, “order_direct”, “info”);
channel.queueBind(queue, “order_direct”, “error”);
channel.queueBind(queue, “order_direct”, “warn”);
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者1:消费-” + new String(body));
}
});
}
}

1.只是在绑定队列和交换机的时候,增加了 key 这个值

消费者2

public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection=RabbitMqUtil.getConnection();
Channel channel=connection.createChannel();
// 声明交换机
channel.exchangeDeclare(“order_direct”, “direct”);
// 获取临时队列
String queue=channel.queueDeclare().getQueue();
// 绑定临时队列和交换机
channel.queueBind(queue, “order_direct”, “error”);
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2:消费-” + new String(body));
}
});
}
}

运行结果:只有消费者 1 收到了消息

[code]消费者1:消费-发送给指定路由info的消息

© 版权声明

相关文章