为 Apache Storm 上的 Bolt 添加重试机制

Posted

技术标签:

【中文标题】为 Apache Storm 上的 Bolt 添加重试机制【英文标题】:Add retry mechanism for bolt on Apache Storm 【发布时间】:2014-12-14 13:43:49 【问题描述】:

我的风暴拓扑中有一个螺栓(调度程序),它打开 http 请求连接。

我想在失败的情况下添加重试机制(连接超时、失败状态等)。 重试应该只发生在 dispatcher-bolt 中,而不是从整个拓扑开始。

通常我会添加一个负责重试和异常处理的队列(例如在 3 次后自动将消息分派到错误队列..)

可以在螺栓内部做这​​样的事情吗?有人有这方面的经验,可以建议我可以使用哪个库吗?

【问题讨论】:

【参考方案1】:

当然!这似乎是一种处理错误的合理方法。除了提供用于连接到您选择的排队系统的 API 的库之外,我不确定您需要使用哪个库。

在螺栓内部,您可能有如下代码:

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) 
   try 
      // do something which might fail here...
    catch (Exception e) 
      // do you want to log the error?
      LOG.error("Bolt error ", e);
      // do you want the error to show up in storm UI?
      collector.reportError(e);
      // or just put information on the queue for processing later
   

只要您在 bolt 内捕获异常,您的拓扑就不会重新启动。

另一种选择是利用Storm's built-in ability for guaranteed message processing 使元组失败并以这种方式重试它们。

【讨论】:

你能建议我用一个轻量级的重试库来处理这些吗? 我不认为有一个库可以为你做这件事......你需要自己编写代码。您是否使用队列(ActiveMQ、RabbitMQ 等)? 我还没决定。我认为这些对于我的目的来说太过分了。【参考方案2】:
package banktransactions;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.log4j.Logger;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class TransactionsSpouts extends BaseRichSpout

private static final Integer MAX_FAILS = 2;
Map<Integer,String> messages;
Map<Integer,Integer> transactionFailureCount;
Map<Integer,String> toSend;
private SpoutOutputCollector collector;  

static Logger LOG = Logger.getLogger(TransactionsSpouts.class);


public void ack(Object msgId) 
    messages.remove(msgId);
    LOG.info("Message fully processed ["+msgId+"]");


public void close() 



public void fail(Object msgId) 
    if(!transactionFailureCount.containsKey(msgId))
        throw new RuntimeException("Error, transaction id not found ["+msgId+"]");
    Integer transactionId = (Integer) msgId;

    //Get the transactions fail
    Integer failures = transactionFailureCount.get(transactionId) + 1;
    if(failures >= MAX_FAILS)
        //If exceeds the max fails will go down the topology
        throw new RuntimeException("Error, transaction id ["+transactionId+"] has had many errors ["+failures+"]");
    
    //If not exceeds the max fails we save the new fails quantity and re-send the message 
    transactionFailureCount.put(transactionId, failures);
    toSend.put(transactionId,messages.get(transactionId));
    LOG.info("Re-sending message ["+msgId+"]");


public void nextTuple() 
    if(!toSend.isEmpty())
        for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet())
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            collector.emit(new Values(transactionMessage),transactionId);
        
        /*
         * The nextTuple, ack and fail methods run in the same loop, so
         * we can considerate the clear method atomic
         */
        toSend.clear();
    
    try 
        Thread.sleep(1);
     catch (InterruptedException e) 


public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) 
    Random random = new Random();
    messages = new HashMap<Integer, String>();
    toSend = new HashMap<Integer, String>();
    transactionFailureCount = new HashMap<Integer, Integer>();
    for(int i = 0; i< 100; i++)
        messages.put(i, "transaction_"+random.nextInt());
        transactionFailureCount.put(i, 0);
    
    toSend.putAll(messages);
    this.collector = collector;


public void declareOutputFields(OutputFieldsDeclarer declarer) 
    declarer.declare(new Fields("transactionMessage"));

【讨论】:

以上是关于为 Apache Storm 上的 Bolt 添加重试机制的主要内容,如果未能解决你的问题,请参考以下文章

Apache-Storm 集群

在storm中,我可以指定一个bolt将运行的工人数量吗?

Apache Storm 工作人员之间的高消息传递延迟

Apache Storm:一个本地执行任务的Demo

storm-jdbc的使用

Storm 系列—— Storm 编程模型详解