在前面已经分析了Trident中的Emitter、Coordinator等组件,下面是各个类的完整代码。
RocketMQEmitter.java
import java.util.List;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IPartitionedTridentSpout.Emitter;
import storm.trident.spout.ISpoutPartition;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;
import com.alibaba.dubbo.common.utils.LRUCache;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullStatus;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.google.common.collect.Lists;
import com.pajk.wrestling.rocketmq.RocketMqConsumer;
public class RocketMQEmitter implements Emitter<List<MessageQueue>, ISpoutPartition, JSONObject> {
private static final Logger logger=LoggerFactory.getLogger(RocketMQEmitter.class);
// Client-side idempotence guard: ids of messages that were already emitted are kept here
// (emitPartitionBatchNew checks containsKey before emitting and calls put afterwards).
private volatile LRUCache<String, MessageExt> idempotentCache=null;
{
// Instance initializer: create the cache and cap its capacity at 10000 entries.
idempotentCache=new LRUCache<String, MessageExt>();
idempotentCache.setMaxCapacity(10000);
}
/**
 * Wraps every message queue in an {@link ISpoutPartition} whose id is the queue's unique id,
 * keeping the order of the incoming list.
 *
 * @param allPartitionInfo the message queues to expose as partitions
 * @return one partition per message queue, in input order
 * @throws RuntimeException if {@code allPartitionInfo} is {@code null} or empty
 */
@Override
public List<ISpoutPartition> getOrderedPartitions(
        List<MessageQueue> allPartitionInfo) {
    // Guard clause: without any queue there is nothing to partition.
    if (allPartitionInfo == null || allPartitionInfo.isEmpty()) {
        throw new RuntimeException("partitions is null");
    }
    final List<ISpoutPartition> ordered = Lists.newArrayList();
    for (final MessageQueue queue : allPartitionInfo) {
        ordered.add(new ISpoutPartition() {
            @Override
            public String getId() {
                return RocketMqConsumer.makeMessageQueueUniqueId(queue);
            }
        });
    }
    logger.info("all partitions,{}", mkpartitionsStr(ordered));
    return ordered;
}
/**
 * Renders the partition ids for logging, one id per line, each preceded by a newline.
 *
 * @param partitions the partitions whose ids are listed
 * @return the concatenated ids; empty when the list is empty
 */
private String mkpartitionsStr(List<ISpoutPartition> partitions) {
    final StringBuilder joined = new StringBuilder();
    for (final ISpoutPartition each : partitions) {
        joined.append('\n').append(each.getId());
    }
    return joined.toString();
}
/**
 * Pulls the next batch of messages from the message queue behind {@code partition} and emits
 * them as {@code (tx, messageContent)} tuples.
 * <p>
 * {@code partition} and {@code lastPartitionMeta} correspond one-to-one:
 * {@code lastPartitionMeta} is the metadata of the partition passed in.
 *
 * @param tx the transaction attempt; emitted as the first value of every tuple
 * @param collector the collector the tuples are emitted to
 * @param partition the partition whose id identifies the message queue to pull from
 * @param lastPartitionMeta metadata of this partition, or {@code null}; when {@code null} the
 *        start offset is fetched through the consumer instead
 * @return a {@link RocketMqPartitionMeta} describing this batch, or {@code lastPartitionMeta}
 *         unchanged when fetching the offset or pulling messages fails
 */
@Override
public JSONObject emitPartitionBatchNew(TransactionAttempt tx,
        TridentCollector collector, ISpoutPartition partition,
        JSONObject lastPartitionMeta) {
    // 1. Look up the message queue that belongs to this partition id.
    MessageQueue mq = RocketMqConsumer.getMessageQueueByUniqueId(partition.getId());
    // 2. Determine the start offset: prefer lastPartitionMeta, otherwise ask the consumer.
    long beginOffset = 0;
    if (lastPartitionMeta != null) {
        Object stored = lastPartitionMeta.get(RocketMqPartitionMeta.NEXT_OFFSET);
        if (stored instanceof Number) {
            // Accept any numeric type, not only Long, so an Integer offset is not silently ignored.
            beginOffset = ((Number) stored).longValue();
        } else if (stored instanceof String) {
            beginOffset = Long.parseLong((String) stored);
        }
    } else {
        try {
            logger.info("queue:{},lastPartitionMeta is null", partition.getId());
            beginOffset = RocketMqConsumer.getConsumer().fetchConsumeOffset(mq, true);
            // -1 is mapped to 0 so the pull starts at the beginning of the queue.
            beginOffset = (beginOffset == -1) ? 0 : beginOffset;
        } catch (MQClientException e) {
            logger.error("fetch queue offset error ,queue:" + mq, e);
            return lastPartitionMeta;
        }
    }
    int batchSize = 0;
    // 3. Pull the messages and emit them.
    PullResult pullResult;
    try {
        pullResult = RocketMqConsumer.getConsumer().pull(mq,
                RocketMqConsumer.getRocketMQConfig().getTopicTag(), beginOffset,
                RocketMqConsumer.getRocketMQConfig().getPullBatchSize());
        PullStatus pullStatus = pullResult.getPullStatus();
        switch (pullStatus) {
        case FOUND:
            logger.info("queue:{},found new msgs,pull result{}:", partition.getId(), pullResult);
            List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
            if (msgFoundList != null && msgFoundList.size() > 0) {
                batchSize = msgFoundList.size();
                for (MessageExt messageExt : msgFoundList) {
                    if (!idempotentCache.containsKey(messageExt.getMsgId())) {
                        // Decode with an explicit charset instead of the platform default so the
                        // content does not depend on the worker's locale settings.
                        // NOTE(review): assumes producers encode bodies as UTF-8 - confirm.
                        String msgContent = new String(messageExt.getBody(), "UTF-8");
                        collector.emit(new Values(tx, msgContent));
                        idempotentCache.put(messageExt.getMsgId(), messageExt);
                        // Log through the logger rather than stdout.
                        logger.info("emit message:{},message content:{}", messageExt, msgContent);
                    } else {
                        logger.warn("message {} has consumed!", messageExt);
                    }
                }
            }
            break;
        case OFFSET_ILLEGAL:
            // Happens when beginOffset is smaller than the queue's minOffset.
            logger.warn("OFFSET_ILLEGAL ,Message Queue:{},pullReuslt:{},caculate beginOffset:{}",
                    new Object[]{mq, pullResult, beginOffset});
            break;
        case NO_NEW_MSG:
            break;
        case NO_MATCHED_MSG:
            // Happens when the queue contains messages carrying other tags.
            logger.warn("May be some msg has other tag exsits in the queue:{},pull status:{}", mq, pullStatus);
            break;
        default:
            logger.warn("UNKNOW STATUS:{},Message Queue:{}", pullStatus, mq);
            break;
        }
        // Regardless of the pull status, record the next offset with the consumer.
        RocketMqConsumer.getConsumer().updateConsumeOffset(mq, pullResult.getNextBeginOffset());
    } catch (Exception e) {
        logger.error("pull message error,topic:" + RocketMqConsumer.getRocketMQConfig().getTopic() + ",queue:" + mq, e);
        return lastPartitionMeta;
    }
    // 4. Build the metadata describing this batch.
    RocketMqPartitionMeta rocketMqPartitionMeta = new RocketMqPartitionMeta(
            partition.getId(), tx.toString(), beginOffset, pullResult.getNextBeginOffset());
    rocketMqPartitionMeta.setBatchSize(batchSize);
    return rocketMqPartitionMeta;
}
/**
 * Receives the partition responsibilities; this implementation only writes a log line and
 * does not use the argument.
 */
@Override
public void refreshPartitions(
List<ISpoutPartition> partitionResponsibilities) {
logger.info("refreshPartitions");
}
/**
 * Empty implementation: no tuples are emitted from here.
 * NOTE(review): Trident presumably calls this to re-emit a batch from its stored
 * partitionMeta; with an empty body such a replay would emit nothing - confirm this is intended.
 */
@Override
public void emitPartitionBatch(TransactionAttempt tx,
TridentCollector collector, ISpoutPartition partition,
JSONObject partitionMeta) {
}
/**
 * Empty implementation: nothing is released here.
 */
@Override
public void close() {
}
}
RocketMQCoordinator.java
import java.util.List;
import storm.trident.spout.IPartitionedTridentSpout.Coordinator;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.pajk.wrestling.rocketmq.RocketMqConsumer;
public class RocketMQCoordinator implements Coordinator<List<MessageQueue>>{
/**
 * Returns the message queues currently held by RocketMqConsumer as the partition info.
 */
@Override
public List<MessageQueue> getPartitionsForBatch() {
return RocketMqConsumer.getMessageQueues();
}
/**
 * Delegates to RocketMqConsumer.hasNewMesage(); the txid argument is not used.
 */
@Override
public boolean isReady(long txid) {
return RocketMqConsumer.hasNewMesage();
}
/**
 * Empty implementation: nothing is released here.
 */
@Override
public void close() {
}
}
RocketMqPartitionMeta.java
/**
* Trident使用json-simple将元数据序列化到zookeeper
* json-simple只支持8种基本数据类型+List+Map+JSONAware类型
* @author TIANSHOUZHI336
*
*/
@SuppressWarnings("unchecked")
public class RocketMqPartitionMeta extends JSONObject{
/**
*
*/
public static final long serialVersionUID = 1003604473740641741L;
public static final String QUEUE_ID="queueId";
public static final String CURRENT_OFFSET="currentOffset";
public static final String NEXT_OFFSET="nextOffset";
public static final String TRANSACTION_ID="transactionId";
public static final String OCCUR_TIME="occurTime";
public static final String BATCH_SIZE= "batchSize";
/**
 * Creates an empty metadata object with no entries set.
 */
public RocketMqPartitionMeta() {
super();
}
/**
 * Creates a metadata object and stores the four given values under their keys.
 *
 * @param queueId value stored under QUEUE_ID
 * @param transactionId value stored under TRANSACTION_ID
 * @param currentOffset value stored under CURRENT_OFFSET
 * @param nextOffset value stored under NEXT_OFFSET
 */
public RocketMqPartitionMeta(String queueId,String transactionId,long currentOffset,long nextOffset){
this.setCurrentOffset(currentOffset);
this.setNextOffset(nextOffset);
this.setQueueId(queueId);
this.setTransactionId(transactionId);
}
/** Stores the queue id under QUEUE_ID. */
public void setQueueId(String queueId){
this.put(QUEUE_ID, queueId);
}
/** Returns the entry stored under QUEUE_ID, cast to String. */
public String getQueueId(){
return (String) this.get(QUEUE_ID);
}
/** Stores the offset under CURRENT_OFFSET (the long is boxed by put). */
public void setCurrentOffset(long currentOffset){
this.put(CURRENT_OFFSET, currentOffset);
}
/**
 * Returns the entry stored under CURRENT_OFFSET.
 * The value is cast to Long and unboxed, so this fails when the entry is missing or is not a Long.
 */
public long getCurrentOffset(){
return (Long) this.get(CURRENT_OFFSET);
}
/** Stores the offset under NEXT_OFFSET (the long is boxed by put). */
public void setNextOffset(long nextOffset){
this.put(NEXT_OFFSET, nextOffset);
}
/**
 * Returns the entry stored under NEXT_OFFSET.
 * The value is cast to Long and unboxed, so this fails when the entry is missing or is not a Long.
 */
public long getNextOffset(){
return (Long) this.get(NEXT_OFFSET);
}
/** Stores the transaction id under TRANSACTION_ID. */
public void setTransactionId(String transactionId){
this.put(TRANSACTION_ID, transactionId);
}
/**
 * Returns the entry stored under TRANSACTION_ID, cast to String.
 * The method name is spelled "Transection"; it is public, so it is left unchanged.
 */
public String getTransectionId(){
return (String) this.get(TRANSACTION_ID);
}
/**
 * Formats occurTime with DateUtils.dayFormatDateTime and stores the resulting string
 * under OCCUR_TIME.
 */
public void setOccurTime(long occurTime){
String datetimeStr = DateUtils.dayFormatDateTime(occurTime);
this.put(OCCUR_TIME,datetimeStr);
}
/**
 * Parses the string stored under OCCUR_TIME with DateUtils.parse and DateUtils.DEFAULT_FORMAT.
 * NOTE(review): presumably DEFAULT_FORMAT matches what dayFormatDateTime produces - confirm.
 */
public Date getOccurTime(){
String time=(String) this.get(OCCUR_TIME);
return DateUtils.parse(time, DateUtils.DEFAULT_FORMAT);
}
/** Stores the batch size under BATCH_SIZE (the int is boxed to Integer by put). */
public void setBatchSize(int batchSize){
this.put(BATCH_SIZE, batchSize);
}
/**
 * Returns the batch size stored under BATCH_SIZE.
 * <p>
 * setBatchSize stores an Integer, but a value that has been serialized and parsed back by
 * json-simple comes back as a Long, so any Number is accepted instead of casting to Integer.
 *
 * @return the stored batch size, or 0 when no numeric value is stored
 */
public int getBatchSize(){
    Object stored = this.get(BATCH_SIZE);
    return (stored instanceof Number) ? ((Number) stored).intValue() : 0;
}
}
RocketMqIPatitionedTridentSpout.java
import java.util.List;
import java.util.Map;
import org.json.simple.JSONObject;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.spout.ISpoutPartition;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import com.alibaba.rocketmq.common.message.MessageQueue;
@SuppressWarnings("rawtypes")
public class RocketMqIPatitionedTridentSpout implements IPartitionedTridentSpout<List<MessageQueue>, ISpoutPartition, JSONObject>{
/**
*
*/
private static final long serialVersionUID = 4572682097086392244L;
@Override
// This method is called multiple times, so it is not a suitable place to initialize the consumer.
// Returns a new RocketMQCoordinator on every call; conf and context are not used.
public storm.trident.spout.IPartitionedTridentSpout.Coordinator<List<MessageQueue>> getCoordinator(
Map conf, TopologyContext context) {
return new RocketMQCoordinator();
}
// Returns a new RocketMQEmitter on every call; conf and context are not used.
@Override
public storm.trident.spout.IPartitionedTridentSpout.Emitter<List<MessageQueue>, ISpoutPartition, JSONObject> getEmitter(
Map conf, TopologyContext context) {
return new RocketMQEmitter();
}
// No component-specific configuration is supplied; returns null.
@Override
public Map getComponentConfiguration() {
return null;
}
@Override
// This method is also called multiple times; it is not an initialization hook.
// Declares the two output fields "tId" and "message".
public Fields getOutputFields() {
return new Fields("tId", "message");
}
}
RocketMqConsumer.java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.google.common.collect.Sets;
import com.pajk.wrestling.util.ConfigUtils;
public abstract class RocketMqConsumer {
private static final Logger logger=LoggerFactory.getLogger(RocketMqConsumer.class);
// The single pull consumer shared by the whole class; created in the static initializer.
private static DefaultMQPullConsumer pullConsumer;
// NOTE(review): never assigned anywhere in this class - confirm whether it is still needed.
public static boolean initialized=false;
// Configuration loaded once in the static initializer.
public static RocketMQConfig rocketMQConfig;
// Both collections are replaced by initMessageQueues(), which is also invoked from the
// MessageQueueListener callback (presumably on a RocketMQ client thread - confirm), while other
// threads read them through the static getters. volatile makes a replaced reference visible
// to those readers.
private static volatile Map<String,MessageQueue> partitionIdQueueMap;
private static volatile List<MessageQueue> queues;
// Class initialization: load the config, start the consumer, fetch the topic's queues and
// register the queue-change listener, in that order (each step uses state set by the previous ones).
static{
initRocketMqConfig();
initConsumer();
initMessageQueues();
registerMessageQueueListener();
}
/** Loads the RocketMQ configuration through ConfigUtils into the static field. */
private static void initRocketMqConfig() {
rocketMQConfig = ConfigUtils.getRocketMQConfig();
}
/**
 * Creates the shared pull consumer, configures it from {@code rocketMQConfig}, starts it and
 * registers a JVM shutdown hook that shuts it down again.
 *
 * @throws RuntimeException if the consumer cannot be started
 */
private static void initConsumer() {
    final DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();
    // Publish the instance first, exactly as before, then configure it.
    pullConsumer = consumer;
    consumer.setConsumerGroup(rocketMQConfig.getGroupId());
    consumer.setInstanceName(rocketMQConfig.getInstanceName());
    consumer.setNamesrvAddr(rocketMQConfig.getNamesrvAddr());
    consumer.setRegisterTopics(Sets.newHashSet(rocketMQConfig.getTopic()));
    logger.info("rocketmq pullConsumer config:{}", rocketMQConfig);
    try {
        consumer.start();
        logger.info("rocketmq pullConsumer startup success!");
    } catch (MQClientException startFailure) {
        throw new RuntimeException("consumer start fail!", startFailure);
    }
    final Thread shutdownHook = new Thread() {
        @Override
        public void run() {
            if (pullConsumer == null) {
                return;
            }
            pullConsumer.shutdown();
            logger.info("rocketmq pullConsumer shutdown success!");
        }
    };
    Runtime.getRuntime().addShutdownHook(shutdownHook);
}
/**
 * Fetches the topic's current message queues and rebuilds both the queue list and the
 * unique-id lookup map.
 * <p>
 * Called once from the static initializer and again from the MessageQueueListener whenever
 * the queues change (queues may be scaled out), not once per batch.
 *
 * @throws RuntimeException if the queues cannot be fetched
 */
private static void initMessageQueues() {
    try {
        Set<MessageQueue> messageQueues = RocketMqConsumer.getConsumer()
                .fetchSubscribeMessageQueues(RocketMqConsumer.getRocketMQConfig().getTopic());
        List<MessageQueue> freshQueues = new ArrayList<MessageQueue>(messageQueues);
        Map<String,MessageQueue> freshIndex = new HashMap<String,MessageQueue>();
        for (MessageQueue messageQueue : messageQueues) {
            freshIndex.put(makeMessageQueueUniqueId(messageQueue), messageQueue);
        }
        // Publish only after both collections are fully built, so a concurrent reader never
        // sees a half-populated map or a HashMap that is still being written to.
        queues = freshQueues;
        partitionIdQueueMap = freshIndex;
    } catch (MQClientException e) {
        logger.error(e.getMessage(), e);
        throw new RuntimeException(e);
    }
}
/**
 * Registers a listener for the configured topic that re-runs initMessageQueues() whenever
 * the consumer reports a message-queue change; the callback arguments are not used.
 */
private static void registerMessageQueueListener() {
pullConsumer.registerMessageQueueListener(rocketMQConfig.getTopic(),new MessageQueueListener() {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
initMessageQueues();
}
} );
}
/** Returns the shared pull consumer. */
public static DefaultMQPullConsumer getConsumer(){
return pullConsumer;
}
/**
 * Builds the partition id of a message queue as "<brokerName>-queue-<queueId>".
 * The topic is not part of the id.
 */
public static String makeMessageQueueUniqueId(MessageQueue messageQueue) {
/*String brokerName = messageQueue.getBrokerName();
String topic = messageQueue.getTopic();
int queueId = messageQueue.getQueueId();*/
return messageQueue.getBrokerName()+"-queue-"+messageQueue.getQueueId();
}
/** Returns the configuration loaded during class initialization. */
public static RocketMQConfig getRocketMQConfig(){
return rocketMQConfig;
}
/** Returns the current queue list itself (no copy is made). */
public static List<MessageQueue> getMessageQueues(){
return queues;
}
/** Looks up a message queue by the id produced by makeMessageQueueUniqueId; null if unknown. */
public static MessageQueue getMessageQueueByUniqueId(String uniqueId){
return partitionIdQueueMap.get(uniqueId);
}
/**
 * Reports whether new messages may be available; currently always returns true.
 * The commented-out code below is an earlier variant that probed every queue with a pull
 * and returned true on the first FOUND status; it is disabled.
 */
public static boolean hasNewMesage(){
/*try{
for (MessageQueue messageQueue : queues) {
long offset = pullConsumer.fetchConsumeOffset(messageQueue, true);
offset=(offset<0)?0:offset;
PullResult pullResult = pullConsumer.pull(messageQueue, rocketMQConfig.getTopicTag(), offset, rocketMQConfig.getPullBatchSize());
PullStatus pullStatus = pullResult.getPullStatus();
switch (pullStatus) {
case FOUND:
return true;
case OFFSET_ILLEGAL:
pullConsumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
logger.warn("OFFSET_ILLEGAL,Queue:{},PullResult:{}",messageQueue,pullResult);break;
case NO_NEW_MSG:break;
case NO_MATCHED_MSG:
logger.warn("May be some msg has other tag exsits in the queue:{},pull status:{}",messageQueue,pullStatus);
break;
default:break;
}
}
}catch(Exception e){
logger.error("decide has new message error", e);
}*/
return true;
}
}
RocketMQConfig.java
import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
/**
* @author Von Gosling
*/
public class RocketMQConfig implements Serializable {
private static final long serialVersionUID = 4157424979688590880L;
private String namesrvAddr;
/**
* Unique mark for every JVM instance
*/
private String instanceName;
/**
* Group by message actor
*/
private String groupId;
/**
* Message topic
*/
private String topic;
/**
* Message topic tag
*/
private String topicTag;
/**
* Minimal consumer thread count
*/
private int consumeThreadMin = 20;
/**
* Maximal consumer thread count
*/
private int consumeThreadMax = 64;
/**
* If piled-up message exceeds this value,adjust consumer thread to max
* value dynamically
*/
private long adjustThreadPoolNumsThreshold = 100000l;
/**
* Local message queue threshold, trigger flow control if exceeds this value
*/
private int pullThresholdForQueue = 1024;
/**
* The message size from server for every pull batch
*/
private int pullBatchSize = 32;
/**
* Pull interval from server for every pull
*/
private long pullInterval = 0;
/**
* Fetch message size from local queue
*/
private int consumeMessageBatchMaxSize = 1;
/**
* Consumption of local sequence, will affect performance
*/
private boolean ordered;
/**
* The max allowed failures for one single message, skip the failure message
* if excesses. -1 means try again until success
*/
private int maxFailTimes = 5;
/**
 * Creates a configuration in which only the field defaults are set.
 */
public RocketMQConfig() {
}
/**
 * Creates a configuration with the connection-related values set; all other fields keep
 * their defaults.
 *
 * @param namesrvAddr the name server address
 * @param consumerGroup the consumer group, stored as the group id
 * @param topic the message topic
 * @param topicTag the message topic tag
 */
public RocketMQConfig(String namesrvAddr,String consumerGroup, String topic, String topicTag) {
    this.namesrvAddr = namesrvAddr;
    this.groupId = consumerGroup;
    this.topic = topic;
    this.topicTag = topicTag;
}
/**
* @return the instanceName
*/
public String getInstanceName() {
return instanceName;
}
/**
* @param instanceName
* the instanceName to set
*/
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
/**
* @return the groupId
*/
public String getGroupId() {
return groupId;
}
/**
* @param groupId
* the groupId to set
*/
public void setGroupId(String groupId) {
this.groupId = groupId;
}
/**
* @return the topic
*/
public String getTopic() {
return topic;
}
/**
* @param topic
* the topic to set
*/
public void setTopic(String topic) {
this.topic = topic;
}
/**
* @return the topicTag
*/
public String getTopicTag() {
return topicTag;
}
/**
* @param topicTag
* the topicTag to set
*/
public void setTopicTag(String topicTag) {
this.topicTag = topicTag;
}
/**
* @return the consumeThreadMin
*/
public int getConsumeThreadMin() {
return consumeThreadMin;
}
/**
* @param consumeThreadMin
* the consumeThreadMin to set
*/
public void setConsumeThreadMin(int consumeThreadMin) {
this.consumeThreadMin = consumeThreadMin;
}
/**
* @return the consumeThreadMax
*/
public int getConsumeThreadMax() {
return consumeThreadMax;
}
/**
* @param consumeThreadMax
* the consumeThreadMax to set
*/
public void setConsumeThreadMax(int consumeThreadMax) {
this.consumeThreadMax = consumeThreadMax;
}
/**
* @return the adjustThreadPoolNumsThreshold
*/
public long getAdjustThreadPoolNumsThreshold() {
return adjustThreadPoolNumsThreshold;
}
/**
* @param adjustThreadPoolNumsThreshold
* the adjustThreadPoolNumsThreshold to set
*/
public void setAdjustThreadPoolNumsThreshold(
long adjustThreadPoolNumsThreshold) {
this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
}
/**
* @return the pullThresholdForQueue
*/
public int getPullThresholdForQueue() {
return pullThresholdForQueue;
}
/**
* @param pullThresholdForQueue
* the pullThresholdForQueue to set
*/
public void setPullThresholdForQueue(int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
}
/**
* @return the pullBatchSize
*/
public int getPullBatchSize() {
return pullBatchSize;
}
/**
* @param pullBatchSize
* the pullBatchSize to set
*/
public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchSize;
}
/**
* @return the pullInterval
*/
public long getPullInterval() {
return pullInterval;
}
/**
* @param pullInterval
* the pullInterval to set
*/
public void setPullInterval(long pullInterval) {
this.pullInterval = pullInterval;
}
/**
* @return the consumeMessageBatchMaxSize
*/
public int getConsumeMessageBatchMaxSize() {
return consumeMessageBatchMaxSize;
}
/**
* @param consumeMessageBatchMaxSize
* the consumeMessageBatchMaxSize to set
*/
public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}
/**
* @return the ordered
*/
public boolean isOrdered() {
return ordered;
}
/**
* @param ordered
* the ordered to set
*/
public void setOrdered(boolean ordered) {
this.ordered = ordered;
}
/**
* @return the maxFailTimes
*/
public int getMaxFailTimes() {
return maxFailTimes;
}
/**
* @param maxFailTimes
* the maxFailTimes to set
*/
public void setMaxFailTimes(int maxFailTimes) {
this.maxFailTimes = maxFailTimes;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this,
ToStringStyle.SHORT_PREFIX_STYLE);
}
/**
 * @return the namesrvAddr
 */
public String getNamesrvAddr() {
return namesrvAddr;
}
/**
 * @param namesrvAddr
 * the namesrvAddr to set
 */
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
}
ConfigUtils.java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import org.apache.commons.lang.BooleanUtils;
import backtype.storm.Config;
import com.pajk.wrestling.rocketmq.RocketMQConfig;
/**
* Utilities for RocketMQ spout regarding its configuration and reading values
* from the storm configuration.
*
* @author Von Gosling
*/
public abstract class ConfigUtils {
public static final String CONFIG_TOPIC = "rocketmq.spout.topic";
public static final String CONFIG_CONSUMER_GROUP = "rocketmq.spout.consumer.group";
public static final String CONFIG_TOPIC_TAG = "rocketmq.spout.topic.tag";
public static final String CONFIG_ROCKETMQ = "rocketmq.config";
public static final String CONFIG_PREFETCH_SIZE = "rocketmq.prefetch.size";
public static final String CONFIG_NAMESRV_ADDR="public.rocketmq.domain.name";
public static Properties props;
static{
props = new Properties();
try {
InputStream is = ConfigUtils.class.getClassLoader().getResourceAsStream("wrestling-config.properties");
props.load(is);
} catch (IOException e) {
throw new RuntimeException("load config ocurr error!", e);
}
}
/**
 * Builds a {@link RocketMQConfig} from the loaded properties.
 * <p>
 * The instance name is {@code <consumerGroup>_<local host address with dots replaced by
 * underscores>_<value of the "worker.port" system property>}.
 *
 * @return a new configuration object on every call
 * @throws RuntimeException if the local host address cannot be resolved
 * @throws NumberFormatException if the prefetch size is present but not an integer
 */
public static RocketMQConfig getRocketMQConfig() {
    String topic = props.getProperty(CONFIG_TOPIC);
    String consumerGroup = props.getProperty(ConfigUtils.CONFIG_CONSUMER_GROUP);
    String topicTag = props.getProperty(ConfigUtils.CONFIG_TOPIC_TAG);
    String nameServerAddr = props.getProperty(CONFIG_NAMESRV_ADDR);
    RocketMQConfig mqConfig = new RocketMQConfig(nameServerAddr, consumerGroup, topic, topicTag);
    try {
        // Plain character replacement; no regular expression is needed for a literal dot.
        String hostAddress = InetAddress.getLocalHost().getHostAddress().replace('.', '_');
        mqConfig.setInstanceName(consumerGroup + "_" + hostAddress + "_" + System.getProperty("worker.port"));
    } catch (UnknownHostException e) {
        throw new RuntimeException(e);
    }
    // The prefetch size is optional: parse it only when present, so a missing property keeps
    // RocketMQConfig's default instead of failing in Integer.parseInt(null).
    String prefetchSize = props.getProperty(ConfigUtils.CONFIG_PREFETCH_SIZE);
    if (prefetchSize != null && prefetchSize.trim().length() > 0) {
        int pullBatchSize = Integer.parseInt(prefetchSize.trim());
        if (pullBatchSize > 0) {
            mqConfig.setPullBatchSize(pullBatchSize);
        }
    }
    // parseBoolean yields false for a missing property, which is the default wanted here.
    boolean ordered = Boolean.parseBoolean(props.getProperty("rocketmq.spout.ordered"));
    mqConfig.setOrdered(ordered);
    return mqConfig;
}
/**
 * Builds the Storm topology configuration from the {@code topology.*} properties.
 *
 * @return a new {@code Config} on every call
 * @throws NumberFormatException if one of the numeric properties is missing or not an integer
 */
public static Config getTopologyConfig() {
    final Config topologyConf = new Config();

    final int workers = Integer.parseInt(props.getProperty("topology.workers"));
    topologyConf.setNumWorkers(workers);

    final int ackers = Integer.parseInt(props.getProperty("topology.acker.executors"));
    topologyConf.setNumAckers(ackers);

    final int maxPending = Integer.parseInt(props.getProperty("topology.max.spout.pending"));
    topologyConf.setMaxSpoutPending(maxPending);

    final int timeoutSecs = Integer.parseInt(props.getProperty("topology.message.timeout.secs"));
    topologyConf.setMessageTimeoutSecs(timeoutSecs);

    topologyConf.put("topology.name", props.getProperty("topology.name"));

    // Debug output is off unless the property says otherwise.
    final boolean debug = Boolean.parseBoolean(props.getProperty("topology.debug", "false"));
    topologyConf.setDebug(debug);

    return topologyConf;
}
/** Returns the raw property value for key, or null when the property is not defined. */
public static String get(String key){
return props.getProperty(key);
}
/**
 * Returns the property value for key parsed as an integer.
 * Integer.parseInt throws NumberFormatException when the property is missing or not numeric.
 */
public static Integer getInt(String key){
return Integer.parseInt(props.getProperty(key));
}
}
wrestling-config.properties
#rocketmq configuration 139.129.97.30
public.rocketmq.domain.name=mq1.test.pajkdc.com:9876;mq2.test.pajkdc.com:9876
#public.rocketmq.domain.name=139.129.97.30:9876
rocketmq.spout.consumer.group=cmtcenter_wrestling_group
rocketmq.spout.topic=CMTCENTER
rocketmq.spout.topic.tag=COMMENT_CREATED
rocketmq.spout.ordered=false
rocketmq.prefetch.size=32
#topology configuration
topology.acker.executors=1
topology.workers=1
topology.max.spout.pending=1
topology.message.timeout.secs=30
topology.debug=true