Prerequisites:
First, you should have a basic understanding of MongoDB.
Second, you should have some familiarity with Storm itself.
Reading suggestions:
Since this Storm-interface series covers a set of interfaces built around Storm's real-time processing framework, it is worth reading this post side by side with the Storm-hbase interface post in the same series.
The Storm-Mongo interface as a whole consists of the following classes:
1: MongoBolt.java
2: MongoSpout.java
3: MongoTailableCursorTopology.java
4: SimpleMongoBolt.java
Let the code speak:
1: MongoBolt.java
package storm.mongo;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;

/**
 * Note: batching is not implemented here; this is just an abstract class that
 * wraps the interaction between Storm and Mongo.
 *
 * @author Adrian Petrescu
 */
public abstract class MongoBolt extends BaseRichBolt {
    private OutputCollector collector;

    // The MongoDB DB object
    private DB mongoDB;

    // Records our host, port, and MongoDB database name
    private final String mongoHost;
    private final int mongoPort;
    private final String mongoDbName;

    /**
     * @param mongoHost The host on which Mongo is running.
     * @param mongoPort The port on which Mongo is running.
     * @param mongoDbName The Mongo database containing all collections being
     * written to.
     */
    protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
        this.mongoHost = mongoHost;
        this.mongoPort = mongoPort;
        this.mongoDbName = mongoDbName;
    }

    @Override
    public void prepare(
            @SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // prepare() currently just obtains a Mongo connection during initialization
            this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(Tuple input) {
        // Note the extra check here: first decide whether this tuple should trigger a write at all
        if (shouldActOnInput(input)) {
            String collectionName = getMongoCollectionForInput(input);
            DBObject dbObject = getDBObjectForInput(input);
            if (dbObject != null) {
                try {
                    mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));
                    collector.ack(input);
                } catch (MongoException me) {
                    collector.fail(input);
                }
            }
        } else {
            collector.ack(input);
        }
    }

    /**
     * Decide whether or not this input tuple should trigger a Mongo write.
     *
     * @param input the input tuple under consideration
     * @return {@code true} iff this input tuple should trigger a Mongo write
     */
    public abstract boolean shouldActOnInput(Tuple input);

    /**
     * Returns the Mongo collection which the input tuple should be written to.
     *
     * @param input the input tuple under consideration
     * @return the Mongo collection which the input tuple should be written to
     */
    public abstract String getMongoCollectionForInput(Tuple input);

    /**
     * Returns the DBObject to store in Mongo for the specified input tuple.
     * The abstract hook for obtaining the DBObject.
     *
     * @param input the input tuple under consideration
     * @return the DBObject to be written to Mongo
     */
    public abstract DBObject getDBObjectForInput(Tuple input);

    // Note: the connection is closed here when the computation finishes.
    @Override
    public void cleanup() {
        this.mongoDB.getMongo().close();
    }
}
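To see how the three abstract methods are meant to be filled in, here is a minimal sketch of a concrete subclass. The class name WordCountMongoBolt, the tuple fields "word" and "count", and the collection name "word_counts" are assumptions made for illustration; they are not part of storm-mongo itself.

package storm.mongo;

import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;

/**
 * A minimal sketch of a concrete MongoBolt. The tuple layout ("word", "count")
 * and the target collection name are assumptions for illustration only.
 */
public class WordCountMongoBolt extends MongoBolt {

    public WordCountMongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
        super(mongoHost, mongoPort, mongoDbName);
    }

    @Override
    public boolean shouldActOnInput(Tuple input) {
        // Only write tuples that actually carry a "word" field; anything else is just acked.
        return input.contains("word");
    }

    @Override
    public String getMongoCollectionForInput(Tuple input) {
        // Route every tuple to a single (assumed) collection.
        return "word_counts";
    }

    @Override
    public DBObject getDBObjectForInput(Tuple input) {
        // Map the tuple fields onto a Mongo document.
        return new BasicDBObject("word", input.getStringByField("word"))
                .append("count", input.getLongByField("count"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt only writes to Mongo and emits nothing downstream.
    }
}

In a topology this subclass would be wired in with the usual Storm API, for example builder.setBolt("mongo-bolt", new WordCountMongoBolt("localhost", 27017, "test")).shuffleGrouping("count-bolt"), where the host, port, database name, and upstream component id are again placeholders.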
2: MongoSpout.java
package storm.mongo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;

/**
 * A Spout which consumes documents from a Mongodb tailable cursor.
 *
 * Subclasses should simply override two methods:
 * - {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}
 * - {@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns
 *   a Mongo document into a Storm tuple matching the declared output fields.
 *
 * WARNING: You can only use tailable cursors on capped collections.
 *
 * @author Dan Beaulieu