Storm/Trident integration for RocketMQ. This package includes the core spout, bolt and trident states that allows a storm topology to either write storm tuples into a topic or read from topics in a storm topology.
The spout included in this package for reading data from a topic.
To use the RocketMqSpout
, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
RocketMqSpout will retry 3(use SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY
to change the value) times when messages are failed.
Properties properties = new Properties();
properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
RocketMqSpout spout = new RocketMqSpout(properties);
The bolt and trident state included in this package for write data into a topic.
The main API for mapping Storm tuple to a RocketMQ Message is the org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper
interface:
public interface TupleToMessageMapper extends Serializable {
String getKeyFromTuple(ITuple tuple);
byte[] getValueFromTuple(ITuple tuple);
}
storm-rocketmq
includes a general purpose TupleToMessageMapper
implementation called FieldNameBasedTupleToMessageMapper
.
The main API for selecting topic and tags is the org.apache.storm.rocketmq.common.selector.TopicSelector
interface:
public interface TopicSelector extends Serializable {
String getTopic(ITuple tuple);
String getTag(ITuple tuple);
}
storm-rocketmq
includes general purpose TopicSelector
implementations called DefaultTopicSelector
and FieldNameBasedTopicSelector
.
To use the RocketMqBolt
, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
RocketMqBolt send messages async by default. You can change this by invoking withAsync(false)
.
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);
properties = new Properties();
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
RocketMqBolt insertBolt = new RocketMqBolt()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
We support trident persistent state that can be used with trident topologies. To create a RocketMQ persistent trident state you need to initialize it with the TupleToMessageMapper, TopicSelector, Properties instances. See the example below:
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);
Properties properties = new Properties();
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
RocketMqState.Options options = new RocketMqState.Options()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
StateFactory factory = new RocketMqStateFactory(options);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory, fields,
new RocketMqStateUpdater(), new Fields());