Storm/Trident integration for MongoDB. This package includes the core bolts and Trident states that allow a Storm topology to either insert Storm tuples into a database collection or execute update queries against a database collection.
This package includes a bolt and a Trident state for inserting data into a database collection.
The main API for inserting data into a collection using MongoDB is the org.apache.storm.mongodb.common.mapper.MongoMapper interface:
public interface MongoMapper extends Serializable {
    Document toDocument(ITuple tuple);
    Document toDocumentByKeys(List<Object> keys);
}
storm-mongodb includes a general-purpose MongoMapper implementation called SimpleMongoMapper that maps a Storm tuple to a database document. SimpleMongoMapper assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
public class SimpleMongoMapper implements MongoMapper {
    private String[] fields;

    @Override
    public Document toDocument(ITuple tuple) {
        Document document = new Document();
        for (String field : fields) {
            document.append(field, tuple.getValueByField(field));
        }
        return document;
    }

    @Override
    public Document toDocumentByKeys(List<Object> keys) {
        Document document = new Document();
        document.append("_id", MongoUtils.getID(keys));
        return document;
    }

    public SimpleMongoMapper withFields(String... fields) {
        this.fields = fields;
        return this;
    }
}
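As a rough illustration of the field-name assumption above, the following sketch reproduces the mapping loop with plain java.util.Map objects standing in for the Storm tuple and the Mongo document. The class name FieldMappingSketch and the "source" field are made up for this example and are not part of storm-mongodb.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: a Map plays the role of both the Storm tuple and the Document.
final class FieldMappingSketch {

    // Copies only the configured fields, in the configured order,
    // mirroring the loop in SimpleMongoMapper.toDocument.
    static Map<String, Object> toDocument(Map<String, Object> tuple, String... fields) {
        Map<String, Object> document = new LinkedHashMap<>();
        for (String field : fields) {
            document.put(field, tuple.get(field));
        }
        return document;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", 3);
        tuple.put("source", "spout1"); // hypothetical extra field, not configured below
        // prints {word=storm, count=3}
        System.out.println(toDocument(tuple, "word", "count"));
    }
}
```

Fields that are present on the tuple but not passed to withFields are simply not written, which is why the configured names need to match the document fields you expect in the collection.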
To use the MongoInsertBolt, construct an instance of it by specifying a url, a collectionName, and a MongoMapper implementation that converts a Storm tuple to a database document. The following is the standard URI connection scheme:
mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
For more information about Mongo URI options (for example, Write Concern options), see https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
String url = "mongodb://127.0.0.1:27017/test";
String collectionName = "wordcount";
MongoMapper mapper = new SimpleMongoMapper()
        .withFields("word", "count");
MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
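The connection scheme above can be assembled from its parts, which may help when hosts or options come from configuration. This is a minimal sketch using only the JDK; MongoUriSketch, the host names, and the replica set name rs0 are hypothetical, and the sketch assumes the values need no percent-encoding and omits credentials.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

final class MongoUriSketch {

    // hosts: "host[:port]" entries; options: key/value pairs appended after '?'.
    // Assumes no value needs percent-encoding.
    static String build(List<String> hosts, String database, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("mongodb://");
        uri.append(String.join(",", hosts));
        uri.append('/').append(database);
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("replicaSet", "rs0"); // hypothetical replica set name
        options.put("w", "majority");     // write concern option
        System.out.println(build(Arrays.asList("host1:27017", "host2:27017"), "test", options));
    }
}
```

The resulting string can be passed as the url argument wherever this package expects one.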
This package includes a bolt for updating data in a database collection.
The main API for updating data in a collection using MongoDB is the org.apache.storm.mongodb.common.mapper.MongoUpdateMapper interface:
public interface MongoUpdateMapper extends MongoMapper { }
storm-mongodb includes a general-purpose MongoUpdateMapper implementation called SimpleMongoUpdateMapper that maps a Storm tuple to a database document. SimpleMongoUpdateMapper assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
SimpleMongoUpdateMapper uses the $set operator to set the value of a field in a document. For more information about update operators, see https://docs.mongodb.org/manual/reference/operator/update/
public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
    private String[] fields;

    @Override
    public Document toDocument(ITuple tuple) {
        Document document = new Document();
        for (String field : fields) {
            document.append(field, tuple.getValueByField(field));
        }
        // $set operator: sets the value of a field in a document.
        return new Document("$set", document);
    }

    public SimpleMongoUpdateMapper withFields(String... fields) {
        this.fields = fields;
        return this;
    }
}
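To make the shape of the $set document concrete, here is a small sketch that builds it with plain maps and then models its effect on a stored document. SetUpdateSketch, applySet, and the firstSeen field are hypothetical names for this example; applySet only approximates what the database does on the server side.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SetUpdateSketch {

    // Wraps the selected tuple fields in a "$set" document,
    // following the same steps as SimpleMongoUpdateMapper.toDocument.
    static Map<String, Object> toUpdateDocument(Map<String, Object> tuple, String... fields) {
        Map<String, Object> changes = new LinkedHashMap<>();
        for (String field : fields) {
            changes.put(field, tuple.get(field));
        }
        Map<String, Object> update = new LinkedHashMap<>();
        update.put("$set", changes);
        return update;
    }

    // Models the effect of $set: listed fields are overwritten,
    // every other stored field is left as it was.
    @SuppressWarnings("unchecked")
    static Map<String, Object> applySet(Map<String, Object> stored, Map<String, Object> update) {
        Map<String, Object> result = new LinkedHashMap<>(stored);
        result.putAll((Map<String, Object>) update.get("$set"));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", 4);
        // prints {$set={word=storm, count=4}}
        System.out.println(toUpdateDocument(tuple, "word", "count"));
    }
}
```

Because only the listed fields are touched, an update built this way does not erase other fields that already exist on the matched document.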
The main API for creating a MongoDB query filter is the org.apache.storm.mongodb.common.QueryFilterCreator interface:
public interface QueryFilterCreator extends Serializable {
    Bson createFilter(ITuple tuple);
    Bson createFilterByKeys(List<Object> keys);
}
storm-mongodb includes a general-purpose QueryFilterCreator implementation called SimpleQueryFilterCreator that creates a MongoDB query filter from a given tuple. SimpleQueryFilterCreator uses the $eq operator to match values that are equal to a specified value. For more information about query operators, see https://docs.mongodb.org/manual/reference/operator/query/
public class SimpleQueryFilterCreator implements QueryFilterCreator {
    private String field;

    @Override
    public Bson createFilter(ITuple tuple) {
        return Filters.eq(field, tuple.getValueByField(field));
    }

    @Override
    public Bson createFilterByKeys(List<Object> keys) {
        return Filters.eq("_id", MongoUtils.getID(keys));
    }

    public SimpleQueryFilterCreator withField(String field) {
        this.field = field;
        return this;
    }
}
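The equality filter above can be pictured as a predicate over documents: the value of one tuple field is captured, and a document matches when it holds an equal value under the same field name. The sketch below models that idea with the JDK only. EqFilterSketch is a hypothetical name, and the model deliberately ignores MongoDB's own type-comparison and array-matching rules.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

final class EqFilterSketch {

    // Captures the tuple's value for the field and matches documents
    // that hold an equal value under the same name.
    static Predicate<Map<String, Object>> createFilter(Map<String, Object> tuple, String field) {
        Object expected = tuple.get(field);
        return document -> Objects.equals(expected, document.get(field));
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "storm");
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("word", "storm");
        document.put("count", 7);
        System.out.println(createFilter(tuple, "word").test(document));
    }
}
```

Only the configured field takes part in the match, so the other tuple fields can differ from the stored document without affecting the result.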
To use the MongoUpdateBolt, construct an instance of it by specifying a Mongo url, a collectionName, a QueryFilterCreator implementation, and a MongoUpdateMapper implementation that converts a Storm tuple to a database document.
MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
        .withFields("word", "count");
QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
        .withField("word");
MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
// Insert a new document if nothing matches the query filter:
// updateBolt.withUpsert(true);
// Update all documents that match the query filter rather than a single one:
// updateBolt.withMany(true);
Or use an anonymous inner class implementation of QueryFilterCreator:
MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
        .withFields("word", "count");
QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
    @Override
    public Bson createFilter(ITuple tuple) {
        return Filters.gt("count", 3);
    }

    // The interface also declares createFilterByKeys, so it must be implemented;
    // this mirrors SimpleQueryFilterCreator.
    @Override
    public Bson createFilterByKeys(List<Object> keys) {
        return Filters.eq("_id", MongoUtils.getID(keys));
    }
};
MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
// Insert a new document if nothing matches the query filter:
// updateBolt.withUpsert(true);
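The withUpsert and withMany switches mentioned in the comments above change what happens when zero or several documents match the filter. The following in-memory model is only a sketch of those semantics under simplifying assumptions: UpdateOptionsSketch is a hypothetical name, a list of maps stands in for the collection, and an upsert here inserts just the $set fields, whereas the database may also add fields taken from the filter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class UpdateOptionsSketch {

    // Applies setFields to matching documents and returns how many
    // documents were modified or inserted.
    static int update(List<Map<String, Object>> collection,
                      Predicate<Map<String, Object>> filter,
                      Map<String, Object> setFields,
                      boolean upsert, boolean many) {
        int touched = 0;
        for (Map<String, Object> document : collection) {
            if (filter.test(document)) {
                document.putAll(setFields);
                touched++;
                if (!many) {
                    return touched; // single-document update: stop at the first match
                }
            }
        }
        if (touched == 0 && upsert) {
            collection.add(new LinkedHashMap<>(setFields)); // nothing matched: insert instead
            touched = 1;
        }
        return touched;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> collection = new ArrayList<>();
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("word", "storm");
        fields.put("count", 1);
        // The collection is empty, so only the upsert flag makes this call insert.
        update(collection, doc -> "storm".equals(doc.get("word")), fields, true, false);
        System.out.println(collection);
    }
}
```

With both flags left at false, a tuple whose filter matches nothing changes no document, which is worth keeping in mind for word-count style topologies that see new keys.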
This package includes a bolt for selecting data from a database collection.
The main API for selecting data from a collection using MongoDB is the org.apache.storm.mongodb.common.mapper.MongoLookupMapper interface:
public interface MongoLookupMapper extends Serializable {
    List<Values> toTuple(ITuple input, Document doc);
    void declareOutputFields(OutputFieldsDeclarer declarer);
}
storm-mongodb includes a general-purpose MongoLookupMapper implementation called SimpleMongoLookupMapper that converts a Mongo document to a list of Storm values.
public class SimpleMongoLookupMapper implements MongoLookupMapper {
    private String[] fields;

    @Override
    public List<Values> toTuple(ITuple input, Document doc) {
        Values values = new Values();
        for (String field : fields) {
            // Prefer the value from the input tuple; fall back to the looked-up document.
            if (input.contains(field)) {
                values.add(input.getValueByField(field));
            } else {
                values.add(doc.get(field));
            }
        }
        List<Values> result = new ArrayList<Values>();
        result.add(values);
        return result;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(fields));
    }

    public SimpleMongoLookupMapper withFields(String... fields) {
        this.fields = fields;
        return this;
    }
}
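The precedence rule in toTuple above (a field carried by the input tuple wins, otherwise the value comes from the looked-up document) can be checked in isolation. This is a small sketch with maps standing in for the tuple and the document; LookupMergeSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LookupMergeSketch {

    // For each output field, take the input tuple's value when the tuple
    // has that field, otherwise take it from the looked-up document.
    static List<Object> toValues(Map<String, Object> input, Map<String, Object> doc, String... fields) {
        List<Object> values = new ArrayList<>();
        for (String field : fields) {
            values.add(input.containsKey(field) ? input.get(field) : doc.get(field));
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("word", "storm");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("word", "STORM");
        doc.put("count", 7);
        // prints [storm, 7]
        System.out.println(toValues(input, doc, "word", "count"));
    }
}
```

In the word-count case this means "word" is passed through from the incoming tuple while "count" is filled in from the collection.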
To use the MongoLookupBolt, construct an instance of it by specifying a Mongo url, a collectionName, a QueryFilterCreator implementation, and a MongoLookupMapper implementation that converts a Mongo document to a list of Storm values.
MongoLookupMapper mapper = new SimpleMongoLookupMapper()
        .withFields("word", "count");
QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
        .withField("word");
MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
Trident persistent state is supported and can be used with Trident topologies. To create a Mongo persistent Trident state, initialize it with the url, the collectionName, and a MongoMapper instance. See the example below:
MongoMapper mapper = new SimpleMongoMapper()
        .withFields("word", "count");
MongoState.Options options = new MongoState.Options()
        .withUrl(url)
        .withCollectionName(collectionName)
        .withMapper(mapper);
StateFactory factory = new MongoStateFactory(options);
TridentTopology topology = new TridentTopology();
// spout and fields are assumed to be defined elsewhere in the topology code.
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory, fields,
        new MongoStateUpdater(), new Fields());
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
        new MongoStateQuery(), new Fields("columnName", "columnValue"));
stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
NOTE: If no unique index is provided, Trident state inserts may produce duplicate documents when failures occur.
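To see why the note above matters, consider a batch that is written again after a failure. The sketch below is a simplified in-memory model, not driver code: ReplaySketch is a hypothetical name, and a rejected insert is reported by returning false, whereas MongoDB reports a duplicate-key error when a unique index is violated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class ReplaySketch {

    // Appends the document unless uniqueField is set and another document
    // already holds the same value for it. Returns false when rejected.
    static boolean insert(List<Map<String, Object>> collection,
                          Map<String, Object> document, String uniqueField) {
        if (uniqueField != null) {
            for (Map<String, Object> existing : collection) {
                if (Objects.equals(existing.get(uniqueField), document.get(uniqueField))) {
                    return false;
                }
            }
        }
        collection.add(document);
        return true;
    }

    // Inserts the same batch twice, as a replay after a failure would,
    // and returns the final number of stored documents.
    static int replayTwice(List<Map<String, Object>> batch, String uniqueField) {
        List<Map<String, Object>> collection = new ArrayList<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            for (Map<String, Object> document : batch) {
                insert(collection, document, uniqueField);
            }
        }
        return collection.size();
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = Arrays.asList(
                Collections.<String, Object>singletonMap("word", "storm"),
                Collections.<String, Object>singletonMap("word", "mongo"));
        System.out.println(replayTwice(batch, null));   // no unique constraint
        System.out.println(replayTwice(batch, "word")); // unique on "word"
    }
}
```

Without the constraint the replayed batch doubles the stored documents; with it, the second attempt adds nothing.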
Trident MapState is also supported. To create a Mongo Trident MapState, initialize it with the url, the collectionName, a MongoMapper instance, and a QueryFilterCreator instance. See the example below:
MongoMapper mapper = new SimpleMongoMapper()
        .withFields("word", "count");
QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
        .withField("word");
MongoMapState.Options options = new MongoMapState.Options();
options.url = url;
options.collectionName = collectionName;
options.mapper = mapper;
options.queryCreator = filterCreator;
StateFactory factory = MongoMapState.transactional(options);
TridentTopology topology = new TridentTopology();
// spout is assumed to be defined elsewhere in the topology code.
Stream stream = topology.newStream("spout1", spout);
TridentState state = stream.groupBy(new Fields("word"))
        .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
        .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
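The groupBy and persistentAggregate calls above keep one running sum per word. The sketch below models just that arithmetic with a map as the state; WordSumSketch is a hypothetical name, and the model leaves out the transaction bookkeeping that a transactional Trident state performs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WordSumSketch {

    // Folds one batch of (word, count) pairs into the per-word running sums.
    static Map<String, Long> aggregate(Map<String, Long> state, List<Map.Entry<String, Long>> batch) {
        for (Map.Entry<String, Long> tuple : batch) {
            state.merge(tuple.getKey(), tuple.getValue(), Long::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> batch = new ArrayList<>();
        batch.add(new SimpleEntry<>("storm", 2L));
        batch.add(new SimpleEntry<>("mongo", 1L));
        batch.add(new SimpleEntry<>("storm", 3L));
        // prints {storm=5, mongo=1}
        System.out.println(aggregate(new LinkedHashMap<>(), batch));
    }
}
```

A later stateQuery with MapGet corresponds to reading the current sum for a given word from this map.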