博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm【Storm-MongoDB接口】- 1: 简要介绍
阅读量:5913 次
发布时间:2019-06-19

本文共 7716 字,大约阅读时间需要 25 分钟。

hot3.png

阅读前提:

    其一:    您需要对于MongoDB有一个初步的了解。

    其二:    您需要对Storm本身有所了解

阅读建议:

                由于整个Storm接口系列包含了围绕Storm实时处理的框架的一系列接口,在一系列的接口文档之中,请对比Storm-hbase接口的博文

整体的Storn接口分为以下的几个class

1:MongoBolt.java

2 : MongoSpout.java

3 : MongoTailableCursorTopology.java

4 : SimpleMongoBolt.java

看代码说话:

package storm.mongo;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import com.mongodb.DB;import com.mongodb.DBObject;import com.mongodb.MongoClient;import com.mongodb.MongoException;import com.mongodb.WriteConcern;/** * * 注意在这里,没有实现批处理的调用,并且只是一个抽象类,对于Mongo的Storm交互做了一次封装 * * @author Adrian Petrescu 
 * */public abstract class MongoBolt extends BaseRichBolt { private OutputCollector collector; // MOngDB的DB对象 private DB mongoDB;         //记录我们的主机,端口,和MongoDB的数据DB民粹 private final String mongoHost; private final int mongoPort; private final String mongoDbName; /**  * @param mongoHost The host on which Mongo is running.  * @param mongoPort The port on which Mongo is running.  * @param mongoDbName The Mongo database containing all collections being  * written to.  */ protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; } @Override public void prepare( @SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try {         //prepare方法目前在初始化的过程之中得到了一个Mongo的连接 this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } } @Override public void execute(Tuple input) {              //注意我们在这里还有一个判断,判断当前是否该发射 if (shouldActOnInput(input)) { String collectionName = getMongoCollectionForInput(input); DBObject dbObject = getDBObjectForInput(input); if (dbObject != null) { try { mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1)); collector.ack(input); } catch (MongoException me) { collector.fail(input); } } } else { collector.ack(input); } } /**  * Decide whether or not this input tuple should trigger a Mongo write.  *  * @param input the input tuple under consideration  * @return {@code true} iff this input tuple should trigger a Mongo write  */ public abstract boolean shouldActOnInput(Tuple input); /**  * Returns the Mongo collection which the input tuple should be written to.  *  * @param input the input tuple under consideration  * @return the Mongo collection which the input tuple should be written to  */ public abstract String getMongoCollectionForInput(Tuple input); /**  * Returns the DBObject to store in Mongo for the specified input tuple.  *     拿到DBObject的一个抽象类      * @param input the input tuple under consideration  * @return the DBObject to be written to Mongo  */ public abstract DBObject getDBObjectForInput(Tuple input); //注意这里随着计算的终结被关闭了。 @Override public void cleanup() { this.mongoDB.getMongo().close(); }}

2 :

  

package storm.mongo;import java.util.List;import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.atomic.AtomicBoolean;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.utils.Utils;import com.mongodb.BasicDBObject;import com.mongodb.Bytes;import com.mongodb.DB;import com.mongodb.DBCursor;import com.mongodb.DBObject;import com.mongodb.MongoClient;import com.mongodb.MongoException;/*** A Spout which consumes documents from a Mongodb tailable cursor.** Subclasses should simply override two methods:* 
  • {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}* 
  • {@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns* a Mongo document into a Storm tuple matching the declared output fields.* 
*** 

WARNING: You can only use tailable cursors on capped collections.* * @author Dan Beaulieu 

**/// 在这里,抽象的过程中,依旧保持了第一层的Spout为一个抽象类,MongoSpout为abstract的一个抽象类,子类在继承这// 个类的过程之中实现特定的方法即可// 这里还有一个类似Cursor的操作。public abstract class MongoSpout extends BaseRichSpout { private SpoutOutputCollector collector; private LinkedBlockingQueue
 queue; private final AtomicBoolean opened = new AtomicBoolean(false); private DB mongoDB; private final DBObject query; private final String mongoHost; private final int mongoPort; private final String mongoDbName; private final String mongoCollectionName; public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; this.mongoCollectionName = mongoCollectionName; this.query = query; } class TailableCursorThread extends Thread { // 内部类 TailableCursorThread线程 //注意在其中我们使用了LinkedBlockingQueue的对象,有关java高并发的集合类,请参考本ID的【Java集合类型的博文】博文。 LinkedBlockingQueue
 queue; String mongoCollectionName; DB mongoDB; DBObject query; public TailableCursorThread(LinkedBlockingQueue
 queue, DB mongoDB, String mongoCollectionName, DBObject query) { this.queue = queue; this.mongoDB = mongoDB; this.mongoCollectionName = mongoCollectionName; this.query = query; } public void run() { while(opened.get()) { try { // create the cursor mongoDB.requestStart(); final DBCursor cursor = mongoDB.getCollection(mongoCollectionName) .find(query) .sort(new BasicDBObject("$natural", 1)) .addOption(Bytes.QUERYOPTION_TAILABLE) .addOption(Bytes.QUERYOPTION_AWAITDATA); try { while (opened.get() && cursor.hasNext()) {                     final DBObject doc = cursor.next();                     if (doc == null) break;                     queue.put(doc);                 } } finally { try {  if (cursor != null) cursor.close();  } catch (final Throwable t) { }                     try {                       mongoDB.requestDone();                       } catch (final Throwable t) { }                 } Utils.sleep(500); } catch (final MongoException.CursorNotFound cnf) { // rethrow only if something went wrong while we expect the cursor to be open.                    if (opened.get()) {                     throw cnf;                    }                } catch (InterruptedException e) { break; } } }; } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.queue = new LinkedBlockingQueue
(1000); try { this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query); this.opened.set(true); listener.start(); } @Override public void close() { this.opened.set(false); } @Override public void nextTuple() { DBObject dbo = this.queue.poll(); if(dbo == null) {            Utils.sleep(50);        } else {            this.collector.emit(dbObjectToStormTuple(dbo));        } } @Override public void ack(Object msgId) { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub } public abstract List
 dbObjectToStormTuple(DBObject message);}

转载于:https://my.oschina.net/infiniteSpace/blog/332849

你可能感兴趣的文章
asyncio 的使用姿势
查看>>
poj 1287 networking
查看>>
PHP高级编程之守护进程,实现优雅重启
查看>>
百分比的空div ,可以设置空div的高度 让其占位 ,如果不设置高度或者宽带的话,就会自动隐藏不占...
查看>>
保存iptables规则的两种方式
查看>>
get current position
查看>>
区分一下强制类型转换运算符重载/赋值运算符重载/对象定义的赋值
查看>>
CSS行高line-height的理解
查看>>
Oracle中connect by...start with...的使用
查看>>
Java 备忘: 使用 Jackson 包的 XML 注解的例子
查看>>
1.8 chown 命令
查看>>
Eclipse最新版注释模板设置详解
查看>>
对象的属性
查看>>
在MS SQL Server数据库批量查看表的大小的三种方法
查看>>
oracle高水位标记(HWM)简单介绍
查看>>
Golang正则模块使用
查看>>
使用SpringMVC创建支持向下兼容的版本化的API接口
查看>>
写导航时候的规范
查看>>
Android Stuido提示grandle错误
查看>>
MQ消息
查看>>