流式计算之Storm_wordcount例子 storm wordcount 实例

一.构建maven开发环境为了开发storm topology, 你需要把storm相关的jar包添加到classpath里面去:要么手动添加所有相关的jar包, 要么使用maven来管理所有的依赖。storm的jar包发布在Clojars(一个maven库),如果你使用maven的话,把下面的配置添加在你项目的pom.xml里面。
<repository> <id>clojars.org</id> <url>http://clojars.org/repo</url></repository>
<dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.5.3</version> <scope>test</scope></dependency>
二.代码范例
1.Topology 入口点 RollingTopWords------------类似于hadoop的Job定义本地模式(嵌入Local):
package storm.starter;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.testing.TestWordSpout;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;import storm.starter.bolt.MergeObjects;import storm.starter.bolt.RankObjects;import storm.starter.bolt.RollingCountObjects;public class RollingTopWords { public static voidmain(String[] args) throws Exception { final int TOP_N = 3; TopologyBuilder builder = newTopologyBuilder(); builder.setSpout(1, new TestWordSpout(),5); builder.setBolt(2, new RollingCountObjects(60,10), 4) .fieldsGrouping(1, new Fields("word")); builder.setBolt(3, new RankObjects(TOP_N),4) .fieldsGrouping(2, new Fields("obj")); builder.setBolt(4, newMergeObjects(TOP_N)) .globalGrouping(3); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); //本地模式启动集群 cluster.submitTopology("rolling-demo", conf,builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); }}

部署模式:
package storm.starter;import storm.starter.bolt.MergeObjects;import storm.starter.bolt.RankObjects;import storm.starter.bolt.RollingCountObjects;import backtype.storm.Config;import backtype.storm.StormSubmitter;import backtype.storm.testing.TestWordSpout;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class RollingTopWords { public static voidmain(String[] args) throws Exception { final int TOP_N = 3; TopologyBuilder builder = newTopologyBuilder(); builder.setSpout(1, new TestWordSpout(),5); builder.setBolt(2, new RollingCountObjects(60,10), 4).fieldsGrouping( 1, new Fields("word")); builder.setBolt(3, new RankObjects(TOP_N),4).fieldsGrouping(2, new Fields("obj")); builder.setBolt(4, newMergeObjects(TOP_N)).globalGrouping(3); Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(20); conf.setMaxSpoutPending(5000); StormSubmitter.submitTopology("demo",conf, builder.createTopology()); Thread.sleep(10000); }}

2. 直接使用内置的TestWordSpout(随机产生一个word)TestWordSpout
package backtype.storm.testing;import backtype.storm.topology.OutputFieldsDeclarer;import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import java.util.Random;import org.apache.log4j.Logger;public class TestWordSpout implements IRichSpout { public static Logger LOG= Logger.getLogger(TestWordSpout.class); boolean_isDistributed; SpoutOutputCollector_collector; public TestWordSpout(){ this(true); } publicTestWordSpout(boolean isDistributed) { _isDistributed = isDistributed; } public booleanisDistributed() { return _isDistributed; } public void open(Mapconf, TopologyContext context, SpoutOutputCollector collector){ _collector = collector; } public void close(){ } public void nextTuple(){ Utils.sleep(100); final String[] words = new String[] {"nathan","mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word =words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } public void ack(ObjectmsgId) { } public void fail(ObjectmsgId) { } public voiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }}

3.各环节处理Bolt
RollingCountObjects 滚动计数word,并通过定时触发时间,清空计数列表
package storm.starter.bolt;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;@SuppressWarnings("serial")public class RollingCountObjects implements IRichBolt { privateHashMap<Object, long[]> _objectCounts= new HashMap<Object,long[]>(); private int_numBuckets; private transient Threadcleaner; private OutputCollector_collector; private int_trackMinutes; publicRollingCountObjects(int numBuckets, int trackMinutes) { _numBuckets = numBuckets; _trackMinutes = trackMinutes; } public long totalObjects(Object obj) { long[] curr = _objectCounts.get(obj); long total = 0; for (long l: curr) { total+=l; } return total; } public int currentBucket(int buckets) { return (currentSecond() /secondsPerBucket(buckets)) % buckets; } public intcurrentSecond() { return (int) (System.currentTimeMillis() /1000); } public intsecondsPerBucket(int buckets) { return (_trackMinutes * 60 / buckets); } public longmillisPerBucket(int buckets) { return (long) secondsPerBucket(buckets) *1000; } @SuppressWarnings("rawtypes") public void prepare(MapstormConf, TopologyContext context, OutputCollector collector){ _collector = collector; cleaner = new Thread(new Runnable() { @SuppressWarnings("unchecked") publicvoid run() { Integer lastBucket =currentBucket(_numBuckets); while(true) { intcurrBucket = currentBucket(_numBuckets); if(currBucket!=lastBucket) { int bucketToWipe =(currBucket + 1) % _numBuckets; synchronized(_objectCounts){ Set objs = newHashSet(_objectCounts.keySet()); for (Object obj: objs) { long[] counts =_objectCounts.get(obj); long currBucketVal =counts[bucketToWipe]; counts[bucketToWipe] = 0; //*这行代码很关键* long total =totalObjects(obj); if(currBucketVal!=0) { _collector.emit(new Values(obj, total)); } if(total==0) { _objectCounts.remove(obj); } } } lastBucket =currBucket; } long delta= millisPerBucket(_numBuckets) - (System.currentTimeMillis() %millisPerBucket(_numBuckets)); Utils.sleep(delta); } } }); cleaner.start(); } public voidexecute(Tuple tuple) { Object obj = tuple.getValue(0); int bucket = currentBucket(_numBuckets); synchronized(_objectCounts) { long[]curr = _objectCounts.get(obj); if(curr==null) { curr = newlong[_numBuckets]; _objectCounts.put(obj,curr); } curr[bucket]++; _collector.emit(new Values(obj, totalObjects(obj))); _collector.ack(tuple); } } public void cleanup(){ } public voiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("obj","count")); }}

RankObjects
package storm.starter.bolt;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.ArrayList;import java.util.Collections;import java.util.Comparator;import java.util.List;import java.util.Map;import org.json.simple.JSONValue;@SuppressWarnings("serial")public class RankObjects implements IBasicBolt { @SuppressWarnings("rawtypes") List<List> _rankings = newArrayList<List>(); int _count; Long _lastTime =null; public RankObjects(intn) { _count = n; } @SuppressWarnings("rawtypes") private int_compare(List one, List two) { long valueOne = (Long) one.get(1); long valueTwo = (Long) two.get(1); long delta = valueTwo - valueOne; if(delta > 0) { return1; } else if (delta < 0) { return-1; } else { return0; } } //end compare private Integer_find(Object tag) { for(int i = 0; i <_rankings.size(); ++i) { Object cur= _rankings.get(i).get(0); if(cur.equals(tag)) { return i; } } return null; } @SuppressWarnings("rawtypes") public void prepare(MapstormConf, TopologyContext context) { } @SuppressWarnings("rawtypes") public voidexecute(Tuple tuple, BasicOutputCollector collector) { Object tag = tuple.getValue(0); Integer existingIndex = _find(tag); if (null != existingIndex) { _rankings.set(existingIndex, tuple.getValues()); } else { _rankings.add(tuple.getValues()); } Collections.sort(_rankings, newComparator<List>() { public intcompare(List o1, List o2) { return _compare(o1,o2); } }); if (_rankings.size() > _count){ _rankings.remove(_count); } long currentTime =System.currentTimeMillis(); if(_lastTime==null || currentTime>= _lastTime + 2000) { collector.emit(newValues(JSONValue.toJSONString(_rankings))); _lastTime= currentTime; } } public void cleanup(){ } public voiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("list")); }}
MergeObjects 对排序结果进行归并
package storm.starter.bolt;import org.apache.log4j.Logger;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.ArrayList;import java.util.Collections;import java.util.Comparator;import java.util.List;import java.util.Map;import org.json.simple.JSONValue;@SuppressWarnings("serial")public class MergeObjects implements IBasicBolt { public static Logger LOG= Logger.getLogger(MergeObjects.class); @SuppressWarnings({"rawtypes", "unchecked" }) privateList<List> _rankings = newArrayList(); int _count = 10; Long _lastTime; public MergeObjects(intn) { _count = n; } @SuppressWarnings("rawtypes") private int_compare(List one, List two) { long valueOne = (Long) one.get(1); long valueTwo = (Long) two.get(1); long delta = valueTwo - valueOne; if(delta > 0) { return1; } else if (delta < 0) { return-1; } else { return0; } } //end compare private Integer_find(Object tag) { for(int i = 0; i <_rankings.size(); ++i) { Object cur= _rankings.get(i).get(0); if(cur.equals(tag)) { return i; } } return null; } @SuppressWarnings("rawtypes") public void prepare(MapstormConf, TopologyContext context) { } @SuppressWarnings({"unchecked", "rawtypes" }) public voidexecute(Tuple tuple, BasicOutputCollector collector) { List<List> merging= (List) JSONValue.parse(tuple.getString(0)); for(List pair : merging) { IntegerexistingIndex = _find(pair.get(0)); if (null!= existingIndex) { _rankings.set(existingIndex,pair); } else{ _rankings.add(pair); } Collections.sort(_rankings, newComparator<List>() { public int compare(List o1,List o2) { return _compare(o1, o2); } }); if(_rankings.size() > _count) { _rankings.subList(_count,_rankings.size()).clear(); } } long currentTime =System.currentTimeMillis(); if(_lastTime==null || currentTime>= _lastTime + 2000) { StringfullRankings = JSONValue.toJSONString(_rankings); collector.emit(new Values(fullRankings)); LOG.info("Rankings: " + fullRankings); _lastTime= currentTime; } } public void cleanup(){ } public voiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("list")); }}

  

爱华网本文地址 » http://www.413yy.cn/a/25101010/27243.html

更多阅读

金钱草反流性胃炎之克星 胆汁反流胃炎能活多久

说起金钱草很多老百姓都知道是一味治疗胆结石的良药,可以称为妇孺皆知。但是,金钱草能治疗胆汁反流性胃炎的人可能知道的不多,而且方法极简单,特效。对于基层医生和一般稍懂些医学知识的人可以说是一种方便易行的好方法。临床上胆汁反流

喝粤式早茶必点的点心---虾饺 广式点心之家虾饺

喝粤式早茶必点的点心---虾饺——简介特别喜欢喝粤式早茶,因为它的种类,也许是全国各个地区里最丰富多彩的,不下百种,而且样样精致,让你忍不住想尝尝。但是又恐怕是有心而无力,品种实在是太多啦。这其中有我最钟爱就是这虾饺,每次喝茶必点

流式细胞仪的功能和临床应用 流式细胞仪的临床应用

流式细胞仪最大的贡献在于促进了免疫学基础研究和临床诊断。目前流式细胞术已被广泛用于细胞生物学、免疫学、肿瘤学、血液学、病理学、遗传学和临床检验等多学科领域的基础和临床研究。1.流式细胞仪的功能(1)细胞参量分析包括细胞大小

声明:《流式计算之Storm_wordcount例子 storm wordcount 实例》为网友怪怪怪咖分享!如侵犯到您的合法权益请联系我们删除