java - 在 Apache Storm bolt中,java使用 Apache camel ProducerTemplate

  显示原文与译文双语对照的内容

我正在写简单的风暴+ Camel项目。 我的风暴拓扑分析了 tweet,一个螺栓应该向 Apache camel 路由发送微博文本,然后使用 web socket通知某些。

我无法使它工作,因为NotSerializableExceptions从 Bolts 收到,当尝试使用建立一次 CamelContext 。

我已经尝试过的内容:

  • 在螺栓的构造函数中传递 CamelContext - 结果导致 NotSerializableException
  • 在风暴特性中传递 CamelContext,并使用它在 prepare(...)的螺栓方法中进行gian访问。 结果:

    14484 [main] 错误 org.apache.storm.zookeeper.server.NIOServerCnxnFactory - 线程线程 [main,5,main] 已经终止 java.lang. IllegalArgumentException: 在 backtype.storm. testing$submit_local_topology中,拓扑conf不是json序列化程序。在 backtype.storm. localcluster$_submittopology处调用( 测试。clj: 262 ) ~[storm-core-0.9.4.jar:0.9.4] 。在 backtype.storm. LocalCluster.submitTopology(Unknown Source) xsr调用( LocalCluster 。clj: 43 ) ~[storm-core-0.9.4.jar:0.9.4]

Camel路线:


public class MyRouteBuilder extends RouteBuilder {


 @Override


 public void configure() throws Exception {


 from("direct:main")


. to("websocket:localhost:8085/main?sendToAll=true");


 }


}



Storm拓扑:使用 twitter4j API扩展Tweet的Tweet 。


public class TwitterStreamTopology {



 public static void main(String[] args) {


 CamelContext producerTemplate = new RouteStarter().buildRoute();



 TopologyBuilder builder = new TopologyBuilder();


 builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);


 builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");


 Config conf = new Config();


 conf.put("producerTemplate", producerTemplate.createProducerTemplate());


 conf.setDebug(true);



 LocalCluster cluster = new LocalCluster();


 cluster.submitTopology("mytopology", conf, builder.createTopology());



 Utils.sleep(20000);


 cluster.shutdown();


 }


}



WebsocketBolt:


public class WebSocketBolt extends BaseBasicBolt {


 private ProducerTemplate producerTemplate;



 @Override


 public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {


 Status s = (Status) input.getValueByField("tweet");


 producerTemplate.sendBody("direct:main", s.getText());


 }



 @Override


 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {



 }



 @Override


 public void prepare(Map stormConf, TopologyContext context) {


 super.prepare(stormConf, context);


 this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");


 }


}



有方法能很好地做到这一点?

还是应该通过http访问Camel路由,并在螺栓 prepare(...) 方法中创建一些 HttpClient? 这看起来有点过时了,还有一种方法可以让它更容易。

感谢你的帮助 !

时间: 原作者:

问题的root 原因是你正在向风暴配置中添加 ProducerTemplate,它正在引发异常,因为它不可以序列化。 如果这是你自己的类,可以更改代码使它的工作,但是这是一个驼驼类,我推荐不同的方法。

  • WebSocketBolt: 将你的producerTemplate private 成员更改为临时成员: private transient ProducerTemplate producerTemplate; 这样它就不会试图被序列化为( 同样的问题你把它放到 conf ) 。
  • WebSocketBolt: 在准备方法内而不是在拓扑中初始化 producerTemplate 。

像这样:


public class WebSocketBolt extends BaseBasicBolt {


 private transient ProducerTemplate producerTemplate;



 @Override


 public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {


 Status s = (Status) input.getValueByField("tweet");


 producerTemplate.sendBody("direct:main", s.getText());


 }



 @Override


 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {



 }



 @Override


 public void prepare(Map stormConf, TopologyContext context) {


 super.prepare(stormConf, context);


 CamelContext producerTemplate = new RouteStarter().buildRoute();


 this.producerTemplate = producerTemplate.createProducerTemplate();


 }


}



...