博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm的定时任务
阅读量:4984 次
发布时间:2019-06-12

本文共 8175 字,大约阅读时间需要 27 分钟。

 

 

应用场景:

 

第一种方法

 

 参考代码StormTopologyTimer1.java

package yehua.storm;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Timer example 1: the tick-tuple frequency is set on the topology-wide
 * {@link Config} in {@link #main(String[])}.
 *
 * <p>The topology wires one spout ({@link MySpout}) to one bolt ({@link MyBolt}).
 * The bolt sums the numbers it receives and prints a message whenever a tick
 * tuple arrives.
 */
public class StormTopologyTimer1 {

    /** Spout that emits an incrementing integer in the field {@code "num"}, then sleeps one second. */
    public static class MySpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        /** Last value emitted; incremented before every emit. */
        int num = 0;

        /**
         * Stores the collector used by {@link #nextTuple()}.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to emit tuples
         */
        @Override
        @SuppressWarnings("rawtypes") // raw Map keeps the override compatible with the raw-typed Storm 1.x signature
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /** Emits the next integer and then sleeps for 1000 ms. */
        @Override
        public void nextTuple() {
            num++;
            System.out.println("spout:" + num);
            this.collector.emit(new Values(num));
            Utils.sleep(1000);
        }

        /** Declares the single output field {@code "num"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    /** Bolt that accumulates the {@code "num"} values and reports tick tuples. */
    public static class MyBolt extends BaseRichBolt {

        private OutputCollector collector;

        /** Running total of all {@code "num"} values seen so far. */
        int sum = 0;

        /**
         * Stores the collector used to ack tuples in {@link #execute(Tuple)}.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to ack tuples
         */
        @Override
        @SuppressWarnings("rawtypes") // raw Map keeps the override compatible with the raw-typed Storm 1.x signature
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Prints a notice for tick tuples; otherwise adds the tuple's
         * {@code "num"} field to the running sum and prints it.
         *
         * @param input the incoming tuple (data tuple from the spout or a tick tuple)
         */
        @Override
        public void execute(Tuple input) {
            if (isTickTuple(input)) {
                System.out.println("定时时间到了");
            } else {
                Integer num = input.getIntegerByField("num");
                sum += num;
                System.out.println("sum=" + sum);
            }
            // BaseRichBolt does not ack automatically, so ack every handled tuple.
            collector.ack(input);
        }

        /**
         * Returns whether the tuple is a tick tuple: it must come from the
         * system component AND arrive on the tick stream. Checking the
         * component alone would also match other system-originated tuples.
         */
        private static boolean isTickTuple(Tuple tuple) {
            return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                    && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }

        /** This bolt emits nothing, so no output fields are declared. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    /**
     * Builds the topology and submits it.
     *
     * @param args when empty the topology runs in a {@link LocalCluster};
     *             otherwise it is submitted through {@link StormSubmitter}
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();

        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);

        Config config = new Config();
        // Set up the timer: every 10 seconds Storm sends a system-level tuple
        // to every bolt in the topology.
        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);

        String topology_name = StormTopologyTimer1.class.getSimpleName();
        if (args.length == 0) {
            // Run locally.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        } else {
            // Run on the cluster.
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }
}

 

 方法二:

 

所以我们应该这么干

这种方法同样可行。与第一种方法在 main 中通过全局 Config 为整个 topology 设置定时不同,这里是在 bolt 内部重写 getComponentConfiguration() 方法,只针对某一个 bolt 单独设置定时任务。

 

参考代码StormTopologyTimer2.java

package yehua.storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Timer example 2: the tick-tuple frequency is set per bolt, by overriding
 * {@code getComponentConfiguration()} in {@link MyBolt}, instead of on the
 * topology-wide {@link Config}.
 *
 * <p>The topology wires one spout ({@link MySpout}) to one bolt ({@link MyBolt}).
 * The bolt sums the numbers it receives and prints a message whenever a tick
 * tuple arrives.
 */
public class StormTopologyTimer2 {

    /** Spout that emits an incrementing integer in the field {@code "num"}, then sleeps one second. */
    public static class MySpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        /** Last value emitted; incremented before every emit. */
        int num = 0;

        /**
         * Stores the collector used by {@link #nextTuple()}.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to emit tuples
         */
        @Override
        @SuppressWarnings("rawtypes") // raw Map keeps the override compatible with the raw-typed Storm 1.x signature
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /** Emits the next integer and then sleeps for 1000 ms. */
        @Override
        public void nextTuple() {
            num++;
            System.out.println("spout:" + num);
            this.collector.emit(new Values(num));
            Utils.sleep(1000);
        }

        /** Declares the single output field {@code "num"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    /** Bolt that accumulates the {@code "num"} values and requests its own tick tuples. */
    public static class MyBolt extends BaseRichBolt {

        private OutputCollector collector;

        /** Running total of all {@code "num"} values seen so far. */
        int sum = 0;

        /**
         * Stores the collector used to ack tuples in {@link #execute(Tuple)}.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to ack tuples
         */
        @Override
        @SuppressWarnings("rawtypes") // raw Map keeps the override compatible with the raw-typed Storm 1.x signature
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Prints a notice for tick tuples; otherwise adds the tuple's
         * {@code "num"} field to the running sum and prints it.
         *
         * @param input the incoming tuple (data tuple from the spout or a tick tuple)
         */
        @Override
        public void execute(Tuple input) {
            if (isTickTuple(input)) {
                System.out.println("定时时间到了");
            } else {
                Integer num = input.getIntegerByField("num");
                sum += num;
                System.out.println("sum=" + sum);
            }
            // BaseRichBolt does not ack automatically, so ack every handled tuple.
            collector.ack(input);
        }

        /**
         * Returns whether the tuple is a tick tuple: it must come from the
         * system component AND arrive on the tick stream. Checking the
         * component alone would also match other system-originated tuples.
         */
        private static boolean isTickTuple(Tuple tuple) {
            return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                    && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }

        /** This bolt emits nothing, so no output fields are declared. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        /**
         * Supplies the component-specific configuration for this bolt.
         *
         * @return a map that sets {@code TOPOLOGY_TICK_TUPLE_FREQ_SECS} to 10
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            Map<String, Object> componentConf = new HashMap<>();
            // Set up the timer for this bolt only.
            componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return componentConf;
        }
    }

    /**
     * Builds the topology and submits it.
     *
     * @param args when empty the topology runs in a {@link LocalCluster};
     *             otherwise it is submitted through {@link StormSubmitter}
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();

        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);

        Config config = new Config();
        String topology_name = StormTopologyTimer2.class.getSimpleName();
        if (args.length == 0) {
            // Run locally.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        } else {
            // Run on the cluster.
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }
}

 

转载于:https://www.cnblogs.com/braveym/p/6965321.html

你可能感兴趣的文章
297.白盒测试
查看>>
新闻客户端的突破与创新
查看>>
网络通信引擎ICE的使用
查看>>
js滚动事件实现滚动触底加载
查看>>
CentOS minimal 网络不可用
查看>>
MySQL 数据库备份
查看>>
python 笔记
查看>>
【Java】NIO中Channel的注册源码分析
查看>>
JS监测鼠标指针位置
查看>>
Mac常用终端命令
查看>>
团队作业2
查看>>
Gym - 101350A Sherlock Bones(思维)
查看>>
莫队算法板子
查看>>
Tensor flow 实战Google深度学习框架 笔记摘要Ptwo
查看>>
rest_framework之渲染器
查看>>
有状态服务和无状态服务
查看>>
iOS:检测多媒体(相机、相册、麦克风)设备权限,弹框提示
查看>>
Linux 下修改配置实现在当前目录下寻找可执行文件
查看>>
css3 appearance在iphone上面的问题
查看>>
Linux常用命令(第二版) --权限管理命令
查看>>