重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇内容介绍了“Storm MongoDB接口怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
成都创新互联-专业网站定制、快速模板网站建设、高性价比广德网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式广德网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖广德地区。费用合理售后完善,10多年实体公司更值得信赖。
整体的Storn接口分为以下的几个class
1:MongoBolt.java
2 : MongoSpout.java
3 : MongoTailableCursorTopology.java
4 : SimpleMongoBolt.java
看代码说话:
1
package storm.mongo; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import com.mongodb.DB; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException; import com.mongodb.WriteConcern; /** * * 注意在这里,没有实现批处理的调用,并且只是一个抽象类,对于Mongo的Storm交互做了一次封装 * * @author Adrian Petrescu* */ public abstract class MongoBolt extends BaseRichBolt { private OutputCollector collector; // MOngDB的DB对象 private DB mongoDB; //记录我们的主机,端口,和MongoDB的数据DB民粹 private final String mongoHost; private final int mongoPort; private final String mongoDbName; /** * @param mongoHost The host on which Mongo is running. * @param mongoPort The port on which Mongo is running. * @param mongoDbName The Mongo database containing all collections being * written to. */ protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; } @Override public void prepare( @SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try { //prepare方法目前在初始化的过程之中得到了一个Mongo的连接 this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } } @Override public void execute(Tuple input) { //注意我们在这里还有一个判断,判断当前是否该发射 if (shouldActOnInput(input)) { String collectionName = getMongoCollectionForInput(input); DBObject dbObject = getDBObjectForInput(input); if (dbObject != null) { try { mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1)); collector.ack(input); } catch (MongoException me) { collector.fail(input); } } } else { collector.ack(input); } } /** * Decide whether or not this input tuple should trigger a Mongo write. * * @param input the input tuple under consideration * @return {@code true} iff this input tuple should trigger a Mongo write */ public abstract boolean shouldActOnInput(Tuple input); /** * Returns the Mongo collection which the input tuple should be written to. * * @param input the input tuple under consideration * @return the Mongo collection which the input tuple should be written to */ public abstract String getMongoCollectionForInput(Tuple input); /** * Returns the DBObject to store in Mongo for the specified input tuple. * 拿到DBObject的一个抽象类 * @param input the input tuple under consideration * @return the DBObject to be written to Mongo */ public abstract DBObject getDBObjectForInput(Tuple input); //注意这里随着计算的终结被关闭了。 @Override public void cleanup() { this.mongoDB.getMongo().close(); } }
2 :
package storm.mongo; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.utils.Utils; import com.mongodb.BasicDBObject; import com.mongodb.Bytes; import com.mongodb.DB; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException; /** * A Spout which consumes documents from a Mongodb tailable cursor. * * Subclasses should simply override two methods: *
* WARNING: You can only use tailable cursors on capped collections.
*
* @author Dan Beaulieu