微秒级延迟消息中间件-RabbitMQ

概述

RabbitMQ 是由 Erlang 语言开发的,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列。

Erlang 是一种通用的并发程序设计语言,它由乔·阿姆斯特朗(Joe Armstrong)在瑞典电信设备制造商爱立信所辖的计算机科学研究室开发,目的是创造一种可以应付大规模并发活动的程序设计语言和运行环境。

特性

  • 响应速度是微秒级,非常快。
  • 轻量级,易于在操作系统和云中部署运行。
  • 强大的管理插件,方便用户在浏览器中管理RabbitMQ。
  • 不支持分布式,但可以以镜像集群模式保证高可用。
  • 开发社区非常活跃,文档完善,易于后期维护。

快速入门

入门程序工作模式

生产者发送消息到消息队列中,消费者从队列中获取消息。

搭建基本环境

  1. 新建两个工程:生产者和消费者

  2. 分别在两个工程中引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp‐client</artifactId>
    <version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐logging</artifactId>
    </dependency>

创建生产者

新建生产者类Producer01,基本流程:

  1. 创建连接
  2. 创建通道
  3. 声明队列
  4. 发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    public class Producer01 {
    //队列
    private static final String QUEUE = "HelloWorld";

    public static void main(String[] args) {
    //通过连接工厂创建新的连接和MQ建立连接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置IP
    connectionFactory.setHost("127.0.0.1");
    //设置端口
    connectionFactory.setPort(5672);
    //设置用户名
    connectionFactory.setUsername("guest");
    //设置密码
    connectionFactory.setPassword("guest");
    //设置虚拟机,一个MQ服务可以设置多个虚拟机,每个虚拟机相当于一个独立的MQ
    connectionFactory.setVirtualHost("/");

    Connection connection = null;
    Channel channel = null;
    try {
    //建立新连接
    connection = connectionFactory.newConnection();
    //创建会话通道,生产者和MQ服务所有通信都在Channel通道中完成
    channel = connection.createChannel();
    //声明队列,如果队列在MQ中没有则要创建
    /**
    * 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
    * param1:队列名称
    * param2:是否持久化,如果持久化,MQ重启后队列还在
    * param3:exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭则队列自动删除,如果设为true,可用于临时队列的创建
    * param4:autoDelete 自动删除。队列不再使用时是否自动删除此队列,如果将此参数和exclusive设置为true,就可以实现临时队列(队列不用了就自动删除)
    * param5:arguments 参数。可以设置一个队列的扩展参数。如:可设置存活时间
    */
    channel.queueDeclare(QUEUE, true, false, false, null);
    //发送消息
    //定义消息内容
    String message = "Hello你好MQ";

    /**
    * 消息发布方法
    * param1:Exchange的名称,如果没有指定,则使用Default Exchange(设置为"")
    * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,如果使用默认交换机,routingKey设置为队列名称
    * param3:消息包含的属性
    * param4:消息体
    */
    channel.basicPublish("", QUEUE, null, message.getBytes());
    System.out.println("Message has been sent to RabbitMQ: " + message);
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    //关闭连接
    try {
    channel.close();
    } catch (IOException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }
    try {
    connection.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    }

    控制台输出发送消息成功

RabbitMQ控制平台可以看到新建的Queue

可以看到待发送消息1条

可以在这里获取消息内容

创建消费者

新建消费者类Consumer01,基本流程:

  1. 创建连接

  2. 创建通道

  3. 声明队列

  4. 监听队列

  5. 接收消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    public class Consumer01 {
    //队列
    private static final String QUEUE = "HelloWorld";

    public static void main(String[] args) throws Exception {
    //通过连接工厂创建新的连接和MQ建立连接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置IP
    connectionFactory.setHost("127.0.0.1");
    //设置端口
    connectionFactory.setPort(5672);
    //设置用户名
    connectionFactory.setUsername("guest");
    //设置密码
    connectionFactory.setPassword("guest");
    //设置虚拟机,一个MQ服务可以设置多个虚拟机,每个虚拟机相当于一个独立的MQ
    connectionFactory.setVirtualHost("/");

    //建立新连接
    Connection connection = connectionFactory.newConnection();
    //创建会话通道,生产者和MQ服务所有通信都在Channel通道中完成
    Channel channel = connection.createChannel();

    //声明队列
    channel.queueDeclare(QUEUE, true, false, false, null);

    //实现消费方法
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    /**
    * 当接收到消息后,此方法将被调用
    * @param consumerTag 消费者标签,用来标识消费者,可以在监听队列时设置channel.basicConsume()
    * @param envelope 信封,消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
    * @param properties 消息属性
    * @param body 消息内容
    * @throws IOException
    */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //交换机
    String exchange = envelope.getExchange();
    //消息id,MQ在channel中用来标识消息的id,用于确认消息已接收
    long deliveryTag = envelope.getDeliveryTag();
    //消息内容
    String message = new String(body, "utf-8");
    System.out.println("Message has been received: "+message);


    }
    };

    //监听队列
    /**
    * 监听队列方法 basicConsume
    * 参数:String queue, boolean autoAck,Consumer callback
    * 1、队列名称
    * 2、是否自动回复,设置为true为表示消息接收到自动向MQ回复接收到了,MQ接收到回复会删除消息,设置为false则需要通过编码实现回复
    * 3、消费消息的方法,消费者接收到消息后调用此方法
    */
    channel.basicConsume(QUEUE,true,defaultConsumer);
    }
    }

控制台输出接收消息成功

消息已被消费,MQ中剩余0条消息

工作模式

Work queues(工作队列)

与入门程序相比,Work queues 由多个消费者共同分担队列中的任务,RabbitMQ采用轮询的方式将消息依次发送给不同消费者

在入门程序中启动三个消费者(开启顺序为1、2、3),当生产者第一次发送消息,消费者1会收到消息,第二次发送时消费者2收到消息,第三次发送时消费者3收到消息,第四次发送又是消费者1收到消息,以此类推。

Publish/Subscribe(发布订阅)

与工作队列相比,Pub/Sub 模式多了交换机,当生产者发送消息时,会先发送消息给交换机,由交换机将消息转发到各个队列中,消费者通过监听自己队列的方式获取消息。

在Work queues 中,生产者发出消息后,消费者只能依次接收到消息。而Pub/Sub模式中,多个消费者可以同时接收到消息,类似广播。

使用编码的方式实现Pub/Sub模式
举个栗子:当用户完成充值后,以邮件和短信两种方式通知

  1. 生产者:声明交换机 Exchange_fanout_inform

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    public class Producer02_publish {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args) {
    //通过连接工厂创建新的连接和MQ建立连接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置IP
    connectionFactory.setHost("127.0.0.1");
    //设置端口
    connectionFactory.setPort(5672);
    //设置用户名
    connectionFactory.setUsername("guest");
    //设置密码
    connectionFactory.setPassword("guest");
    //设置虚拟机,一个MQ服务可以设置多个虚拟机,每个虚拟机相当于一个独立的MQ
    connectionFactory.setVirtualHost("/");

    Connection connection = null;
    Channel channel = null;
    try {
    //建立新连接
    connection = connectionFactory.newConnection();
    //创建会话通道,生产者和MQ服务所有通信都在Channel通道中完成
    channel = connection.createChannel();
    //声明队列,如果队列在MQ中没有则要创建
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    //声明一个交换机 String exchange, String type
    /**
    * 参数明细
    * 1、交换机名称
    * 2、交换机类型,
    * fanout:对应RabbitMQ的工作模式 pub/sub
    * topic:对应 topic
    * direct:对应 routing
    * headers:对应 headers
    */
    channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
    //交换机和队列绑定String queue, String exchange, String routingKey
    /**
    * 参数明细
    * 1、queue 队列名称
    * 2、exchange 交换机名称
    * 3、routingKey 路由key,作用是交换机根据路由Key的值将消息转发到指定队列,在Pub/Sub模式中调协为空字符串
    */
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");

    //发5条消息
    for (int i = 0; i < 5; i++) {
    //发送消息
    //定义消息内容
    String message = "Send inform message to user";

    channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
    System.out.println("Message has been sent to RabbitMQ: " + message);
    }

    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    //关闭连接:代码略
    }
    }
    }
  2. 邮件通知消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    public class Consumer02_subscribe_email {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args) throws Exception {
    //通过连接工厂创建新的连接和MQ建立连接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置IP
    connectionFactory.setHost("127.0.0.1");
    //设置端口
    connectionFactory.setPort(5672);
    //设置用户名
    connectionFactory.setUsername("guest");
    //设置密码
    connectionFactory.setPassword("guest");
    //设置虚拟机,一个MQ服务可以设置多个虚拟机,每个虚拟机相当于一个独立的MQ
    connectionFactory.setVirtualHost("/");

    //建立新连接
    Connection connection = connectionFactory.newConnection();
    //创建会话通道,生产者和MQ服务所有通信都在Channel通道中完成
    Channel channel = connection.createChannel();

    //声明队列
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);

    //声明一个交换机 String exchange, String type
    /**
    * 参数明细
    * 1、交换机名称
    * 2、交换机类型,
    * fanout:对应RabbitMQ的工作模式 pub/sub
    * topic:对应 topic
    * direct:对应 routing
    * headers:对应 headers
    */
    channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
    //交换机和队列绑定String queue, String exchange, String routingKey
    /**
    * 参数明细
    * 1、queue 队列名称
    * 2、exchange 交换机名称
    * 3、routingKey 路由key,作用是交换机根据路由Key的值将消息转发到指定队列,在Pub/Sub模式中调协为空字符串
    */
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");

    //实现消费方法
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //交换机
    String exchange = envelope.getExchange();
    //消息id,MQ在channel中用来标识消息的id,用于确认消息已接收
    long deliveryTag = envelope.getDeliveryTag();
    //消息内容
    String message = new String(body, "utf-8");
    System.out.println("Message has been received: "+message);
    }
    };

    //监听队列
    channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    }
    }
  3. 短信通知消费者(代码略,同邮件消费者)

  4. 打开管理平台,使用生产者发5条消息,再依次打开邮件消费者和短信消费者,发现两者都接收到5条消息。如果打开2个邮件消费者,1个短信消费者,再用生产者发5条消息,则结果是短信消费者收到5条消息,2个邮件消费者1个收到3条,1个收到2条消息。

结论:

  • Pub/Sub 模式中,交换机绑定多个队列,每个消费者会监听自己的队列获取消息
  • 如果多个消费者监听同一个队列,则按 Work queues 模式轮询获取消息

Routing(路由)

与发布订阅模式相比,Routing 模式下,交换机绑定队列时,每个队列会指定一个 routingKey,而且一个队列可以指定多个 routingKey。生产者发送消息时,交换机会根据 routingKey 是否匹配来转发消息到指定的队列

以下图为例,如果生产者发出的 routingKey 为 error,两个队列绑定交换机时都设定了 error 这个值,则消息会分别发送到这两个队列中;如果生产者发出的 routingKey 为 warning,那么由于消费者1绑定交换机时未指定这个 routingKey,无法匹配,而消费者2中指定了 warning 这个 routingKey,所以消息只会发送到下面的队列中。最终,消息只能被消费者2接收到。

由于 Routing 模式与 Pub/Sub 模式代码相似,只是增加了 routingKey 的设置,在此不赘述。最终可以在控制台看到设置的 routingKey

Topic(通配符)

与 Routing 不同,通配符模式 routingKey 匹配方式允许通配符,Routing 模式是相等匹配,Topic 模式是通配符匹配
统配符规则:符号#可以匹配多个词(词之间用.分割),符号*可以匹配一个词语。

Header(键值对匹配)

Header模式与 Routing 的不同,Header 模式取消 routingKey,使用 Header 中的 key/value 匹配队列。Header 实际上是以 Map 结构存储的

RPC(Remote Procedure Call 远程过程调用)

RPC 是客户端远程调用服务端的工作模式 ,使用MQ可以实现RPC的异步调用,是基于Direct交换机实现的。具体实现方式如下:

  1. 客户端既是生产者又是消费者,向 RPC 请求队列发送 RPC 调用消息(图中 rpc_queue 即 RPC 请求队列),同时监听RPC响应队列(图中 reply_to 即 RPC 响应队列)

  2. 服务端监听 RPC 请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

  3. 服务端将 RPC 方法的结果发送到 RPC 响应队列

  4. 客户端(RPC 调用方)监听 RPC 响应队列,接收到 RPC 调用结果

Spring Boot 整合 RabbitMQ

用 Spring Boot 整合 RabbitMQ 实现 Topic 模式消息发送

搭建基本环境

添加起步依赖,即可自动添加 spring-rabbit 依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>

创建配置文件

在主配置文件 application.yml 中配置连接参数

1
2
3
4
5
6
7
8
9
10
11
server:
port: 44000
spring:
application:
name: test‐rabbitmq‐producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /

定义一个 RabbitConfig 类,配置 Exchange,Queue,并绑定交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
/**
* 交换机配置
* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
* @return the exchange
*/
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,消息队列重启后交换机仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
Queue queue = new Queue(QUEUE_INFORM_SMS);
return queue;
}
//声明队列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
return queue;
}
/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
* 绑定队列到交换机 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
}

生产者

使用 spring-rabbit 提供的 RabbitTemplate 模板类中的方法 convertAndSend 进行消息的发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendByTopics(){
for (int i=0;i<5;i++){
String message = "sms email inform to user"+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,
"inform.sms.email",message);
System.out.println("Send Message is:'" + message + "'");
}
}
}

消费者

消费端工程加入相同起步依赖,用 spring-rabbit 提供的 @RabbitListener 注解监听队列,且可以监听多个队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class ReceiveHandler {
//监听email队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg,Message message,Channel channel){
System.out.println("Receive message: " + msg);
}
//监听sms队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(String msg,Message message,Channel channel){
System.out.println("Receive message: " + msg);
}
}

启动生产者,发送消息,消费者工程会自动监听到消息并在控制台输出。






欢迎关注微信公众号,一起交流技术↓

{mywechat}

0%