一、简介

1、简介

RabbitMQ是使用最广泛的开源消息代理;它是轻量级的,易于部署;支持多种消息传递协议。

RabbitMQ可以运行在许多操作系统和云环境上,并为大多数开发语言提供了开发工具。

2、安装

此处是在Windows下安装的。

RabbitMQ依赖64位的Erlang。

遇到的问题及解决方法:
  • 启动时提示’服务名无效’

计算机管理 - 服务 中确实没有找到RabbitMQ的服务,可以通过菜单中的RabbitMQ Service - (re)install重新安装解决。

3、启动

可以直接计算机管理 - 服务中启动或通过菜单RabbitMQ Service - start来启动服务,启动之后可以通过安装目录sbin目录下执行rabbitmqctl.bat status来查看服务的状态。

遇到的问题及解决方法:
  • 查看状态时显示:Error: unable to perform an operation on node 'rabbit@acer'.

在诊断信息中可以看到:

DIAGNOSTICS
===========

attempted to contact: [rabbit@acer]

rabbit@acer:
  * connected to epmd (port 4369) on acer
  * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traff
ic
  * TCP connection succeeded but Erlang distribution failed

  * Authentication failed (rejected by the remote node), please check the Erlang
 cookie

查询资料得知是因为RabbitMQ的erlang.cookie和用户的erlang.cookie冲突了,需要将RabbitMQ的erlang.cookie覆盖到用户的Cookie中:

RabbitMQ的erlang.cookie文件位置:C:\Windows\System32\config\systemprofile\.erlang.cookie

用户的erlang.cookie文件位置:C:\Users\xxxxx\.erlang.cookie

4、管理插件

RabbitMQ管理插件提供了一个基于HTTP的API,用于管理和监视RabbitMQ节点和集群,以及一个基于浏览器的UI和一个命令行工具rabbitmqadmin。

  • 启用插件

在RabbitMQ安装目录的sbin目录下执行:

rabbitmq-plugins enable rabbitmq_management

  • 创建用户并授权

在RabbitMQ安装目录的sbin目录下执行::

# create a user
rabbitmqctl add_user full_access s3crEt
# tag the user with "administrator" for full management UI and HTTP API access
rabbitmqctl set_user_tags full_access administrator

访问http://localhost:15672/,输入刚添加的用户的用户名和密码:

二、教程

RabbitMQ是一个消息代理,它可以接受和转发消息。

1、Hello World

此程序演示从命名队列发送和接收消息。

  • 术语介绍

    • 生产

      发送信息的程序就是生产者。

    • 队列

      队列本质上是一个大的消息缓冲区,多个生产者可以将消息发送到一个队列中,多个消费者也可以从一个队列接收数据。

    • 消费

      等待接收消息的程序就是消费者。

  • Java客户端

RabbitMQ支持多种协议,此处使用AMQP 0-9-1协议。

下载客户端包以及它的依赖:SLF4J APISLF4J Simple

  • 样例-生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

	private final static String QUEUE_NAME = "hello";
	
	public static void main(String[] args) throws Exception {
		//创建连接和通道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		try(Connection connection = factory.newConnection();
			Channel channel = connection.createChannel()){
			//声明要发送的队列,该操作是冥等的,只有当队列不存在时才会创建
			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
			String message = "Hello World!";
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
	}
}
  • 样例-消费者
import java.io.IOException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Recv {

	private final static String QUEUE_NAME = "hello";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		//此处也声明了队列,因为可能在生产者之前启动消费者
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. ");
		
		//上面不使用try-with-resource来关闭连接是因为程序需要继续运行来异步侦听消息
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String content = new String(message.getBody(), "UTF-8");
				System.out.println(" [x] Received '" + content + "'");
			}
		};
		channel.basicConsume(QUEUE_NAME, true, callback, new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
				System.out.println(" [x] Canceled");
			}
		});
		
	}
}
  • 运行

先运行消费者,可以看到:

 [*] Waiting for messages. 

再运行生产者,在消费端可以看到:

 [x] Received 'Hello World!'

2、工作队列(Work queues)

在此程序中会创建一个工作队列,用于在多个Worker之间分配耗时的任务。

  • 术语介绍

    • 工作队列

      工作队列又称为任务队列,它的主要思想是避免立即执行必须等待完成的资源密集型任务,而是把这些任务封装为消息,并将其发送到队列中,在后台运行的辅助进程将完成这些任务。

    • 轮询

      默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

  • 约定

    为了模拟复杂任务,使用发送消息字符串中的.的个数来表示任务的复杂程度,例如:Hello...表示此任务需要耗时3秒(使用Thread.sleep())。

  • 消息确认

为了确保消息不会丢失,RabbitMQ支持消息确认。消费者处理完任务后发送回一个ack,告诉RabbitMQ已经接收并处理了一条特定消息,之后RabbitMQ可以自由删除该消息。

如果消费者在没有发送ack的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将认为消息没有被完全处理,并将重新对其排队。如果有其他消费者同时在线,它将迅速重新发送给另一个消费者。这样就可以确保没有信息丢失。

  • 消息持久化

如果不能保证队列和消息的持久化,那么当RabbitMQ退出或崩溃时,任务就会丢失。可以通过在声明队列时设置durable参数为true来实现:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

由于RabbitMQ不允许使用不同的参数重新定义现有队列,因此不能重新声明hello队列为持久化的(PERSISTENT)。

在发布消息时,也需要将消息标记为持久化的:

channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • 公平的调度

如果只有两个Worker,且奇数消息任务复杂处理时间长而偶数消息任务简单处理时间短时,就会出现一个Worker一直处于忙的状态而另一个Worker几乎是空闲的状态。为了避免这种情况,可以通过设置channel的prefetchCount来使RabbitMQ公平调度,在一个Worker处理繁忙状态时将新消息发送给一下空闲的Worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);
  • 样例-生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

	private static final String TASK_QUEUE_NAME = "task_queue";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		try(Connection connection = factory.newConnection();
			Channel channel = connection.createChannel()) {
			//声明队列,durable参数为true
			channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
			String[] messages = new String[]{
				"First message.",
				"Second message..",
				"Third message...",
				"Fourth message....",
				"Fifth message....."
			};
			for(String message : messages){
				send(channel, message);
			}
		}
	}
	
	private static void send(Channel channel, String message) throws Exception{
		channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
	}
}
  • 样例-消费者
import java.io.IOException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Worker {
	
	private static final String TASK_QUEUE_NAME = "task_queue";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    final Connection connection = factory.newConnection();
	    final Channel channel = connection.createChannel();
	    
	    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
	    System.out.println(" [*] Waiting for messages.");
	    
	    channel.basicQos(1);
	    
	    DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String content = new String(message.getBody(), "UTF-8");
				System.out.println(" [x] Received '" + content + "'");
				
				 try {
		            doWork(content);
		        } finally {
		            System.out.println(" [x] Done");
		            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
		        }
			}
		};
		channel.basicConsume(TASK_QUEUE_NAME, false, callback, new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
				System.out.println(" [x] Canceled");
			}
		});
	}
	
	private static void doWork(String task){
		for (char ch : task.toCharArray()) {
	        if (ch == '.') {
	            try {
	                Thread.sleep(1000);
	            } catch (InterruptedException _ignored) {
	                Thread.currentThread().interrupt();
	            }
	        }
	    }
	}
}
  • 运行

先运行两个Worker,然后运行NewTask,输出如下:

NewTask:

 [x] Sent 'First message.'
 [x] Sent 'Second message..'
 [x] Sent 'Third message...'
 [x] Sent 'Fourth message....'
 [x] Sent 'Fifth message.....'

Worker-1:

 [*] Waiting for messages.
 [x] Received 'First message.'
 [x] Done
 [x] Received 'Third message...'
 [x] Done
 [x] Received 'Fifth message.....'
 [x] Done

Worker-2:

 [*] Waiting for messages.
 [x] Received 'Second message..'
 [x] Done
 [x] Received 'Fourth message....'
 [x] Done

3、发布与订阅(Publish Subscribe)

此程序演示发布订阅模式,之前的教程中是将每个任务只交付给一个Worker,此程序中会向多个消费者发送一个信息。

在这个程序中,会用两个程序组成一个简单的日志系统,第一个程序将发出日志消息,第二个程序接收并打印这些消息。

  • 相关内容简介

RabbitMQ中消息传递模型的核心思想是:生产者从不直接将任何消息发送到队列中,很多时候生产者甚至不知道消息是否会被发送到队列中。

相反,生产者只能向交换器(Exchange)发送消息,交换器主要是接收来自生产者的消息并将消息推送到队列中。

交换器的类型决定了交换器处理接收到消息的方式,主要有四种类型:directtopicheadersfanout,此处用fanout类型来演示(具体介绍可以参考文章最后的相关链接)。

fanout交换是将接收到的所有消息广播到它所知道的所有队列,如下代码是声明一个fanout类型的交换器logs

channel.exchangeDeclare("logs", "fanout");

在发布时可以指定交换器(之前的样例中是”“,表示使用默认或无名交换器):

channel.basicPublish("logs", "", null, message.getBytes());

之前的样例中使用了特定名称的队列,在此样例中当连接到Rabbit时,需要一个新的空队列,并在消费者断开连接时自动删除队列;可以在声明队列时使用无参的queueDeclare()函数:

//创建一个非持久的、独占的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();

绑定交换器和队列:

channel.queueBind(queueName, "logs", "");

  • 样例-生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

	private static final String EXCHANGE_NAME = "logs";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    try(Connection connection = factory.newConnection();
	    	Channel channel = connection.createChannel();){
	    	channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
	    	
	    	String[] messages = new String[]{
	    		"info: Hello World!",
	    		"info: How are you today?"
	    	};
	    	for(String message : messages){
	    		send(channel, message);
	    	}
	    }
	}
	
	private static void send(Channel channel, String message) throws Exception{
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
	}
}
  • 样例-消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {

	private static final String EXCHANGE_NAME = "logs";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
	    
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
	    String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
	    
	    System.out.println(" [*] Waiting for messages.");
	    
	    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
	        String message = new String(delivery.getBody(), "UTF-8");
	        System.out.println(" [x] Received '" + message + "'");
	    };
	    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
	}
}
  • 运行

可以通过http://localhost:15672/#/queues查看队列,通过http://localhost:15672/#/exchanges查看交换器。

先运行两个ReceiveLogs,可以在Queues页签中看到会创建出两个队列;并输出:

 [*] Waiting for messages.

再运行EmitLog,会看到两个ReceiveLogs中都会输出:

 [*] Waiting for messages.
 [x] Received 'info: Hello World!'
 [x] Received 'info: How are you today?'

退出ReceiveLogs后,会看到相应的队列也自动删除。

4、路由(Routing)

此程序在上一个教程的基础上使用direct类型的交换器,并通过routingKey来实现只订阅消息的一个子集,例如:日志分为不同类型,只需要将关键的错误信息记录到日志文件中(同时也在控制台输出),而其他的信息只接打印在控制台即可。

可以通过在绑定时添加routingKey参数:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

如上图所示,交换器X有两个绑定的队列,其中一个通过orange Key来绑定,另一个通过black Key和green Key绑定;发布到交换器的消息如果路由Key为orange,则消息将被路由到Q1队列;如果消息的路由Key为black或green,则被路由到Q2队列。

使用同一个Key绑定到多个队列也是允许的,此种情况类似于fanout类型的交换器。

  • 样例-生产者
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

	private static final String EXCHANGE_NAME = "direct_logs";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    
	    try(Connection connection = factory.newConnection();
	    	Channel channel = connection.createChannel()) {
	    	//声明direct类型交换器
	    	channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
	    	
	    	Map<String, String> messages = new HashMap<>();
	    	messages.put("green", "Hello World!");
	    	messages.put("orange", "How you doing?");
	    	messages.put("black", "What’s wrong?");
	    	
	    	for(Entry<String, String> message : messages.entrySet()){
	    		send(channel, message.getKey(), message.getValue());
	    	}
	    }
	    
	}
	
	private static void send(Channel channel, String type, String message) throws Exception{
		channel.basicPublish(EXCHANGE_NAME, type, null, message.getBytes("UTF-8"));
		System.out.println(" [x] Sent '" + type + "':'" + message + "'");
	}
}
  • 样例-消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsDirect {

	private static final String EXCHANGE_NAME = "direct_logs";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
	    
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
	    String queueName = channel.queueDeclare().getQueue();
	    
	    //创建此类时,分别使用不同的routingKey绑定,需调整此值
	    String routingKey = "green";
	    channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
	    
	    System.out.println(" [*] Waiting for messages. RoutingKey: " + routingKey);
	    
	    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
	        String message = new String(delivery.getBody(), "UTF-8");
	        System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
	    };
	    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
	}
}
  • 运行

修改消费者程序中的routingKey,分别创建出订阅routingKey为green、orange、black的消费者,之后运行生产者,三个消费者程序会输出:

 [*] Waiting for messages. RoutingKey: green
 [x] Received 'green':'Hello World!'
 [*] Waiting for messages. RoutingKey: orange
 [x] Received 'orange':'How you doing?'
 [*] Waiting for messages. RoutingKey: black
 [x] Received 'black':'What’s wrong?'

5、主题(Topics)

此程序演示使用topic类型交换器。

上一个教程的样例仍然有局限性:它不能基于多个标准进行路由;例如:不仅希望基于颜色,还希望基于种类(动物、植物等)来路由。

发送到主题交换器的路由key不能是任意的key,它必须是由点分隔的单词列表,这此单词通常指定一些与消息相关的特性;key中可以有任意多个单词,但最多不超过255个字节。

绑定时的key必须使用相同的形式;topic交换器与direct交换器类似:使用特定路由key发送的消息将传递给使用匹配的绑定key绑定的所有队列中。绑定时的key有两个特例:

  • *号可以用来代替一个单词

  • #可以用来代替零个或多个单词;当队列使用#绑定时,它将接收所有消息。

当绑定中不使用*#时,topic交换器的行为就和direct交换器一样。

如上图所示,样例中的routingKey由三个单词<speed>.<colour>.<species>组成,第一个描述速度,第二个描述颜色,第三个描述物种;key举例:quick.orange.rabbitlazy.gray.elephant等。

上图中Q1队列绑定所有橙色的物种消息,Q2绑定兔子相关的消息和关于懒惰动物的消息。

如果使用orangequick.orange.male.rabbit为路由key发送消息,那么这些消息将不会匹配到任何绑定,将被丢失。

  • 样例-生产者
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

	private static final String EXCHANGE_NAME = "topic_logs";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    try(Connection connection = factory.newConnection();
	    	Channel channel = connection.createChannel()) {
	    	//声明交换器类型为topic
	    	channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
	    	
	    	Map<String, String> messages = new HashMap<>();
	    	messages.put("quick.orange.rabbit", "Hello World!");
	    	messages.put("lazy.orange.fox", "I am so cute!");
	    	messages.put("slow.brown.elephant", "Hi!...");
	    	
	    	for(Entry<String, String> message : messages.entrySet()){
	    		send(channel, message.getKey(), message.getValue());
	    	}
	    }
	}
	
	private static void send(Channel channel, String type, String message) throws Exception{
		channel.basicPublish(EXCHANGE_NAME, type, null, message.getBytes("UTF-8"));
		System.out.println(" [x] Sent '" + type + "':'" + message + "'");
	}
}
  • 样例-消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

	private static final String EXCHANGE_NAME = "topic_logs";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
	    		
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
	    String queueName = channel.queueDeclare().getQueue();
	    
	    /*
	     * 创建此类时,分别使用不同的routingKey绑定
	     *    *.orange.*
	     *    *.*.rabbit
	     *    lazy.#
	     */
	    String routingKey = "lazy.#";
	    channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
	    
	    System.out.println(" [*] Waiting for messages. RoutingKey: " + routingKey);
	    
	    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
	        String message = new String(delivery.getBody(), "UTF-8");
	        System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
	    };
	    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
	}
}
  • 运行

修改消费者程序中的routingKey,分别创建出订阅routingKey为*.orange.**.*.rabbitlazy.#的消费者,之后运行生产者,三个消费者程序会输出:

 [*] Waiting for messages. RoutingKey: *.orange.*
 [x] Received 'quick.orange.rabbit':'Hello World!'
 [x] Received 'lazy.orange.fox':'I am so cute!'
 [*] Waiting for messages. RoutingKey: *.*.rabbit
 [x] Received 'quick.orange.rabbit':'Hello World!'
 [*] Waiting for messages. RoutingKey: lazy.#
 [x] Received 'lazy.orange.fox':'I am so cute!'

消息Hi!...不匹配,被丢失。

6、远程过程调用(RPC)

此程序将使用RabbitMQ来构建RPC系统:一个客户机和一个可伸缩的RPC服务器。由于没有任何值得分发的耗时任务,因此将创建一个返回斐波那契数字的虚拟RPC服务。

一般情况下,在RabbitMQ上实现RPC是很容易的:客户机发送请求消息,服务器用响应消息回复。为了接收响应,需要发送一个带有”回调”队列地址的请求;可以使用默认队列:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

BasicProperties中常用的属性有:

  • deliveryMode:可以将消息标记为持久的或暂时的。

  • contentType:用于描述编码的mime类型;例如,对于常用的JSON编码,最好将该属性设置为application/json

  • replyTo:通常用于命名回调队列。

  • correlationId:在将RPC响应与请求关联起来时使用。

在上述方法中,为每个RPC请求创建一个回调队列是相当低效的,更好的方法是为每个客户机创建一个回调队列。

通过为每个请求设置它的唯一值(correlationId),当我们在回调队列中收到消息时查看这个属性,并基于这个属性匹配响应和请求。如果收到一个未知的correlationId值,说明它不属于我们的请求可以安全地放弃消息。

如上图所示,例子中的RPC的工作原理如下:

  • 对于RPC请求,客户端C发送一条带有两个属性的消息,其中replyTo设置为专门为请求创建的匿名独占队列;correlationId为每个请求设置一个唯一的值。

  • 请求被发送到rpc_queue队列中。

  • RPC服务器S在等待该队列上的请求,当一个请求出现时,它执行该任务,并使用replyT字段中的队列将带有结果的消息发送回客户端。

  • 客户端等待应答队列上的数据,当一条消息(响应)出现时,它会检查correlationId属性,如果它与请求中的值匹配,则将响应返回给应用程序。

  • RPC Server

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class RPCServer {

	private static final String RPC_QUEUE_NAME = "rpc_queue";
	
	public static void main(String[] args) throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try(Connection connection = factory.newConnection();
        	Channel channel = connection.createChannel();){
        	channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        	//清除指定队列的内容
        	channel.queuePurge(RPC_QUEUE_NAME);
        	//可能需要运行多个服务器进程,为了在多个服务器上平均分配负载
        	channel.basicQos(1);
        	
        	System.out.println(" [x] Awaiting RPC requests");
        	
        	Object monitor = new Object();
        	DeliverCallback callback = new DeliverCallback() {
				@Override
				public void handle(String consumerTag, Delivery delivery) throws IOException {
					BasicProperties replyProps = new BasicProperties()
							.builder()
							.correlationId(delivery.getProperties().getCorrelationId())
							.build();
					
					String response = "";
					try{
						String message = new String(delivery.getBody(), "UTF-8");
	                    int n = Integer.parseInt(message);
	                    System.out.println(" [.] fibonacci(" + message + ")");
	                    response += fibonacci(n);
					}catch(RuntimeException e){
						System.out.println(" [.] " + e.getMessage());
					}finally{
						channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
						channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
						// RabbitMq consumer worker thread notifies the RPC server owner thread
	                    synchronized (monitor) {
	                        monitor.notify();
	                    }
					}
				}
			};
			//在队列上注册消费者,当此队列有消息来了之后,就会把消息转发到给此channel处理
			channel.basicConsume(RPC_QUEUE_NAME, false, callback, (consumerTag -> { }));
			
			// Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
	}
	
	private static int fibonacci(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}
  • RPC Client
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class RPCClient implements AutoCloseable {

	private final String requestQueueName = "rpc_queue";

	private Connection connection;
	private Channel channel;

	public RPCClient() throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
	}
	
	public String call(String message) throws Exception{
		final String correlationId = UUID.randomUUID().toString();
		
		String replyQueueName = channel.queueDeclare().getQueue();
		BasicProperties props = new BasicProperties()
				.builder()
				.correlationId(correlationId)
				.replyTo(replyQueueName)
				.build();
		
		channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
		
		final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
		
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery delivery) throws IOException {
				if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
	                response.offer(new String(delivery.getBody(), "UTF-8"));
	            }
			}
		};
		//basicConsume()方法返回服务器生成的consumerTag
		String ctag = channel.basicConsume(replyQueueName, true, callback, consumerTag -> {});
		String result = response.take();
		//计算出结果后取消消费者
		channel.basicCancel(ctag);
		return result;
	}

	@Override
	public void close() throws Exception {
		connection.close();
	}
	
	public static void main(String[] args) throws Exception{
		try(RPCClient client = new RPCClient()){
			for(int i = 0; i < 10; i++){
				String num = Integer.toString(i);
				System.out.println(" [x] Requesting fibonacci(" + num + ")");
				String response = client.call(num);
				System.out.println(" [.] Got '" + response + "'");
			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}
  • 运行

运行RPCServer类启动RPC服务:

 [x] Awaiting RPC requests

再运行RPCClient启动客户端程序,开始计算:

 [x] Requesting fibonacci(0)
 [.] Got '0'
 [x] Requesting fibonacci(1)
 [.] Got '1'
 [x] Requesting fibonacci(2)
 [.] Got '1'
 [x] Requesting fibonacci(3)
 [.] Got '2'
 [x] Requesting fibonacci(4)
 [.] Got '3'
 [x] Requesting fibonacci(5)
 [.] Got '5'
 [x] Requesting fibonacci(6)
 [.] Got '8'
 [x] Requesting fibonacci(7)
 [.] Got '13'
 [x] Requesting fibonacci(8)
 [.] Got '21'
 [x] Requesting fibonacci(9)
 [.] Got '34'
参考资料: