`
357029540
  • 浏览: 725813 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论
阅读更多

一. 什么是MQ

MQ 是message queue 消息队列的简称,也叫消息中间件,遵守JMS(java message service)规范的一种软件。它是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。

二. MQ对比 在业界我们熟悉的MQ主要有RabbitMQ、ActiveMQ、Kafka、RocketMQ等主要消息中间件,我们下面对比下它们:

         
特性 ActiveMQ RabbitMQ RocketMQ Kafka
PRODUCER-CONSUMER 支持 支持 支持 支持
PUBLISH-SUBSCRIBE 支持 支持 支持 支持
REQUEST-REPLY 支持 支持 支持 支持
API完备性
多语言支持 支持,java优先 语言无关 支持Java, C++, Go 支持,java优先
单机吞吐量 万级 万级 万级 十万级
消息延迟   微秒级 毫秒级 毫秒级
可用性 高(主从) 高(主从) 非常高(分布式) 非常高(分布式)
消息丢失 理论上不会丢失 理论上不会丢失
消息重复 可能会有重复 可控制 可能会有重复 理论上会有重复
文档完备性
提供快速入门
首次部署难度
社区活跃度
商业支持 阿里云
成熟度 成熟 成熟 比较成熟 成熟日志领域
特点 功能齐全,被大量开源项目使用 由于Erlang语言的并发能力,性能好 各个环节分布式扩展设计,主从HA;支持上万个队列;多种消费模式;性能好 依赖zookeeper,性能好
支持协议 Openwire、STOMP、REST、XMPP、AMQP AMQP TCP、JMS自定义的协议 TCP
持久化 内存、文件、数据库 内存、文件 磁盘文件 文件
事务 支持 支持 支持 不支持
负载均衡 支持 支持 支持 支持,依赖zookeeper
管理界面 一般 有web console实现 终端命令方式
部署方式 独立、嵌入 独立 独立 独立,依赖zookeeper
评价

优点:成熟的产品,已经在很多非大规模场景应用,有较多文档,各种协议支持较好,有多种语言成熟的客户端。

缺点:根据用户的反馈,会出现莫名其妙的问题,且会丢失消息。其重心放到了activeMQ6.0产品-apollo上面,目前社区不活跃,且对5.x维护较少。activemq不适合上千个队列应用的场景。

优点:由于Erlang语音的特性,MQ性能较好;管理界面较丰富,在互联网公司也有较大规模应用;支持AMQP,有多种语言且支持AMQP客户端。

缺点:Erlang难道较大,集群不支持动态扩展

优点:模型简单,接口易用;集群规模大概在50台左右,单日处理消息上百亿;性能非常好,可以堆积大量消息在broker中;支持多种消费,包括集群消费、广播消费等。开发度较活跃,版本更新快。

缺点:产品较新文档比较缺乏,没有在MQ核心实现JMS等接口,对已有的系统而言不能兼容。

优点:产品成熟,主要用于日志领域,单机TPS(Transactions Per Second,每秒传输的事物处理个数)约在百万条/秒,消息大小10个字节

缺点:不支持事务,不支持消息回滚

         

在这里熟悉下MQ相关的协议:

1.AMQP协议

AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

优点:可靠、通用

2.MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

3.STOMP协议

STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

4.XMPP协议

XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。

优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大

5.其他基于TCP/IP自定义的协议

有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。

优点:命令模式(非topic\queue模式)

三. Rabbit MQ使用

1.生产者和消费者的定义

生产者:消息发送者,在MQ中被称为生产者(producer),一个发送消息的应用也被叫做生产者,用P表示。

消费者:生产者“生产”出消息后,等待接受消息的应用程序,我们称之为消费者(Consuming ),用C表示。

2.exchange,queue,routekey定义及关系绑定

Queue:队列,在RabbitMQ的作用是存储消息,队列的特性是先进先出。生产者每次产生的数据都会通过Rabbit MQ进入到队列中,然后消费者通过订阅的方式消费队列中的数据。

那么生产者是如何通过Rabbit MQ把消息发送给队列的呢?它怎么知道要把哪一个生产者产生的消息发送给哪一个的队列呢?在这里我们首先介绍下Exchange交换机的相关概念。

Exchange:交换机,在Rabbit MQ中通过binding方法绑定queue和exchange的关系,exchange用作做消息分发,然后Exchange通过自己的规则类型exchangeType发送消息到对应的QUEUE上面,这样相应的QUEUE里面就有生产者发送来的消息了,接下来我们看看exchange的规则类型ExchangeType。

在介绍ExchangeType之前我们先了解下routekey的概念

routekey: 路由key,用于指定queue和exchange连接经过的路径。

ExchangeType有4种类型分别为fanout、direct、topic、headers。

1).fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

 

 上图所示,生产者(P)生产消息1将消息1推送到Exchange,由于Exchange Type=fanout这时候会遵循fanout的规则将消息推送到所有与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。这里可以不用指定routekey,因为所有QUEUE都会接收发送过来的消息。

2).direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中

 当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。

3).topic

前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:

3.1). routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”

3.2). binding key与routing key一样也是句点号“. ”分隔的字符串

3.3).binding key中可以存在两种特殊字符""与"#",用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

 当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。

4).headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

接下来我们看看代码上的定义,使用一个topic的代码定义,在这里我们主要使用spring boot组件化的rabbitmq进行代码演示和说明

1).定义一个名为topicTestQueue的队列

@Bean(name = "topicTestQueue")
public Queue topicTestQueue() {
    return new Queue("TOPIC_TEST_QUEUE", true);
}

 参数1是queue的名称,参数2是表示是否持久化存储

2).定义一个名为topicTestExchange的交换机

@Bean(name = "topicTestExchange")
public TopicExchange topicTestExchange() {
    return new TopicExchange("TOPIC_TEST_EXCHANGE");
}

 3).绑定推送队列到交换机并配置路由

@Bean(name = "bindingExchangeTest")
public Binding bindingExchangeTopicTest(@Qualifier("topicTestQueue") Queue queue,
                                @Qualifier("topicTestExchange") TopicExchange topicExchange) {
    // 绑定推送队列到交换机上,同时配置路由key为topic.test.route.key
    return BindingBuilder.bind(queue).to(topicExchange).with("topic.test.route.key");
}

 这个地方的BindingBuilder.to()方法根据传入的exchange不同实现了不同的初始化配置方法。

4).实现数据推送

rabbitTemplate.convertAndSend("TOPIC_TEST_EXCHANGE",
                "topic.test.route.key",
                pushData,
                correlationData)

 这里的rabbitTemplate是RabbitTemplate的注入类,从数据推送的方法定义和上面介绍的topic实现方式来看,我们通过指定的exchange和route路径给指定的queue发送MQ消息,第三个参数是需要推送的数据,第四个参数表示消息发送MQ失败时用来定义如何做后续处理的对象值,这个将在后面介绍,通过RabbitTemplate类我们可以去获取自己要用到的方法。

接下来介绍下exchangeType为Headers的实现方式,因为这个方式用的比较少,所以在这里实现下

1).定义一个queue

@Bean(name = "test_headers_queue")
public Queue headers_queue() {
    return new Queue("test_headers_queue");
}

 2).定义一个exchange

@Bean(name="test_headers_exchange")
public HeadersExchange exchange() {
    return new HeadersExchange("test_headers_exchange");
}

 3).绑定queue到exchange上并且定义匹配的方式

@Bean
Binding bindingExchangeTopicQueue(@Qualifier("test_headers_queue") Queue queue,
                                    @Qualifier("test_headers_exchange") HeadersExchange headersExchange) {
    Map<String, Object> map = new HashMap();
    map.put("TEST_1", "ONE");
    map.put("TEST_2", "TWO");

    //whereAll表示全部匹配
    //return BindingBuilder.bind(queue).to(headersExchange).whereAll(map).match();

    //whereAny表示部分匹配
    return BindingBuilder.bind(queue).to(headersExchange).whereAny(map).match();
}

 4).数据推送

public void send() {
    Map<String, Object> map_any = new HashMap();
    map_any.put("TEST_1", "ONE");

    // 声明消息 (消息体, 消息属性)
    MessageProperties messageProperties = new MessageProperties();
    // 设置消息是否持久化。Persistent表示持久化,Non-persistent表示不持久化
    messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    messageProperties.setContentType("UTF-8");
    messageProperties.getHeaders().putAll(map_any);

    Message message = new Message("testHeaderSend".getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("test_headers_exchange",null, message);
}

 5).消费端接收

@RabbitListener(queues="test_headers_queue")
public void process(Message message) throws UnsupportedEncodingException {
    MessageProperties messageProperties = message.getMessageProperties();

    String contentType = messageProperties.getContentType();

    System.out.println("info:"+new String(message.getBody(), contentType));
}

 通过以上的方法介绍我们就实现了一个最基本的MQ消息的生产、存储和消费的流程定义,但是我们如何保证消息能够被MQ正确的接收?如果MQ宕机了消息还存在吗?消费失败了MQ消息还存在吗?接下来我们探讨下这个方面的应用。

3.消息推送给MQ,如何确定MQ接收到消息

在应用中,我们将消息推送给MQ的时候可以添加一个CorrelationData类型的参数到指定的方法中,这个CorrelationData对象是用于消息在MQ接收成功或失败后用于生产端对该数据做后续处理的。在这里我们必须配置如下的信息来开启生产端消息确认处理机制

spring:
    rabbitmq:
        publisher-confirms: true

 该配置信息表上是否启用消费确认,启用后会返回信息给生产端,同时在生产端必须实现RabbitTemplate.ConfirmCallback接口,该接口方法就是用于MQ消息返回的后的处理的,在这里我们用到了redis,将它用来缓存推送的消息,如果推送成功则删除该缓存信息,如果失败则将该消息用于数据推送,具体实现如下:

/**
     * 消息确认失败的处理
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData.getId();
        // 如果消息投递失败,如何处理
        if (ack) {
            // 删除redis缓存
            redisTemplate.opsForHash().delete(("test-push-key", id);
        } else {

            // 重新投递
            Object pushData = redisTemplate.opsForHash().get("test-push-key", id);
            if (Optional.ofNullable(pushData).isPresent()) {
                rabbitTemplate.convertAndSend("TOPIC_TEST_EXCHANGE",
                        "topic.test.route.key",
                        pushData,
                        correlationData);
            }
        }
    }

 在这里还有一个RabbitTemplate.ReturnCallback接口,提供了returnedMessage方法,该方法是用作MQ接收成功或失败后的消息返回。同样这个需要配置启动后才能正常使用ReturnCallback接口

spring:
    rabbitmq:
        publisher-returns: true
        template:
            mandatory: true

 ConfirmCallback接口和ReturnCallback接口的使用场景调用区分如下:

如果消息没有到exchange,则confirm回调,ack=false

如果消息到达exchange,则confirm回调,ack=true

exchange到queue成功,则不回调return

exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

6.MQ接收到消息后如何保证消息不丢失

在MQ接收到消息后,它会首先写入到内存中,然后经过一定的时间从内存就写入到了文件上面,具体可以参考 https://blog.csdn.net/chengyifeng704823/article/details/79986156 这个上面的说明,在这里我们主要是说明的springboot的设置上面如何保证MQ消息的不丢失。

为了保证消息的不丢失我们需要从三个方面来进行配置

1).交换机exchange的持久化存储

在上面介绍的如何确认MQ已经接受到消息的说明中已经提到exchange到queue有可能会失败,在这里我们就需要把消息持久化来保存在queue恢复的时候来把消息转发到queue上面,不过好的是在springboot中已经默认实现了exchange的持久化,不需要我们显示的去定义,在AbstractExchange类中实现如下

public AbstractExchange(String name) {
    this(name, true, false);
}

/**
    * Construct a new Exchange, given a name, durability flag, auto-delete flag.
    * @param name the name of the exchange.
    * @param durable true if we are declaring a durable exchange (the exchange will
    * survive a server restart)
    * @param autoDelete true if the server should delete the exchange when it is no
    * longer in use
    */
public AbstractExchange(String name, boolean durable, boolean autoDelete) {
    this(name, durable, autoDelete, null);
}

 2).队列queue持久化存储

消息从exchange到queue后,如果MQ宕机后,如果没有持久化queue的话,在MQ重启后queue就可能全部丢失了,消息也无法从queue中进行消费,页次我们也需要将queue进行持久化处理,同样在springboot中也默认实现了queue的持久化,在Queue类中实现如下

public Queue(String name) {
    this(name, true, false, false);
}

/**
    * Construct a new queue, given a name, durability, exclusive and auto-delete flags.
    * @param name the name of the queue.
    * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
    * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
    * connection)
    * @param autoDelete true if the server should delete the queue when it is no longer in use
    */
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
    this(name, durable, exclusive, autoDelete, null);
}

 3).消息message持久化存储

如果发送的消息到达queue后,没有做持久化的处理,宕机后消息也会丢失掉,所以也必须将message实现持久化。在springboot中默认是使用持久化存储,可以在MessageProperties类看到

public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;

 通过以上三个持久化实现我们就可以在MQ里的实现,如果刚好在从内存往文件中写的时候宕机了也没有办法了,但是这种概率还是很小的虽然存在。

7.消费者消费MQ消息如何保证消息正确消费

消费者从MQ获取消息后不管是否消费成功springboot默认上是自动删除相应的获取消息,如果是一些重要的数据显然这并不是我们想要的结果,我们如何保证消息能被正确的消费呢?

1).我们需要在springboot配置文件中进行手动消费的配置

spring:
    rabbitmq:
        listener:
            simple:
                # 手动确认机制
                acknowledge-mode: AUTO

 2).自定义bean

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    // 手动ack
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(messageConverter);
    return factory;
}

@Bean
public MessageConverter messageConverter() {
    return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
}

 以上2个步骤是为了防止MQ自动删除消息。

3).消费端确认消费

@Service
@RabbitListener(queues = "topicTestQueue")
@Slf4j
public class DataPushServiceImpl implements DataPushService {

    @Override
    @RabbitHandler
    public void handleReceiveDataFromPush(PushData pushData, Channel channel, @Header(value = AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            dataPushRepository.save(pushData);

            // 向rabbitmq确认数据已经完成,不用重复发送
            channel.basicAck(tag, false);
        } catch (Exception e) {
            try {
                // 消费失败,重新发送消息
                channel.basicNack(tag, false, false);
            } catch (IOException e1) {
                log.error("消费失败,重新发送消息错误:{}", e);
            }
            log.error("消费失败错误:{}", e);
        }
    }
}

 通过以上三个步骤就完成了消费端正确的消费消息。

8.消息堆积的处理

我们这里假设的都是一台服务器,只有一个消费端,在什么情况下会出现消息堆积呢?

1).生产者产生的消息过快,消费端来不及处理

假设产生一条消息需要3秒,然而消费这条消息却需要1分钟,那么这个时候问题就来了,消费的速度显然跟不上生产的消息的速度,那么怎么办呢?

1.1).检查业务代码是不是有问题,优化业务代码

1.2).使用多线程来消费消息

我们可以通过配置来实现消费者的线程数量

spring:
    rabbitmq:
        listener:
            simple:
                #最小的线程数量
                concurrency: 10
                #最大的线程数量
                max-concurrency: 20

 1.3).添加一个阻塞队列来缓存消息

通过配置来实现

spring:
    rabbitmq:
        listener:
            simple:
                # 消费者端每一个线程消费消息最多200条
                prefetch: 200

 该配置是指每一个消费者可以缓存200条数据进行消费,每消费一次都会往里面添加1条数据。

prefetch的大小应该为多少

理想状况下,计算MQ服务端从缓冲区中拿到消息并推送到消费端,加上消费端处理完并ack消息到MQ服务端的时间,假设为100ms,其中消费端处理业务话费了10ms。

这里可以得出我们 prefetch = 100ms / 10ms = 10,也就是消息来回的总时间/业务处理的时间,这里要求我们 prefetch >= 10。一般计算这个时间不会太准确只能是大概数据的,所以prefetch一般要大一点。但是这个值也不能太大,不然消费端就一只处于空闲状态了。

所以如果你保证所有的消息都ack确认了,但是还是出现比较长时间的堵塞,你就或者加大一点prefetch,或者多加一些机器,或者减少业务处理的时间了。

在这里需要特别指明的是prefetch针对多消费者时,如果为1则为公平分发策略,即依次给每个消费者发送消息,相当于设置channel.setBasicQos(1),否则是轮询分发策略。

2).一条消息反复的消费导致MQ消息堆积

因为rabbit mq 是线性队列的原因,因此每次都是顺序执行的,当某一条消息没有被正确消费的时候,它就会阻塞后面消息的消费,就会导致消息的堆积。

出现这种情况的解决方案就是检查该消息为什么没有被正常的消费掉,如果消息本来就是不在需要的可以考虑将其丢弃掉。同时我们也可以考虑定义queue的时候设置消息的过期时间,如下:

Map<String, Object> args = new HashMap<String, Object>();
// 单位是毫秒
args.put("x-message-ttl", 4000);

 这里还有一些其他参数如下:

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间

AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME,"", properties.build(), message.getBytes(“UTF-8”));

 Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim

Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,

Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

Master locator(x-queue-master-locator):主队列选择策略

选择存在主队列 最少的的节点: min-masters

选择client 声明queue 连接的节点:client-local

随机选择:random

具体一些说明可以参考 https://blog.csdn.net/vbirdbest/article/details/78670550 ,https://www.cnblogs.com/wanglm/p/6092865.html

3).没有手动确认消息已经被消费

MQ消息的确认机制是手动确认的,但是忘记在消费端代码中实现消息确认机制了,亦或者因为一些异常原因,比如幂等性业务需求没有完成消息的确认,从而导致消息的堆积。

  • 大小: 26 KB
  • 大小: 30.7 KB
  • 大小: 22.4 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics