首先实现生产者发送消息和队列的持久化,这部分摘抄自http://blog.720ui.com/2017/rabbitmq_action_durable/
要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化。如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可。
- 交换器必须是持久化。
- 队列必须是持久化的。
- 消息必须是持久化的。
原生的实现方式
原生的 RabbitMQ 客户端需要完成三个步骤。
第一步,交换器的持久化。
// 参数1 exchange :交换器名 // 参数2 type :交换器类型 // 参数3 durable :是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
第二步,队列的持久化。
// 参数1 queue :队列名 // 参数2 durable :是否持久化 // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除 // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列 // 参数5 arguments channel.queueDeclare(QUEUE_NAME, true, false, false, null);
第三步,消息的持久化。
// 参数1 exchange :交换器 // 参数2 routingKey : 路由键 // 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化 // 参数4 body : 消息体 channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Spring AMQP 的实现方式
Spring AMQP 是对原生的 RabbitMQ 客户端的封装。一般情况下,我们只需要定义交换器的持久化和队列的持久化。
其中,交换器的持久化配置如下。
// 参数1 name :交互器名 // 参数2 durable :是否持久化 // 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列 new TopicExchange(name, durable, autoDelete)
此外,还需要再配置队列的持久化。
// 参数1 name :队列名 // 参数2 durable :是否持久化 // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除 // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列 new Queue(name, durable, exclusive, autoDelete);
至此,RabbitMQ 的消息持久化配置完毕。
那么,消息的持久化难道不需要配置么?确实如此,我们来看下源码。
一般情况下,我们会通过这种方式发送消息。
rabbitTemplate.convertAndSend(exchange, routeKey, message);
其中,调用了 convertAndSend(String exchange, String routingKey, final Object object) 方法。
@Override public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException { convertAndSend(exchange, routingKey, object, (CorrelationData) null); }
接着,用调用了 convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) 方法。
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException { send(exchange, routingKey, convertMessageIfNecessary(object), correlationData); }
此时,最关键的方法出现了,它是 convertMessageIfNecessary(final Object object)。
protected Message convertMessageIfNecessary(final Object object) { if (object instanceof Message) { return (Message) object; } return getRequiredMessageConverter().toMessage(object, new MessageProperties()); }
其中,关键的是 MessageProperties 类,它持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的。
public class MessageProperties implements Serializable { public MessageProperties() { this.deliveryMode = DEFAULT_DELIVERY_MODE; this.priority = DEFAULT_PRIORITY; } static { DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; DEFAULT_PRIORITY = Integer.valueOf(0); } }
在消费者端,我这里采用的是默认的exchange,所以有很多配置没有使用,可以参考其他的
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @Component @RabbitListener(queues = "rotork_websocketQueue") @Slf4j public class WebsocketConsumer { @Resource private SimpMessagingTemplate simpMessagingTemplate; @Resource private Executor poolTaskExecutor; @RabbitHandler public void send(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag){ CompletableFuture.runAsync(() ->{ try { simpMessagingTemplate.convertAndSend("/topic/msg", msg); //不需要重新发送消息 channel.basicAck(tag,false); }catch (Exception e){ try { // 消费失败,重新发送消息 channel.basicNack(tag, false, true); } catch (IOException ioe) { log.error("websocket消费重新获取消费消息错误:{}", ioe); } throw new RuntimeException(e); } }, poolTaskExecutor).exceptionally(e -> { log.error("websocket消费消息失败", e); throw new RuntimeException(e); }); } }
相关推荐
基于springBoot2.0,整合rabbit 和 hikari的示例代码
基于SpringBoot+RabbitMQ用户注册实现异步发送验证码源码。基于SpringBoot+RabbitMQ用户注册实现异步发送验证码源码。基于SpringBoot+RabbitMQ用户注册实现异步发送验证码源码。...基于SpringBoot+Rabbit
springboot Rabbit死信队列实现,rocketMq重试消息实现 基于springboot2.15版本,最新rabbit和rocktMq 中间件实例,亲测可用
RabbitMQ实战-多线程-springboot-rabbit
springboot集成rabbit mq
springboot整合stream使用rabbitmq作为消息中间件
rabbit mq入门例子. 使用rabbit mq实现服务器连接,消息发送,接收。
此外,文章还将讨论RabbitMQ的一些高级特性,如路由、主题交换和持久化设置,以及如何结合Spring Boot进行应用。最后,我们将总结使用Spring Boot与RabbitMQ集成的好处,并给出一些最佳实践建议,以帮助读者在实际...
连接rabbit mq的简单demo,演示如果连接mq并发送消息接收消息
springboot集成rabbitmq的简单使用,介绍了springboot集成rabbitmq的使用,利用的交换机、队列、路由key来实现的例子
RabbitMQ连接池+SpringBoot实现。通过连接池实现将高效的管理RabbitMQ的Connection,并与springboot进行整合,实现消息发送,获取队列列表等功能。基于此可以进行更多功能的扩充。
spring boot 集成rabbit mq 成功demo,spring boot 集成rabbit mq 成功demo
springBoot整合rabbitMq案例演示,需要导入pom文件中的一些jae包,需要自己搭建RabbitMQ环境、根据自己的环境进行修改配置文件就可以使用。
更新rabbit mq的demo,演示连接mq并发送消息接收消息 持久化模式、客户端订阅方式、单链接
springboot redis zookeeperlock rabbit实现的分布式锁 代码
rabbit mq windows 64 较新,很实用,消息中间件服务端。
rabbit mq 安装手册,详细介绍了rabbit mq在linux环境下如何安装
docker 安裝 rabbit mq 並測試 http://knight-black-bob.iteye.com/blog/2395713
springboot redis zookeeperlock rabbit实现的分布式锁.zip
本附件包含Rabbit MQ的安装包以及作者亲自撰写的安装步骤,供大家参考。