简介

用户在使用SDK的时候, 可能会觉得消息处理慢, 想多线程处理, 结果使用多线程用同一个consumerId 拉取消息,

多线程处理流程介绍

首先要明确一点, 多线程处理消息,并不是多线程拉取消息, 消息只能是单线程拉取. 流程如下: image

运行demo

对demo进行简单的配置,运行即可. 详细的运行步骤见: SDK使用文档

多线程处理参数配置介绍

代码如下:

consumerMessage.setConsumeIntervalTime(200).setIsMultiThread(true)
.setMessageErrorStrategy(new DefaultHandleMessageErrorStrategy()).setPoolSize(5).setBatchSize(101);

在这段代码中共设置了5个参数,这也是多线程处理消息所需要的所有参数.

参数 含义 默认值
consumeIntervalTime 两次拉取消息的时间间隔 200ms
isMultiThread 是否开启多线程处理消息,true -开启; false-关闭 false
messgeStartegy 用户处理消息失败时,执行的策略 LocalQueueMessageErrorStrategy:把失败的消息放入本地队列m
poolSize 线程池大小 4
batchSize 对消息进行分组放入线程池处理, batchSize就是一批消息的大小.比如收到200条消息, 一批100条, 那么消息会分成两批, 放入线程池处理. 100

代码简单解读

init方法代码

    public static void init() {

    // 开放平台的url,这是test环境的url https://test.ys7.com:65
     String path = "https://test12open.ys7.com";
     //
     // 设置你的appkey,appSecret,group
     StandardConsumerMessage consumerMessage = new StandardConsumerMessage(path, appKey, appSecret, group);
     // 处理消息失败时的策略
     LocalQueueMessageErrorStrategy localQueueMessageStrategy = new LocalQueueMessageErrorStrategy(
             new ConsumerCallBack() {
                 @Override
                 public void consumerCall(List<Object> msg) {
                     log.info("处理失败的消息,个数:{},msg:{}.", msg.size(), msg);
                 }
             });
     // 设置调用接口的间隔时间,ms
     consumerMessage.setConsumeIntervalTime(200).setIsMultiThread(true)
             .setMessageErrorStrategy(localQueueMessageStrategy).setPoolSize(5).setBatchSize(101);
     // 设置消费消息的回调接口,建议把消息放入本地队列,不要在回调函o数里做业务处理.
     consumerMessage.setConsumerCallBack(new ConsumerCallBack() {
         @Override
         public void consumerCall(List<Object> msg) {
             if (msg.size() > 100) {
                 // 消息太多,只打印前100个
                 log.info("去前100,msgSize:{}.", msg.size());
             } else {
                 log.info("小于等于100个msgSize:{},", msg.size());
             }
             for (int i = 0; i < msg.size() && i < 3; i++) {
                 // class com.alibaba.fastjson.JSONObject
                 log.info("my readMessageHandle object:{},class:{}", msg.get(i), msg.get(i).getClass());
             }
         }
     });
     consumerMessage.initClient();
    }

在上边的代码中, 一个 StandardConsumerMessage 代表一个消费者, 去读取消息.

StandardConsumerMessage 是异步去读取消息的, 收到消息时回调 setConsumerCallBack 方法, 这里需要注意: 这里setConsumerCallBack 方法是多线程执行的,用户需要自己控制线程安全.

LocalQueueMessageErrorStrategy localQueueMessageStrategy 是默认的消息处理策略, 采用本地队列进行存储,异步处理这些错误的消息.

消息处理失败的处理策略

由于是多线程处理消息, 消息是分批处理的,那么就有可能有的消息处理成功,有的处理失败, 那么当用户处理消息失败时, 这些消息如何处理?

在SDK中对外提供了接口:

    //用户处理消息失败时的处理策略接口
    public interface HandleMessageErrorStrategy {
      public void handleMessageError(List<Object> data);
    }

用户可以自己实现这个接口, 实现自己对错误消息的处理策略, 在SDK中提供了两种策略:

策略 适用场景 实现原理
重复消费策略 当拉取到的消息都处理出错时,用户可以进行重复消费.比如: 一个用户拉取消息执行业务,然后存到数据库中, 当数据库宕机时,所有的消息处理失败,那么就适合重复消费 线程sleep30s,超时重读
本地队列存放处理失败的消息 当拉取到的消息,大部分处理成功 ,小部分处理失败时. 放入本地队列, 队列大小默认2000

建议用户根据这两个处理策略, 实现适合自己的错误消息的处理策略. 下边以LocalQueueMessageErrorStrategy 为例, 说明下消息处理失败时的策略实现:

    public class LocalQueueMessageErrorStrategy implements HandleMessageErrorStrategy {
    private static Log log = LogFactory.getLog(LocalQueueMessageErrorStrategy.class);
    //本地队列使用BlockingQueue
    private BlockingQueue<List<Object>> queue = new LinkedBlockingQueue<>(2000);
    public LocalQueueMessageErrorStrategy(ConsumerCallBack consumerCallBack) {
        this.consumerCallBack = consumerCallBack;
    }
    @Override
    public void handleMessageError(List<Object> data) {
        try {//失败的消息放入队列
            queue.put(data);
        } catch (InterruptedException e) {
            log.error(String.format("handleMessageError出错,data:%s.", data), e);
        }
    }
    protected void consumerMessage(List<Object> data) {
        try {
            while (true) {//不断的取出处理失败的消息
                List<Object> take = queue.take();
                consumerCall(take);
            }
        } catch (Exception e) {
            log.error(String.format("handleMessageError出错,data:%s.", data), e);
        }
    }
    //需要用户根据自己的业务实现的回调接口
    ConsumerCallBack consumerCallBack;
    public void setConsumerCallBack(ConsumerCallBack consumerCallBack) {
        this.consumerCallBack = consumerCallBack;
    }
    public void consumerCall(List<Object> msg) {
        if (consumerCallBack == null) {
            throw new YsmqConsumerMessageException("回调函数不可为空,请设置回调函数!");
        }
        this.consumerCallBack.consumerCall(msg);
    }
    }

PS:代码中有注释

results matching ""

    No results matching ""