ActiveMQ笔记(7):如何清理无效的延时消息?
时间:2022-04-22
本文章向大家介绍ActiveMQ笔记(7):如何清理无效的延时消息?,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
ActiveMQ的延时消息是一个让人又爱又恨的功能,具体使用可参考上篇ActiveMQ笔记(6):消息延时投递,在很多需要消息延时投递的业务场景十分有用,但是也有一个缺陷,在一些大访问量的场景,如果瞬间向MQ发送海量的延时消息,超过MQ的调度能力,就会造成很多消息到了该投递的时刻,却没有投递出去,形成积压,一直停留在ActiveMQ web控制台的Scheduled面板中。
下面的代码演示了,如何清理activemq中的延时消息(包括:全部清空及清空指定时间段的延时消息),这也是目前唯一可行的办法。
为了演示方便,先封装一个小工具类:
package cn.mwee.utils.mq;
import cn.mwee.utils.list.ListUtil;
import cn.mwee.utils.log4j2.MwLogger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Created by yangjunming on 6/20/16.
*/
public final class MessageUtil {
private Logger logger = new MwLogger(MessageUtil.class);//这里就是一个Log4j2的实例,大家可以换成原生的log4j2或类似工具
private ConnectionFactory connectionFactory;
private long receiveTimeout;//接收超时时间
private JmsTemplate jmsTemplate;
private List<String> destinationQueueNames;
private final static String BACKUP_QUEUE_SUFFIX = "_B";
private boolean autoBackup = false;//是否自动将消息备份到_b的队列,方便调试
public MessageUtil(final ConnectionFactory connectionFactory, final long receiveTimeout, final List<String> destinationQueueNames) {
this.connectionFactory = connectionFactory;
this.receiveTimeout = receiveTimeout;
this.destinationQueueNames = new ArrayList<>();
this.destinationQueueNames.addAll(destinationQueueNames.stream().collect(Collectors.toList()));
jmsTemplate = new JmsTemplate(this.connectionFactory);
jmsTemplate.setReceiveTimeout(this.receiveTimeout);
}
public MessageUtil(ConnectionFactory connectionFactory, List<String> destinationQueueNames) {
this(connectionFactory, 10000, destinationQueueNames);
}
public void convertAndSend(Object message) {
if (ListUtil.isEmpty(destinationQueueNames)) {
logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
return;
}
for (String dest : destinationQueueNames) {
jmsTemplate.convertAndSend(dest, message);
if (autoBackup) {
jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message);
}
}
}
public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) {
if (ListUtil.isEmpty(destinationQueueNames)) {
logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
return;
}
for (String dest : destinationQueueNames) {
jmsTemplate.convertAndSend(dest, message, messagePostProcessor);
if (autoBackup) {
jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
}
}
}
public void convertAndSend(String destinationName, Object message) {
if (StringUtils.isBlank(destinationName)) {
logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
return;
}
jmsTemplate.convertAndSend(destinationName, message);
if (autoBackup) {
jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message);
}
}
public void convertAndSend(String destinationName, Object message, MessagePostProcessor messagePostProcessor) {
if (StringUtils.isBlank(destinationName)) {
logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString());
return;
}
jmsTemplate.convertAndSend(destinationName, message, messagePostProcessor);
if (autoBackup) {
jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
}
}
public static String getText(javax.jms.Message message) {
if (message instanceof TextMessage) {
try {
return ((TextMessage) message).getText();
} catch (JMSException e) {
return message.toString();
}
}
return message.toString();
}
public String getFirstDestination() {
if (ListUtil.isEmpty(destinationQueueNames)) {
return null;
}
return destinationQueueNames.get(0);
}
public boolean isAutoBackup() {
return autoBackup;
}
public void setAutoBackup(boolean autoBackup) {
this.autoBackup = autoBackup;
}
}
其中主要就用到了convertAndSend(Object message, MessagePostProcessor messagePostProcessor) 这个方法,其它代码可以无视。
先来模拟瞬间向MQ发送大量延时消息:
/**
* 发送延时消息
*
* @param messageUtil
*/
private static void sendScheduleMessage(MessageUtil messageUtil) {
for (int i = 0; i < 10000; i++) {
Object obj = "test:" + i;
messageUtil.convertAndSend(obj, new ScheduleMessagePostProcessor(1000 + i * 1000));
}
}
这里向MQ发送了1w条延时消息,每条消息延时1秒*i,上面代码中的ScheduleMessagePostProcessor类可在上篇中找到。
运行完之后,MQ中应该堆积着了很多消息了:
下面的代码可以清空所有延时消息:
/**
* 删除所有延时消息
*
* @param connectionFactory
* @throws JMSException
*/
private static void deleteAllScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
Connection conn = connectionFactory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
producer.send(request);
}
清空所有延时消息,有些用力过猛了,很多时候,我们只需要清理掉过期的延时消息(即:本来计划是8:00投递出去的消息,结果过了8点还没投递出去)
/**
* 删除过期的延时消息
*
* @param connectionFactory
* @throws JMSException
*/
private static void deleteExpiredScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
long start = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);//删除:当前时间前12小时范围的延时消息
long end = System.currentTimeMillis();
Connection conn = connectionFactory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
producer.send(request);
}
与上一段代码基本相似,只是多指定了删除消息的起止时间段。
最后贴一段spring的配置文件及main函数入口
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
5
6 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
7 <property name="connectionFactory">
8 <bean class="org.apache.activemq.ActiveMQConnectionFactory">
9 <property name="brokerURL"
10 value="failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false&backup=true"/>
11 <property name="maxThreadPoolSize" value="100"/>
12 </bean>
13 </property>
14 </bean>
15
16 <bean id="messageUtil" class="cn.mwee.utils.mq.MessageUtil">
17 <constructor-arg index="0" ref="jmsFactory"/>
18 <constructor-arg index="1" value="10000"/>
19 <constructor-arg index="2">
20 <list>
21 <value>dest1</value>
22 <value>dest2</value>
23 </list>
24 </constructor-arg>
25 <property name="autoBackup" value="true"/>
26 </bean>
27
28 </beans>
main函数:
public static void main(String[] args) throws InterruptedException, JMSException {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-sender.xml");
ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class, "jmsFactory");
MessageUtil messageUtil = context.getBean(MessageUtil.class);
// sendScheduleMessage(messageUtil);
// deleteAllScheduleMessage(connectionFactory);
deleteExpiredScheduleMessage(connectionFactory);
}
参考文章:
Enhanced JMS Scheduler in ActiveMQ
- HDU 2037 今年暑假不AC(贪心,区间更新,板子题)
- “玲珑杯”ACM比赛 Round #13 题解&源码
- 回溯算法入门及经典案例剖析(初学者必备宝典)
- Selenium2+python自动化66-装饰器之运行失败截图
- 51Nod 1091 线段的重叠(贪心+区间相关,板子题)
- 51Nod 1016 水仙花数 V2(组合数学,枚举打表法)
- Selenium2+python自动化67-用例失败自动截图
- Codeforces Round #404 (Div. 2)(A.水,暴力,B,排序,贪心)
- hihoCoder #1053 : 居民迁移(贪心,二分搜索,google在线技术笔试模拟)
- php开发文章发布示例(正则表达式实例开发)
- Codeforces Round #408 (Div. 2)(A.水,B,模拟)
- php实现文件上传
- Selenium2+python自动化69-PhantomJS使用
- Selenium2+python自动化70-unittest之跳过用例(skip)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 3分钟短文:说说Laravel模型中还算常用的2个“关系”
- iOS音视频接入 - TRTC实时屏幕分享
- 如何维护爬虫代理
- LoRaWAN 帧计数机制及典型问题分析
- ffmpeg mp4解码管道输出的问题
- 机器人运动控制仿真:Matlab机器人工具箱和Simmechanics
- 使用HTMLTestRunner实现HTML测试报告
- Jmeter五步实现性能测试
- 测试工程师必须要掌握的linux命令
- Python之pip使用详解|附第三方库安装总结
- Python基础之数据类型详解
- 编码效率提升之Pycharm活动模板(Live Templates )
- crictl调试Kubernetes节点
- leetcode哈希表之好数对的数目
- Python处理excel的强大工具-openpyxl