如何为每个循环发出不同的元组并在风暴螺栓的单个字段方法中声明?

Posted

技术标签:

【中文标题】如何为每个循环发出不同的元组并在风暴螺栓的单个字段方法中声明?【英文标题】:How to emit tuple in diffrent for each loops and declare in single Fields method of storm bolt? 【发布时间】:2016-02-24 13:50:49 【问题描述】:

我正在从元组中获取所需的字符串。我的字符串附加了一个,因此我将它们拆分并在每个循环中发出。如果我编写一个发出(如果我只发出“id”),则螺栓显示没有例外,它工作正常,但是当我再添加一个 for-each 来拆分其他字符串并发出它们时,bolt 会抛出异常,如下所示

java.lang.IllegalArgumentException: Tuple created with wrong number of fields. Expected 2 fields but got 1 fields at backtype.storm.tuple.TupleImpl.<init>(TupleImpl.java:58) at backtype.storm.daemon.executor$fn__5694$fn__5707$bolt_emit__5736.invoke(executor.clj:739) at backtype.storm.daemon.executor$fn__5694$fn$reify__5742.emit(executor.clj:763) at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203) at backtype.storm.task.OutputCollector.emit(OutputCollector.java:63) at backtype.storm.task.OutputCollector.emit(OutputCollector.java:101) at test.bolts.TInserts.execute(TInsert.java:264)

这是 Mybolt 代码

 public void execute(Tuple tuple) 
   try 
String screenname=tuple.getStringByField("s_name");
    String mentionname=tuple.getStringByField("n_name");
    String mentionid=tuple.getStringByField("n_id");

                 if(mentionid != null && !mentionid.isEmpty())
                     for(String id:mentionid.split(",")) 
                        id = id.trim();
                          this.collector.emit(new Values(id));
 this.collector.ack(tuple);
 //for close
 //if condtion for id
 if(mentionname != null && !mentionname.isEmpty())
                     for(String mns:mentionname.split(",")) 
                        mns = mns.trim();
                          this.collector.emit(new Values(mns));
this.collector.ack(tuple);
   //for close
 //if condtion for mentionname

   //if close
    
             catch (Exception e) 
           this.collector.reportError(e);
          this.collector.fail(tuple);
             
        


@Override
   public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 

//   outputFieldsDeclarer.declare(new Fields("id","mns"));//doesn't throw any   exception if i emit only id(1st for)                     
    outputFieldsDeclarer.declare(new Fields("id","mns"));//thows exception 


如何在单次发出方法中发出 id 和 mns?我认为这不会引发任何错误或异常。有没有办法这样做?

this.collector.emit(new Values(id,mns));

【问题讨论】:

【参考方案1】:

如错误消息所示,输出元组应包含两个字段,但您发出一个包含单个字段的元组。

Storm 需要一个具有两个输出字段的元组,因为您在 declareOutputFields(...) 中将输出模式声明为 new Fields("id","mms"))

您实际上已经自己回答了您的问题。发出 2 字段元组的正确方法是

this.collector.emit(new Values(id,mns));

编辑:

您需要同时遍历这两个标记化字段...像这样:

String mentionname = tuple.getStringByField("n_name");
String mentionid = tuple.getStringByField("n_id");

String[] idTokens = null;
if(mentionid != null) 
    idTokens = mentionid.split(",");

String[] nameTokens = null;
if(mentionname != null)
    nameTokens = mentionname.split(",");


for(int i = 0, j = 0; i < idTokens.length && j < nameTokens.lenght; ++i, ++j) 
    this.collector.emit(new Values(idTokens[i].trim(), nameTokens[j].trim()));

this.collector.ack(tuple);

【讨论】:

但是我可以为每个循环发出的值只是 id。 for(String id:mentionid.split(",")) id = id.trim(); this.collector.emit(新值(id)); this.collector.ack(tuple); 收到错误消息:找不到符号 [ERROR] 符号:变量 msn。 我不知道您是如何更改代码的,但这听起来像是与 Storm 无关的常规 Java 编程错误...如果您是 Java 编程的初学者,您应该通过 Java教科书并开始编写更简单的代码。使用 Storm 很复杂,需要一些高级知识。 我不是要你更改我的代码!只是给我一个在单个 for 循环中发出字符串的想法 :) 感谢您的建议。 我一定会的! :) @Matthias J.萨克斯

以上是关于如何为每个循环发出不同的元组并在风暴螺栓的单个字段方法中声明?的主要内容,如果未能解决你的问题,请参考以下文章

Python:元组列表:比较所有元组并检索元组的元素不等于任何其他元组的元组

在元组的ndarray中查找元组并返回搜索到的元组的索引

从 Sqlite 表中选择行的元组并有效地对元组进行排序

遍历 pandas 数据框中的行并匹配列表中的元组并创建一个新的 df 列

PySpark UDF 返回可变大小的元组

如何从python中的列表中删除重复的元组?