简介
用户在使用SDK的时候, 可能会觉得消息处理慢, 想多线程处理, 结果使用多线程用同一个consumerId 拉取消息,
多线程处理流程介绍
首先要明确一点, 多线程处理消息,并不是多线程拉取消息, 消息只能是单线程拉取. 流程如下:
运行demo
对demo进行简单的配置,运行即可. 详细的运行步骤见: SDK使用文档
多线程处理参数配置介绍
代码如下:
consumerMessage.setConsumeIntervalTime(200).setIsMultiThread(true)
.setMessageErrorStrategy(new DefaultHandleMessageErrorStrategy()).setPoolSize(5).setBatchSize(101);
在这段代码中共设置了5个参数,这也是多线程处理消息所需要的所有参数.
参数 | 含义 | 默认值 |
---|---|---|
consumeIntervalTime | 两次拉取消息的时间间隔 | 200ms |
isMultiThread | 是否开启多线程处理消息,true -开启; false-关闭 | false |
messgeStartegy | 用户处理消息失败时,执行的策略 | LocalQueueMessageErrorStrategy:把失败的消息放入本地队列m |
poolSize | 线程池大小 | 4 |
batchSize | 对消息进行分组放入线程池处理, batchSize就是一批消息的大小.比如收到200条消息, 一批100条, 那么消息会分成两批, 放入线程池处理. | 100 |
代码简单解读
init方法代码
public static void init() {
// 开放平台的url,这是test环境的url https://test.ys7.com:65
String path = "https://test12open.ys7.com";
//
// 设置你的appkey,appSecret,group
StandardConsumerMessage consumerMessage = new StandardConsumerMessage(path, appKey, appSecret, group);
// 处理消息失败时的策略
LocalQueueMessageErrorStrategy localQueueMessageStrategy = new LocalQueueMessageErrorStrategy(
new ConsumerCallBack() {
@Override
public void consumerCall(List<Object> msg) {
log.info("处理失败的消息,个数:{},msg:{}.", msg.size(), msg);
}
});
// 设置调用接口的间隔时间,ms
consumerMessage.setConsumeIntervalTime(200).setIsMultiThread(true)
.setMessageErrorStrategy(localQueueMessageStrategy).setPoolSize(5).setBatchSize(101);
// 设置消费消息的回调接口,建议把消息放入本地队列,不要在回调函o数里做业务处理.
consumerMessage.setConsumerCallBack(new ConsumerCallBack() {
@Override
public void consumerCall(List<Object> msg) {
if (msg.size() > 100) {
// 消息太多,只打印前100个
log.info("去前100,msgSize:{}.", msg.size());
} else {
log.info("小于等于100个msgSize:{},", msg.size());
}
for (int i = 0; i < msg.size() && i < 3; i++) {
// class com.alibaba.fastjson.JSONObject
log.info("my readMessageHandle object:{},class:{}", msg.get(i), msg.get(i).getClass());
}
}
});
consumerMessage.initClient();
}
在上边的代码中, 一个 StandardConsumerMessage 代表一个消费者, 去读取消息.
StandardConsumerMessage 是异步去读取消息的, 收到消息时回调 setConsumerCallBack 方法, 这里需要注意: 这里setConsumerCallBack 方法是多线程执行的,用户需要自己控制线程安全.
LocalQueueMessageErrorStrategy localQueueMessageStrategy 是默认的消息处理策略, 采用本地队列进行存储,异步处理这些错误的消息.
消息处理失败的处理策略
由于是多线程处理消息, 消息是分批处理的,那么就有可能有的消息处理成功,有的处理失败, 那么当用户处理消息失败时, 这些消息如何处理?
在SDK中对外提供了接口:
//用户处理消息失败时的处理策略接口
public interface HandleMessageErrorStrategy {
public void handleMessageError(List<Object> data);
}
用户可以自己实现这个接口, 实现自己对错误消息的处理策略, 在SDK中提供了两种策略:
策略 | 适用场景 | 实现原理 |
---|---|---|
重复消费策略 | 当拉取到的消息都处理出错时,用户可以进行重复消费.比如: 一个用户拉取消息执行业务,然后存到数据库中, 当数据库宕机时,所有的消息处理失败,那么就适合重复消费 | 线程sleep30s,超时重读 |
本地队列存放处理失败的消息 | 当拉取到的消息,大部分处理成功 ,小部分处理失败时. | 放入本地队列, 队列大小默认2000 |
建议用户根据这两个处理策略, 实现适合自己的错误消息的处理策略. 下边以LocalQueueMessageErrorStrategy 为例, 说明下消息处理失败时的策略实现:
public class LocalQueueMessageErrorStrategy implements HandleMessageErrorStrategy {
private static Log log = LogFactory.getLog(LocalQueueMessageErrorStrategy.class);
//本地队列使用BlockingQueue
private BlockingQueue<List<Object>> queue = new LinkedBlockingQueue<>(2000);
public LocalQueueMessageErrorStrategy(ConsumerCallBack consumerCallBack) {
this.consumerCallBack = consumerCallBack;
}
@Override
public void handleMessageError(List<Object> data) {
try {//失败的消息放入队列
queue.put(data);
} catch (InterruptedException e) {
log.error(String.format("handleMessageError出错,data:%s.", data), e);
}
}
protected void consumerMessage(List<Object> data) {
try {
while (true) {//不断的取出处理失败的消息
List<Object> take = queue.take();
consumerCall(take);
}
} catch (Exception e) {
log.error(String.format("handleMessageError出错,data:%s.", data), e);
}
}
//需要用户根据自己的业务实现的回调接口
ConsumerCallBack consumerCallBack;
public void setConsumerCallBack(ConsumerCallBack consumerCallBack) {
this.consumerCallBack = consumerCallBack;
}
public void consumerCall(List<Object> msg) {
if (consumerCallBack == null) {
throw new YsmqConsumerMessageException("回调函数不可为空,请设置回调函数!");
}
this.consumerCallBack.consumerCall(msg);
}
}
PS:代码中有注释