RocketMQ顺序消息发送和消费实例
1.消息顺序发送,具体说明在注释中,参考官网
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @description 消息顺序发送示例
*/
public class SortSendMsg {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("order_group");
producer.setNamesrvAddr("localhost:8080");
producer.start();
String[] tags = {"TagA", "TagB", "TagC"};
//创建订单列表
List orders = new SortSendMsg().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
//为消息体加个时间戳
String body = dateStr+"hello RocketMQ"+orders.get(i);
Message message = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List list, Message message, Object o) {
//根据订单id选择对应的queue
Long id = (Long)o;//参数o 就是send方法第三个参数传入的值,我们传入了orderid,所以代表订单编号
long index = id % list.size();//根据订单编号进行取模,相同编号的模值是一样的
return list.get((int)index);//获取对应的队列并返回进行发送消息
}
},orders.get(i).getOrderId());
}
//消息发送完了,关闭producer
producer.shutdown();
}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
private List buildOrders() {
List orderList = new ArrayList();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
2.消息顺序消费
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @description 消息顺序消费
*/
public class SortConsumerMsg {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_Group");
consumer.setNamesrvAddr("localhost:8080");
//如果第一次启动从队列头开始拉取消息,如果非第一次启动那么就按照上次消息的offset继续拉取消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest","TagA||TagB||TagC");//从哪拉取消息
consumer.registerMessageListener(new MessageListenerOrderly() {//这个代表单线程处理有序的消息
@Override
public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {
consumeOrderlyContext.setAutoCommit(true);//自动提交事务
try {
for (MessageExt messageExt : list) {
//对有序的消息进行业务处理
}
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e){
//如果消息处理有问题.返回一个状态,告诉mq暂停一下,稍后在继续拉消息
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start();//启动消费者
}
}
3.使用顺序消息的情况
比如订单系统更新数据库的操作,如果有另外一个系统比如大数据系统需要同步更新订单数据到自己的数据库中,
订单系统有一笔交易,涉及到用户插入和更新,insert into order(money,id) values('0','1');update order set money='100' where id='1';
订单系统推送了这个操作记录到了两个messagequeue中,这个时候,两个消费者可能有一个先处理了更新的操作,结果数据不存在,更新失败,然后另外一个执行了插入操作,这个时候,两个数据库的内容就会出现不一致的情况,对于涉及到金钱的操作,这样肯定是不行的,所以在发送端,producer,我们根据订单id%messagequeue的总数取模,这样相同的id操作就会发送到相同的队列中,实现了顺序发送的目的,依赖的就是send方法中的
MessageQueueSelector()方法;
那么你仅仅是顺序发送还不行,对于消息的消费,你还必须保障顺序消费,还不能开启多线程,多线程消费,也会导致消息乱序了;
这个就依赖了消费者中的
MessageListenerOrderly()方法;
这个保障了消息消费只有一个线程进行处理.
4.对于消息重复消费
可能是重复发送,也可能是消费者重复消费,
可以在发送端进行消息幂等,引入redis(不能绝对保证幂等)或者发消息之前先去mq查询是否存在
可以在消费端进行业务判断,查询是否给客户发送过消息
这里推荐消费端进行业务判断实现消息幂等;