Storm系列二: Storm拓扑设计

Posted zyzdisciple

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm系列二: Storm拓扑设计相关的知识,希望对你有一定的参考价值。

Storm系列二: Storm拓扑设计

在本篇中,我们就来根据一个案例,看看如何去设计一个拓扑, 如何分解问题以适应Storm架构,同时对Storm拓扑内部的并行机制会有一个基本的了解。

本章代码都在:

[email protected]:zyzdisciple/storm_study.git

项目下的 user_behavior包下。

问题案例

有这样一种场景,在前端存在会话,我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存入数据库或其他地方, 如果不为true, 则等待, 在1分钟后 还是没有收到消息, 则存入数据库。

我们对消息数据并没有太严格的要求,因为数据本身是用户的行为, 如一个点击,浏览事件等等其他行为信息,在这里我们用三位数的数值,来表示用户的行为类型。这就是案例的基本需求。

问题分析

数据源

数据的输入格式为:

{sessionId:1, content:1,isEnd:true, timeMillis:1, userId:1}

这些基本的信息中,就包含着某用户在某时间段的行为,而我们事实上需要做的就是,了解用户在一段时间内的操作行为(在这里过期时间是1分钟,指的是系统自身的过期时间,并非用户的操作间隔),将其存入数据库,供给其他的系统使用。

我们需要关注的核心点在以下几个地方:

  • 数据具有时效性,因此我们必然需要尽可能快的推入数据库,否则的话用Hadoop或Spark做批处理比Storm更合适。 所以,速度对我们而言,很重要。

  • 可靠数据源,不可靠数据源, 这同样也是需要考虑的一点,对于可靠数据源而言,我们必须保证数据不丢失,当拓扑挂掉之后,或是有些数据遗失之后,能够从数据源的节点再次获取。 但我们这里的是不可靠数据源,因为用户的几条数据丢失,对我们而言并没有太大差别。 所以在趋势性预测方面, 数据的可靠性并没有那么重要。

所以需要把握数据源的相关特性,与业务相对应, 做出速度, 安全等其他方面的取舍。

数据最终形式

这也是我们在一开始就需要关注的另一个核心点,如下:

{sessionId:1,content:001,002,005, isEnd:true, endTime:2, userId:1}

sessionId,userId不用多说, content需要将对应的标识转换成三位数,然后拼接存储,以“,”间隔, isEnd表示收到最后一条数据时的 end标识, endTime,表示收到最后一条数据的时间。

小结

在拓扑设计中,处理逻辑本身并不是第一件该关注的地方, 无论将来是存入数据又或是其他方式,需要了解数据源的特性,对数据的时效性安全性,性能等方面的考虑,以及将来的扩展性,可能需要优化的地方都需要放在考虑范围内。

尽管有一种说法是,不要过早优化。但那种优化,细节并不是性能的决定因素,而大数据,任何一点细节都可能会导致性能的巨大偏差, 重构设计又是一件很难的事情。 因此提早考虑优化并不是没有必要的事情。

拓扑设计

在拓扑设计时,需要:

  1. 确定输入数据,怎么把它表示成元组

  2. 确定输出重点数据,怎么表示成元组

  3. 确定中间部分,补充完善数据处理方式。

所以在开始代码之前,最好已经确定了每一步究竟该干什么。

  1. 我们接收到的是字符串,又需要保存各个数据的相应属性,因此在一开始的Spout中,需要将对应的数据转换成Java对象,下发下去,同时如果传入数据有误,则直接跳过,不进行处理。另外一点需要做的是, 将content转成3位有效数字。

    在Spout中主要工作就是对接数据源,同时它自身也是整个系统的数据源,需要做相应的数据处理,而不包含其他业务逻辑。

  2. 当接收到对象之后,需要按照时间间隔进行分组,提取出当前数据所在的时间间隔下发到下一层级即可。

  3. 按时间收取到数据之后,再进行处理,按照之前所提到的要求进一步处理数据。

  4. 写出。

初步实现

在其中的 bolt及Spout中需要注意的一点问题是, 所有的可能抛出异常的地方都要被处理掉, 即使是 RuntimeException, 原因稍后会提到。

Spout

spout的核心功能就是读取数据,对数据进行基本的数据处理,作为拓扑的数据源,下发数据。

那么作为数据源,需要做的工作有哪些?在平时做接口的过程中,我们向前端返回的数据一定是具有一定的数据格式, 且不容变更的格式,能够提供前端所需的所有信息才行, 最好数据本身已经经过一定的处理, 对待错误数据等其他问题,并不直接向前端返回数据。

所以数据的基本处理, 校验, 转换成对象才是基本需求。

同样,为了简化处理, 这里采取了从文件中读取数据的方式。

代码如下:

import com.google.gson.Gson;
import com.storm.demo.user_behavior.BehaviorConstants;
import com.storm.demo.user_behavior.entity.MessageInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

/**
* @author zyzdisciple
* @date 2019/4/7
*/
public class FileReaderSpout extends BaseRichSpout {

    private static final long serialVersionUID = 2970891499685374549L;

    private static final Logger logger = LoggerFactory.getLogger(FileReaderSpout.class);

    private static Gson gson;

    private BufferedReader br;

    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        gson = new Gson();
        try {
            br = new BufferedReader(new FileReader("E:\IdeaProjects\storm_demo\src\main\resources\user_behavior_data.txt"));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void nextTuple() {
        try {
            String line = br.readLine();
            if (line != null && !line.isEmpty()) {
                line = line.trim();
                MessageInfo info = gson.fromJson(line, MessageInfo.class);
                //只发送有效数据,无效数据跳过
                /*在这里, 我们是逐行发送, 如果有条件的话, 也就是说一次能取出
                * 多条数据的情况下, 全部一次发送是比较好的选择.
                * */
                if (isValid(info, line)) {
                    completingMessage(info);
                    collector.emit(new Values(info));
                }
            }
        } catch (Exception e) {
            /*在catch语句中,我们一般会选择打印log日志,表示为什么出错,并不做其他处理,
            * 即使数据格式存在问题依然需要能够正常执行下去,保证拓扑不中断。
            * 在这里可以进一步将Exception细分,对不同的Exception打印日志不同。
            * */
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(BehaviorConstants.FIELD_INFO));
    }

    @Override
    public void close() {
        if (br != null) {
            try {
                br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
    * 补全基本数据, 设置默认值
    * @param info
    */
    private void completingMessage(MessageInfo info) {
        try {
            info.setContent(formatMessage(info.getContent()));
        } catch (Exception e) {
            logger.warn("内容转换失败:" + info);
        }
        if (info.getEndTime() == null) {
            info.setEndTime(System.currentTimeMillis());
        }

    }

    /**
    * 格式化内容,将content转为3位
    * @param content
    * @return
    */
    private String formatMessage(String content) {

        if (NumberUtils.isCreatable(content)) {
            StringBuilder sb = new StringBuilder(BehaviorConstants.CONTENT_CAPACITY);
            //从第一位不是0的数值开始
            boolean skipZero = false;
            int currentValue;
            try {
                for (String c : content.trim().split("")) {
                    //已经跳过数值为0的字符, 如果不为0将 skipZero设为true
                    //如果是+ - x等字符, 会抛出异常
                    currentValue = Integer.valueOf(c);
                    if (skipZero || (currentValue != 0 && (skipZero = true))) {
                        sb.append(currentValue);
                    }
                }
            } catch (Exception e) {
                throw new NumberFormatException();
            }
            //补全字符串
            StringBuilder result = new StringBuilder(BehaviorConstants.CONTENT_CAPACITY);
            for (int i = 0, L = sb.length(); i < BehaviorConstants.CONTENT_CAPACITY - L; i++) {
                result.append(0);
            }
            result.append(sb);
            return result.toString();
        } else {
            return BehaviorConstants.UNKNOWN_CONTENT;
        }
    }

    /**
    * message校验
    * @return
    */
    private boolean isValid(MessageInfo info, String source) {
        if (info == null) {
            logger.warn("数据格式转换失败, 字符串为:" + source);
        } else if (StringUtils.isBlank(info.getSessionId())) {
            logger.warn("缺失sessionId, 字符串为:" + source);
        } else if (StringUtils.isBlank(info.getUserId())) {
            logger.warn("缺失userId, 字符串为:" + source);
        } else {
            return true;
        }
        return false;
    }
}

代码并没有什么太多值得注意的地方, 更多的则是数据前期处理方面。

TimeIntervalBolt

主要功能是划分数据时间组, 为了测试起见, 可以将timeOut时间进一步调小.

import com.storm.demo.user_behavior.BehaviorConstants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
* @author zyzdisciple
* @date 2019/4/7
*/
public class TimeIntervalBolt extends BaseRichBolt {

    private static final long serialVersionUID = 5632264737187544663L;

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
    * 是按照接收到的时间来进行分组.
    * @param input
    */
    @Override
    public void execute(Tuple input) {
        //判断当前的时间区间.
        long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
        collector.emit(new Values(timeGroup, input.getValueByField(BehaviorConstants.FIELD_INFO)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(BehaviorConstants.FIELD_TIME_GROUP, BehaviorConstants.FIELD_INFO));
    }
}

ContentStitchingBolt

初步代码如下:

import com.storm.demo.user_behavior.BehaviorConstants;
import com.storm.demo.user_behavior.entity.GroupKey;
import com.storm.demo.user_behavior.entity.MessageInfo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
* @author zyzdisciple
* @date 2019/4/7
*/
public class ContentStitchingBolt extends BaseRichBolt {

    private static final long serialVersionUID = -4684689172584740403L;

    private OutputCollector collector;

    /**
    * 存储每个时间段的数据, 由于每个时间段可能有多组session数据, 因此放入list中处理.
    */
    private Map<GroupKey, MessageInfo> collectMap;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        collectMap = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
        //在这里, 首先获取数据, 如果本身为true, 且在分组中不存在对应的数据, 则可以直接发送.
        MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO);
        long timeGroup = input.getLongByField(BehaviorConstants.FIELD_TIME_GROUP);
        GroupKey key = new GroupKey(info.getSessionId(), timeGroup);
        //根据sessionId, group双重判断, 存储
        MessageInfo messageInfo = collectMap.get(key);
        //更新info的数据
        updateNewInfoWithPreMessage(info, messageInfo);
        //如果不存在且已结束, 直接发送数据即可
        if (info.getEnd()) {
            collector.emit(new Values(info));
            //发送后需要移除相关数据
            collectMap.remove(key);
        } else {
            collectMap.put(key, info);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(BehaviorConstants.FIELD_INFO));
    }

    /**
    * 用以前的messageContent 更改当前message
    * @param newInfo
    * @param preInfo
    */
    private void updateNewInfoWithPreMessage(MessageInfo newInfo, MessageInfo preInfo) {
        if (preInfo != null) {
            StringBuilder sb = new StringBuilder(preInfo.getContent());
            sb.append(",").append(newInfo.getContent());
            newInfo.setContent(sb.toString());
        }
    }
}

其中GroupKey如下:

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;

/**
* 以sessionId 和 group做为分组的主键, 重写equals方法
*
* @author zyzdisciple
* @date 2019/4/7
*/
public class GroupKey {

    private String sessionId;

    private Long timeGroup;

    public GroupKey(String sessionId, Long timeGroup) {
        this.sessionId = sessionId;
        this.timeGroup = timeGroup;
    }

    public String getSessionId() {
        return sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    public Long getTimeGroup() {
        return timeGroup;
    }

    public void setTimeGroup(Long timeGroup) {
        this.timeGroup = timeGroup;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        GroupKey groupKey = (GroupKey) o;

        return new EqualsBuilder()
                .append(sessionId, groupKey.sessionId)
                .append(timeGroup, groupKey.timeGroup)
                .isEquals();
    }

    @Override
    public int hashCode() {
        return new HashCodeBuilder(17, 37)
                .append(sessionId)
                .append(timeGroup)
                .toHashCode();
    }
}

我们发现对于GroupKey并没有序列化操作, 因为这里是不需要的, 在emit中发送的数据需要被序列化(并不准确, 但目前可以这样理解).

然而,仍然存在一个问题, 我们是已经将数据存储进去, 并且发送, 但是, end 标识为false的数据呢? 我们在这里并没有进行处理, 仅仅是存入Map中, 这当然是不合适的, 有一种通用的处理办法, 我们并不采用 简单的Map进行存储, 而是 采用另一种, TimeCacheMap, 有兴趣的可以自己了解一下, 设置定时时间, 过期后可以自行处理, 并不仅限于Storm中使用, 在任何代码中都可以使用.

如果尝试之后会发现它是过期的, 需要用另一个 RotatingMap进行实现, 需要自行在循环中调用方法. 循环需要sleep, 周期为 timeOut / (桶的个数 - 1); 原理与 TimeCacheMap一致.

但在这里并不打算介绍这两种方法, 而是Storm自身的定时机制, Tick, 其原理就在于, Storm会每隔固定时间发送一条系统消息给对应的Bolt, Spout ,甚至所有的都可以. 当我们每次接收到系统消息时, 则表示当前已经经过一个周期, 依然没有收到的数据, 需要进行过期处理.

它的处理机制可以自行定义, 比较灵活, 实现代码如下:

@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, BehaviorConstants.SESSION_TIME_OUT_SECS);
    return conf;
}

仅仅需要在对应的Bolt, Spout中重写以上方法即可. 如果需要全局发送, 则需要在Topology的main方法中, 加入:

Config conf = new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, BehaviorConstants.SESSION_TIME_OUT_SECS);

配置即可.

通过这种方式, 我们可以做到每隔60秒让系统发送一条消息给bolt, 还需加入如下代码, 更改execute方法即可:

@Override
public void execute(Tuple input) {
    if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        //在这里, 首先获取数据, 如果本身为true, 且在分组中不存在对应的数据, 则可以直接发送.
        MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO);
        long timeGroup = input.getLongByField(BehaviorConstants.FIELD_TIME_GROUP);
        GroupKey key = new GroupKey(info.getSessionId(), timeGroup);
        //根据sessionId, group双重判断, 存储
        MessageInfo messageInfo = collectMap.get(key);
        //更新info的数据
        updateNewInfoWithPreMessage(info, messageInfo);
        //如果不存在且已结束, 直接发送数据即可
        if (info.getEnd()) {
            collector.emit(new Values(info));
            //发送后需要移除相关数据
            collectMap.remove(key);
        } else {
            collectMap.put(key, info);
        }
    } else {
        final Long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
        Iterator<Map.Entry<GroupKey, MessageInfo>> iterator = collectMap.entrySet().iterator();
        while(iterator.hasNext()) {
            Map.Entry<GroupKey, MessageInfo> entry = iterator.next();
            if (entry.getKey().getTimeGroup() < timeGroup) {
                collector.emit(new Values(entry.getValue()));
                iterator.remove();
            }
        }
    }
}

通过这种方式就实现了周期性校验数据, 发送.

但依然要注意到的一点是, 这种发送并不准确, 并非是严格的按照时间发送, 而是将发送至bolt的系统消息 与 其他消息 加入队列, 排队发送, 等待前一次的执行完成.

线程安全

而当我们采取Map的时候, 常常需要考虑到的一个问题则是线程安全, 但在Storm中, 我们一般不需要考虑这个问题, 因为每一个bolt实例都是在线程中按序执行,因此不怎么需要考虑线程安全.

但是前提是, 不要再bolt中存在 static 变量用以保存数据, 又或是在bolt中开启新的线程, 导致线程安全问题. 当出现这两种情况时, 就需要考虑线程安全了.

当然Storm中已经提供了这种Tick机制的bolt:

package org.apache.storm.topology.base;

import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

/**
* This class is based on BaseRichBolt, but is aware of tick tuple.
*/
public abstract class BaseTickTupleAwareRichBolt extends BaseRichBolt {

    @Override
    public void execute(final Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            onTickTuple(tuple);
        } else {
            process(tuple);
        }
    }

    /**
    * 当接收到系统消息时, 执行的方法
    */
    protected void onTickTuple(final Tuple tuple) {
    }

    /**
    * 其他消息执行的方法
    */
    protected abstract void process(final Tuple tuple);
}

当我们继承BaseTickTupleAwareRichBolt以做定时任务时, 需要重写 onTickTuple, process, getComponentConfiguration 即可.


而这种方式, 即tick 与 timeInterval结合的方式, 即是处理过期的一种比较好的方式.

特别是timeInterval, 通过时间分组, 是一种比较巧妙的方式.

当然,依然存在相应的问题, 所以最好的方式可能是时间窗口, 在这里暂时就以这种方式来实现.

所以是在消息信息中自身已经携带了时间信息是最好的方式.

FileWriterBolt

这个依然没有太多需要注意的地方, 在cleanUp中需要关闭流, 即可.

import com.google.gson.Gson;
import com.storm.demo.user_behavior.BehaviorConstants;
import com.storm.demo.user_behavior.entity.MessageInfo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;

/**
* @author zyzdisciple
* @date 2019/4/7
*/
public class MessageWriterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 5411259920685235771L;

    private static final Logger logger = LoggerFactory.getLogger(MessageWriterBolt.class);

    private PrintWriter pw;

    private static Gson gson;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        try {
            pw = new PrintWriter(new FileWriter("E:\IdeaProjects\storm_demo\src\main\resources\user_behavior_data_write.txt"));
            gson = new Gson();
        } catch (IOException e) {
            logger.error("文件有误,保存失败");
        }

    }

    @Override
    public void execute(Tuple input) {
        MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO);
        try {
            String jsonMessage = gson.toJson(info, MessageInfo.class);
            pw.println(info);
            pw.flush();
        } catch (Exception e) {
            logger.warn("格式转换失败:" + info);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void cleanup() {
        if (pw != null) {
            pw.close();
        }
    }
}

Topology

排在首位的是分流策略的选取, 在前一篇文章中提到了三种, shuffle, global, field 三种方式. 那么来整合一下吧:

import com.storm.demo.user_behavior.bolt.ContentStitchingBolt;
import com.storm.demo.user_behavior.bolt.MessageWriterBolt;
import com.storm.demo.user_behavior.bolt.TimeIntervalBolt;
import com.storm.demo.user_behavior.spout.FileReaderSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

/**
* @author zyzdisciple
* @date 2019/4/7
*/
public class UserBehaviorTopology {

    private static final String STREAM_SPOUT = "spout";
    private static final String STREAM_TIME_INTERVAL_BOLT = "time-bolt";
    private static final String STREAM_CONTENT_BOLT = "content-stitching-bolt";
    private static final String STREAM_FILE_WRITER_BOLT = "file-writer-bolt";
    private static final String TOPOLOGY_NAME = "user-behavior-topology";
    private static final long TEN_SECONDS = 1000L * 10;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(STREAM_SPOUT, new FileReaderSpout());
        builder.setBolt(STREAM_TIME_INTERVAL_BOLT, new TimeIntervalBolt()).shuffleGrouping(STREAM_SPOUT);
        //这里必然要使用field 保证同一group的数据发送到同一个bolt中.
        builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt())
                .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields(BehaviorConstants.FIELD_TIME_GROUP));
        builder.setBolt(STREAM_FILE_WRITER_BOLT, new MessageWriterBolt()).shuffleGrouping(STREAM_CONTENT_BOLT);

        Config conf = new Config();
        //当设置为true时, 且log level定义为info时, 会在控制台输出发射元组内容
        conf.setDebug(true);
        StormTopology topology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, conf, topology);
        //停留几秒后关闭拓扑,否则会永久运行下去
        Utils.sleep(TEN_SECONDS);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

拓扑的扩展

那么仅仅这样就可以了吗? 当然不行, 我们会发现, 在整个拓扑中, 依然是采用串行化的方式去处理数据, 对资源得不到充分的利用, 也是不能够支撑起大数据的.

所以在这里就需要提到另一个东西, 即拓扑的并行化

我们可以先来看看代码, 在UserBehaviorTopology 中的main 方法做如下更改:

builder.setSpout(STREAM_SPOUT, new FileReaderSpout(), 4);

有什么区别呢? 在最后一位参数加了个4.

我们需要知道的一点是: 无论是 Spout 还是 bolt 最终都会回归到虚拟机, 或物理上, 在一个JVM进程中去运行相关的代码, 虽然我们自身无法分配, 决定哪一个bolt在哪一个jvm中去执行, 但是我们可以决定同时用多少个线程来运行相关的bolt, spout, 进行并行化处理, 大大提升速度, 而在这里, 我们则是指定了4个线程去执行spout. 也就是 excutor

需要注意:无论有多少个jvm在运行spout, 其总的执行spout的线程数,确定为4

那么在第一环进行了扩展, 可以支持更大批量的数据处理, 但这并不够, 系统的整体性能必然是取决于最低点的.

builder.setSpout(STREAM_SPOUT, new FileReaderSpout(), 4);
builder.setBolt(STREAM_TIME_INTERVAL_BOLT, new TimeIntervalBolt(), 2).shuffleGrouping(STREAM_SPOUT);
builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt(), 8)
        .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields(BehaviorConstants.FIELD_TIME_GROUP));
builder.setBolt(STREAM_FILE_WRITER_BOLT, new MessageWriterBolt(), 4).shuffleGrouping(STREAM_CONTENT_BOLT);

在这里考虑到Spout 与 ContentStitchingBolt的任务功能比较复杂, 因此给了较多的executor去执行.

这里的线程数分配 并没有严格的要求, 节点功能简单, 可以适当减少执行器, 节点功能复杂可以适当增加.

而在前面提到过, 异常处理问题, 在每一个bolt 或Spout中的异常都需要被处理掉, 我们知道, 节点是运行在线程中的, 并且并不一定在某个jvm中运行多少个bolt, spout, 当抛出运行时异常或未被cathc掉的异常 会导致程序异常终止, 在这种情况下, 可能会牵连其他的节点也异常终止, 导致topology挂掉, 因此必须处理.

基本概念

而在上面提到的jvm也被称作 workNode, worker.

执行线程则是 executor

另外每一个线程中可能会运行一个或多个节点(spout bolt实例), 这样的实例就是task, 即任务.

这是并行性相关的三个概念.

一个worker对应一个jvm进程, 其中包含多个线程, 每个线程中含有数目不定(默认是一个)的task, 也即spout bolt实例.

而设定bolt则是:

builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt(), 8)
        .setNumTasks(64)
        .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields   (BehaviorConstants.FIELD_TIME_GROUP));

通过这种方式, setNumTasks 定义了64个实例. 每个线程中存在8个实例, 这8个实例采取循环执行, 而非并行执行的方式.

TaskNumber

那么为什么要用这种方式呢? 每个线程中多创建8个实例串行执行,效率会有所提高吗? 并不会.

那么意义在哪里呢?

Storm中有一个比较有趣的功能叫做, Rebalance, 支持在拓扑运行时动态调整 work 和 executor的数量. 但却并不支持调整 task的数量.

并且要求 executor <= task 数量, 如果超出, 则executor 会与 task保持一致. 那么task数量为什么不能调整呢? 浅显的例子是, 对于 fieldGrouping而言, 要求 field一致的数据, 最终都流向同一个 bolt实例, 也就是同一个task中, 如果在rebalance过程中, 创建了新的bolt, 就会出现问题.

进一步调整

可以通过这样两个方面来做进一步优化:

  1. 分解为功能组件的优化方式

    将每一个功能细分, 分解到不同的节点中进行处理, 这样满足了单一职责原则, 在将来调整扩展时带来极大的方便.

  2. 基于重分配的方式进行设计

    我们需要在拓扑中尽可能的减少重分配的次数, 因为每一次执行分配, 我们需要进行相应的序列化, 反序列化, 以及网络传输种种开销.

    分配的节点越多, 我们需要越多的实例, 对物理虚拟资源的开销也都会相应的变大.

因此, 我们需要在尽可能满足组件优化的同时, 减少重分配次数.

想到这里不仅产生了一点点问题:

那不是在一个单独的Spout中全部计算, 将功能定义为各种方法, 就是最大程度的满足了最少重分配吗?

这里就会存在一个问题, 计算资源的分配, 并行处理多个计算显然要比流式处理来的更快, 我们将不同的计算任务分配到各个节点, 最大化利用物理资源.

我想这大概是Storm的设计初衷.

所以其核心就在于将计算资源分配, 同时如果能够最少程度的减少网络传输, 最好不过了. 而这一点, 也正是Spark的实现方式.

那么在我们的项目中又该如何调整呢?

答案在于调整 TimeIntervalBolt, 一方面它本身也是数据处理的一部分, 另一方面, 与其为了这一操作加入不必要的网络传输序列化等, 不如在前一步直接进行处理, 减少一个节点.

实现如下:

if (isValid(info, line)) {
    completingMessage(info);
    //判断当前的时间区间.
    long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
    collector.emit(new Values(timeGroup, info));
}

删除TimeInterval, 加入Spout中, 并做相应字段,分流调整.

说到底, 这本身还是一件全凭借个人理解的问题, 合并时需要考虑这样几个点:

  1. 性能影响, 即合并与拆分 带来的开销, 利弊权衡.

  2. 语义化问题,否则会为将来扩展带来不必要的问题.

总结

在本篇中通过一个更加真实的案例, 从头设计了一个拓扑, 有以下知识点:

  • 如何设计一个拓扑, 我们需要考虑其数据源的特性, 在一开始就确定最终的输出格式, 以及进一步一点点确定中间的操作步骤.

  • Tick 机制, 在Storm中实现相关的定时处理.

  • 并行度, worker, executor, task, 对这些有了基本的了解.

  • bolt的线程安全问题, 有了基本了解

  • 拓扑的设计方法, 基于功能, 和基于重分配的两种方式.

以上是关于Storm系列二: Storm拓扑设计的主要内容,如果未能解决你的问题,请参考以下文章

Storm 系列—— Storm 核心概念详解

Storm,Spark和Samza

storm应用实践:实时事务处理《读书笔记》

storm应用实践:实时事务处理《读书笔记》

Storm新特性之Flux

Storm篇--Storm并发机制