RabbitMQ简介
一、简介
1、简介
RabbitMQ是使用最广泛的开源消息代理;它是轻量级的,易于部署;支持多种消息传递协议。
RabbitMQ可以运行在许多操作系统和云环境上,并为大多数开发语言提供了开发工具。
2、安装
此处是在Windows下安装的。
- 下载并安装Erlang
RabbitMQ依赖64位的Erlang。
- 下载并安装RabbitMQ
遇到的问题及解决方法:
- 启动时提示’服务名无效’
在 计算机管理 - 服务 中确实没有找到RabbitMQ的服务,可以通过菜单中的RabbitMQ Service - (re)install
重新安装解决。
3、启动
可以直接计算机管理 - 服务中启动或通过菜单RabbitMQ Service - start
来启动服务,启动之后可以通过安装目录sbin目录下执行rabbitmqctl.bat status
来查看服务的状态。
遇到的问题及解决方法:
- 查看状态时显示:
Error: unable to perform an operation on node 'rabbit@acer'.
在诊断信息中可以看到:
DIAGNOSTICS
===========
attempted to contact: [rabbit@acer]
rabbit@acer:
* connected to epmd (port 4369) on acer
* epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traff
ic
* TCP connection succeeded but Erlang distribution failed
* Authentication failed (rejected by the remote node), please check the Erlang
cookie
查询资料得知是因为RabbitMQ的erlang.cookie
和用户的erlang.cookie
冲突了,需要将RabbitMQ的erlang.cookie
覆盖到用户的Cookie中:
RabbitMQ的erlang.cookie
文件位置:C:\Windows\System32\config\systemprofile\.erlang.cookie
用户的erlang.cookie
文件位置:C:\Users\xxxxx\.erlang.cookie
4、管理插件
RabbitMQ管理插件提供了一个基于HTTP的API,用于管理和监视RabbitMQ节点和集群,以及一个基于浏览器的UI和一个命令行工具rabbitmqadmin。
- 启用插件
在RabbitMQ安装目录的sbin
目录下执行:
rabbitmq-plugins enable rabbitmq_management
- 创建用户并授权
在RabbitMQ安装目录的sbin
目录下执行::
# create a user
rabbitmqctl add_user full_access s3crEt
# tag the user with "administrator" for full management UI and HTTP API access
rabbitmqctl set_user_tags full_access administrator
访问http://localhost:15672/,输入刚添加的用户的用户名和密码:
二、教程
RabbitMQ是一个消息代理,它可以接受和转发消息。
1、Hello World
此程序演示从命名队列发送和接收消息。
-
术语介绍
-
生产
发送信息的程序就是生产者。
-
队列
队列本质上是一个大的消息缓冲区,多个生产者可以将消息发送到一个队列中,多个消费者也可以从一个队列接收数据。
-
消费
等待接收消息的程序就是消费者。
-
-
Java客户端
RabbitMQ支持多种协议,此处使用AMQP 0-9-1
协议。
需下载客户端包以及它的依赖:SLF4J API和SLF4J Simple。
- 样例-生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
//声明要发送的队列,该操作是冥等的,只有当队列不存在时才会创建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
- 样例-消费者
import java.io.IOException;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//此处也声明了队列,因为可能在生产者之前启动消费者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. ");
//上面不使用try-with-resource来关闭连接是因为程序需要继续运行来异步侦听消息
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String content = new String(message.getBody(), "UTF-8");
System.out.println(" [x] Received '" + content + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, callback, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println(" [x] Canceled");
}
});
}
}
- 运行
先运行消费者,可以看到:
[*] Waiting for messages.
再运行生产者,在消费端可以看到:
[x] Received 'Hello World!'
2、工作队列(Work queues)
在此程序中会创建一个工作队列,用于在多个Worker之间分配耗时的任务。
-
术语介绍
-
工作队列
工作队列又称为任务队列,它的主要思想是避免立即执行必须等待完成的资源密集型任务,而是把这些任务封装为消息,并将其发送到队列中,在后台运行的辅助进程将完成这些任务。
-
轮询
默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。
-
-
约定
为了模拟复杂任务,使用发送消息字符串中的
.
的个数来表示任务的复杂程度,例如:Hello...
表示此任务需要耗时3秒(使用Thread.sleep()
)。 -
消息确认
为了确保消息不会丢失,RabbitMQ支持消息确认。消费者处理完任务后发送回一个ack
,告诉RabbitMQ已经接收并处理了一条特定消息,之后RabbitMQ可以自由删除该消息。
如果消费者在没有发送ack
的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将认为消息没有被完全处理,并将重新对其排队。如果有其他消费者同时在线,它将迅速重新发送给另一个消费者。这样就可以确保没有信息丢失。
- 消息持久化
如果不能保证队列和消息的持久化,那么当RabbitMQ退出或崩溃时,任务就会丢失。可以通过在声明队列时设置durable参数为true来实现:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
由于RabbitMQ不允许使用不同的参数重新定义现有队列,因此不能重新声明hello
队列为持久化的(PERSISTENT)。
在发布消息时,也需要将消息标记为持久化的:
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
- 公平的调度
如果只有两个Worker,且奇数消息任务复杂处理时间长而偶数消息任务简单处理时间短时,就会出现一个Worker一直处于忙的状态而另一个Worker几乎是空闲的状态。为了避免这种情况,可以通过设置channel的prefetchCount来使RabbitMQ公平调度,在一个Worker处理繁忙状态时将新消息发送给一下空闲的Worker。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
- 样例-生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//声明队列,durable参数为true
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String[] messages = new String[]{
"First message.",
"Second message..",
"Third message...",
"Fourth message....",
"Fifth message....."
};
for(String message : messages){
send(channel, message);
}
}
}
private static void send(Channel channel, String message) throws Exception{
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
- 样例-消费者
import java.io.IOException;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages.");
channel.basicQos(1);
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String content = new String(message.getBody(), "UTF-8");
System.out.println(" [x] Received '" + content + "'");
try {
doWork(content);
} finally {
System.out.println(" [x] Done");
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, callback, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println(" [x] Canceled");
}
});
}
private static void doWork(String task){
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
- 运行
先运行两个Worker,然后运行NewTask,输出如下:
NewTask:
[x] Sent 'First message.'
[x] Sent 'Second message..'
[x] Sent 'Third message...'
[x] Sent 'Fourth message....'
[x] Sent 'Fifth message.....'
Worker-1:
[*] Waiting for messages.
[x] Received 'First message.'
[x] Done
[x] Received 'Third message...'
[x] Done
[x] Received 'Fifth message.....'
[x] Done
Worker-2:
[*] Waiting for messages.
[x] Received 'Second message..'
[x] Done
[x] Received 'Fourth message....'
[x] Done
3、发布与订阅(Publish Subscribe)
此程序演示发布订阅模式,之前的教程中是将每个任务只交付给一个Worker,此程序中会向多个消费者发送一个信息。
在这个程序中,会用两个程序组成一个简单的日志系统,第一个程序将发出日志消息,第二个程序接收并打印这些消息。
- 相关内容简介
RabbitMQ中消息传递模型的核心思想是:生产者从不直接将任何消息发送到队列中,很多时候生产者甚至不知道消息是否会被发送到队列中。
相反,生产者只能向交换器(Exchange)发送消息,交换器主要是接收来自生产者的消息并将消息推送到队列中。
交换器的类型决定了交换器处理接收到消息的方式,主要有四种类型:direct
、topic
、headers
和fanout
,此处用fanout
类型来演示(具体介绍可以参考文章最后的相关链接)。
fanout交换是将接收到的所有消息广播到它所知道的所有队列,如下代码是声明一个fanout类型的交换器logs
。
channel.exchangeDeclare("logs", "fanout");
在发布时可以指定交换器(之前的样例中是”“,表示使用默认或无名交换器):
channel.basicPublish("logs", "", null, message.getBytes());
之前的样例中使用了特定名称的队列,在此样例中当连接到Rabbit时,需要一个新的空队列,并在消费者断开连接时自动删除队列;可以在声明队列时使用无参的queueDeclare()
函数:
//创建一个非持久的、独占的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
绑定交换器和队列:
channel.queueBind(queueName, "logs", "");
- 样例-生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();){
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String[] messages = new String[]{
"info: Hello World!",
"info: How are you today?"
};
for(String message : messages){
send(channel, message);
}
}
}
private static void send(Channel channel, String message) throws Exception{
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
- 样例-消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
- 运行
可以通过http://localhost:15672/#/queues查看队列,通过http://localhost:15672/#/exchanges查看交换器。
先运行两个ReceiveLogs,可以在Queues页签中看到会创建出两个队列;并输出:
[*] Waiting for messages.
再运行EmitLog,会看到两个ReceiveLogs中都会输出:
[*] Waiting for messages.
[x] Received 'info: Hello World!'
[x] Received 'info: How are you today?'
退出ReceiveLogs后,会看到相应的队列也自动删除。
4、路由(Routing)
此程序在上一个教程的基础上使用direct类型的交换器,并通过routingKey
来实现只订阅消息的一个子集,例如:日志分为不同类型,只需要将关键的错误信息记录到日志文件中(同时也在控制台输出),而其他的信息只接打印在控制台即可。
可以通过在绑定时添加routingKey参数:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
如上图所示,交换器X有两个绑定的队列,其中一个通过orange
Key来绑定,另一个通过black
Key和green
Key绑定;发布到交换器的消息如果路由Key为orange,则消息将被路由到Q1队列;如果消息的路由Key为black或green,则被路由到Q2队列。
使用同一个Key绑定到多个队列也是允许的,此种情况类似于fanout类型的交换器。
- 样例-生产者
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//声明direct类型交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, String> messages = new HashMap<>();
messages.put("green", "Hello World!");
messages.put("orange", "How you doing?");
messages.put("black", "What’s wrong?");
for(Entry<String, String> message : messages.entrySet()){
send(channel, message.getKey(), message.getValue());
}
}
}
private static void send(Channel channel, String type, String message) throws Exception{
channel.basicPublish(EXCHANGE_NAME, type, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + type + "':'" + message + "'");
}
}
- 样例-消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
//创建此类时,分别使用不同的routingKey绑定,需调整此值
String routingKey = "green";
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println(" [*] Waiting for messages. RoutingKey: " + routingKey);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
- 运行
修改消费者程序中的routingKey,分别创建出订阅routingKey为green、orange、black的消费者,之后运行生产者,三个消费者程序会输出:
[*] Waiting for messages. RoutingKey: green
[x] Received 'green':'Hello World!'
[*] Waiting for messages. RoutingKey: orange
[x] Received 'orange':'How you doing?'
[*] Waiting for messages. RoutingKey: black
[x] Received 'black':'What’s wrong?'
5、主题(Topics)
此程序演示使用topic类型交换器。
上一个教程的样例仍然有局限性:它不能基于多个标准进行路由;例如:不仅希望基于颜色,还希望基于种类(动物、植物等)来路由。
发送到主题交换器的路由key不能是任意的key,它必须是由点分隔的单词列表,这此单词通常指定一些与消息相关的特性;key中可以有任意多个单词,但最多不超过255个字节。
绑定时的key必须使用相同的形式;topic交换器与direct交换器类似:使用特定路由key发送的消息将传递给使用匹配的绑定key绑定的所有队列中。绑定时的key有两个特例:
-
*
号可以用来代替一个单词 -
#
可以用来代替零个或多个单词;当队列使用#
绑定时,它将接收所有消息。
当绑定中不使用*
、#
时,topic交换器的行为就和direct交换器一样。
如上图所示,样例中的routingKey由三个单词<speed>.<colour>.<species>
组成,第一个描述速度,第二个描述颜色,第三个描述物种;key举例:quick.orange.rabbit
、lazy.gray.elephant
等。
上图中Q1队列绑定所有橙色的物种消息,Q2绑定兔子相关的消息和关于懒惰动物的消息。
如果使用orange
或quick.orange.male.rabbit
为路由key发送消息,那么这些消息将不会匹配到任何绑定,将被丢失。
- 样例-生产者
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//声明交换器类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> messages = new HashMap<>();
messages.put("quick.orange.rabbit", "Hello World!");
messages.put("lazy.orange.fox", "I am so cute!");
messages.put("slow.brown.elephant", "Hi!...");
for(Entry<String, String> message : messages.entrySet()){
send(channel, message.getKey(), message.getValue());
}
}
}
private static void send(Channel channel, String type, String message) throws Exception{
channel.basicPublish(EXCHANGE_NAME, type, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + type + "':'" + message + "'");
}
}
- 样例-消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
/*
* 创建此类时,分别使用不同的routingKey绑定
* *.orange.*
* *.*.rabbit
* lazy.#
*/
String routingKey = "lazy.#";
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println(" [*] Waiting for messages. RoutingKey: " + routingKey);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
- 运行
修改消费者程序中的routingKey,分别创建出订阅routingKey为*.orange.*
、*.*.rabbit
、lazy.#
的消费者,之后运行生产者,三个消费者程序会输出:
[*] Waiting for messages. RoutingKey: *.orange.*
[x] Received 'quick.orange.rabbit':'Hello World!'
[x] Received 'lazy.orange.fox':'I am so cute!'
[*] Waiting for messages. RoutingKey: *.*.rabbit
[x] Received 'quick.orange.rabbit':'Hello World!'
[*] Waiting for messages. RoutingKey: lazy.#
[x] Received 'lazy.orange.fox':'I am so cute!'
消息Hi!...
不匹配,被丢失。
6、远程过程调用(RPC)
此程序将使用RabbitMQ来构建RPC系统:一个客户机和一个可伸缩的RPC服务器。由于没有任何值得分发的耗时任务,因此将创建一个返回斐波那契数字的虚拟RPC服务。
一般情况下,在RabbitMQ上实现RPC是很容易的:客户机发送请求消息,服务器用响应消息回复。为了接收响应,需要发送一个带有”回调”队列地址的请求;可以使用默认队列:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
BasicProperties
中常用的属性有:
-
deliveryMode:可以将消息标记为持久的或暂时的。
-
contentType:用于描述编码的mime类型;例如,对于常用的JSON编码,最好将该属性设置为
application/json
。 -
replyTo:通常用于命名回调队列。
-
correlationId:在将RPC响应与请求关联起来时使用。
在上述方法中,为每个RPC请求创建一个回调队列是相当低效的,更好的方法是为每个客户机创建一个回调队列。
通过为每个请求设置它的唯一值(correlationId),当我们在回调队列中收到消息时查看这个属性,并基于这个属性匹配响应和请求。如果收到一个未知的correlationId值,说明它不属于我们的请求可以安全地放弃消息。
如上图所示,例子中的RPC的工作原理如下:
-
对于RPC请求,客户端C发送一条带有两个属性的消息,其中
replyTo
设置为专门为请求创建的匿名独占队列;correlationId
为每个请求设置一个唯一的值。 -
请求被发送到
rpc_queue
队列中。 -
RPC服务器S在等待该队列上的请求,当一个请求出现时,它执行该任务,并使用replyT字段中的队列将带有结果的消息发送回客户端。
-
客户端等待应答队列上的数据,当一条消息(响应)出现时,它会检查correlationId属性,如果它与请求中的值匹配,则将响应返回给应用程序。
-
RPC Server
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();){
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//清除指定队列的内容
channel.queuePurge(RPC_QUEUE_NAME);
//可能需要运行多个服务器进程,为了在多个服务器上平均分配负载
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
BasicProperties replyProps = new BasicProperties()
.builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try{
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fibonacci(" + message + ")");
response += fibonacci(n);
}catch(RuntimeException e){
System.out.println(" [.] " + e.getMessage());
}finally{
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
}
};
//在队列上注册消费者,当此队列有消息来了之后,就会把消息转发到给此channel处理
channel.basicConsume(RPC_QUEUE_NAME, false, callback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
private static int fibonacci(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fibonacci(n - 1) + fibonacci(n - 2);
}
}
- RPC Client
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class RPCClient implements AutoCloseable {
private final String requestQueueName = "rpc_queue";
private Connection connection;
private Channel channel;
public RPCClient() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public String call(String message) throws Exception{
final String correlationId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties()
.builder()
.correlationId(correlationId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}
};
//basicConsume()方法返回服务器生成的consumerTag
String ctag = channel.basicConsume(replyQueueName, true, callback, consumerTag -> {});
String result = response.take();
//计算出结果后取消消费者
channel.basicCancel(ctag);
return result;
}
@Override
public void close() throws Exception {
connection.close();
}
public static void main(String[] args) throws Exception{
try(RPCClient client = new RPCClient()){
for(int i = 0; i < 10; i++){
String num = Integer.toString(i);
System.out.println(" [x] Requesting fibonacci(" + num + ")");
String response = client.call(num);
System.out.println(" [.] Got '" + response + "'");
}
}catch(Exception e){
e.printStackTrace();
}
}
}
- 运行
运行RPCServer类启动RPC服务:
[x] Awaiting RPC requests
再运行RPCClient启动客户端程序,开始计算:
[x] Requesting fibonacci(0)
[.] Got '0'
[x] Requesting fibonacci(1)
[.] Got '1'
[x] Requesting fibonacci(2)
[.] Got '1'
[x] Requesting fibonacci(3)
[.] Got '2'
[x] Requesting fibonacci(4)
[.] Got '3'
[x] Requesting fibonacci(5)
[.] Got '5'
[x] Requesting fibonacci(6)
[.] Got '8'
[x] Requesting fibonacci(7)
[.] Got '13'
[x] Requesting fibonacci(8)
[.] Got '21'
[x] Requesting fibonacci(9)
[.] Got '34'