Fluk
Fluk是storm中的一个框架,主要功能是简化storm 任务在管理、配置topology中的一些问题和麻烦。
产生的原因背景
在管理storm 的topolgy的过程中,我们最常做的工作就是提交jar包。如下代码所示:
public static void main(String[] args) throws Exception {
// 返回的逻辑值用来判断我们是否在本地上运行
// 创建必要的配置选项...
boolean runLocal = shouldRunLocal();
if(runLocal){
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, topology);
} else {
StormSubmitter.submitTopology(name, conf, topology);
}
}
上述提交任务的代码,通常都是位于上层的java任务管理器中。同时关于topoplgy的定义也位于其中,每当任务有变动,都需要重新编译jar包,重新提交才可以。为了减少这部分工作,Fluk就将这部分工作接手了过来,改为配置的方式进行。基于此得出Fluk的主要功能点如下:
- 安装和部署storm 拓扑采用配置的方式而不是内置的方式进行。
- 通过使用yaml dsl 定义storm core api(spouts/bolt)
- yaml dsl 支持对storm-kafka、storm-hdfs、storm-hbase等。
Flux 使用
在pom 文件中加入对flux的依赖,如下所示:
<!-- 在shaded jar文件中包含FLux和用户依赖包 -->
<dependencies>
<!-- Flux include -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Flux Wrappers include -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-wrappers</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- 在这里添加用户依赖包... -->
</dependencies>
<!-- 创建一个包括所有依赖包的大大的jar文件 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.storm.flux.Flux</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
部署和运行Flux
和以前使用storm jar 提交任务的方式基本相同。只是由于当前的任务中可能已经没有了对topology的定义,所以需要定义你使用的flux的yaml文件是哪一个,以便找到对应的topolgy逻辑,如下所示:
storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
根据以上可以看到Flux的和兴就在对yaml文件的编写配置。下面具体说明一下
yaml配置
Flux 拓扑定义
- 拓扑的名字及拓扑使用资源的配置如topolgy.workers
- 拓扑组件列表
- 一个DSL 拓扑定义
- 一个spouts 列表,每一个项通过一个唯一的ID区别
- 一个bolts列表,每一个项通过一个唯一的ID区别
- 一个可以创建 org.apache.storm.generated.StormTopology 实例的JVM类
name: "yaml-topology"
config:
topology.workers: 1
# spout定义
spouts:
- id: "spout-1"
className: "org.apache.storm.testing.TestWordSpout"
parallelism: 1
# bolt定义
bolts:
- id: "bolt-1"
className: "org.apache.storm.testing.TestWordCounter"
parallelism: 1
- id: "bolt-2"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
parallelism: 1
# stream定义,定义流的流向
streams:
- name: "spout-1 --> bolt-1" #name暂时未用上(可以在logging,UI等中作为placeholder)
from: "spout-1"
to: "bolt-1"
grouping:
type: FIELDS
args: ["word"]
- name: "bolt-1 --> bolt2"
from: "bolt-1"
to: "bolt-2"
grouping:
type: SHUFFLE
上面是一个大的配置框架,下面说一些具体的特征配置
组件
组件从本质来说是对象实例,用来在对spouts和bolts的配置选项中获取。如果你对Spring框架很熟悉,这里的组件大概就类比于Spring中的beans。每一个组件都是可被识别的,至少是可以通过一个唯一的标识符(字符串)和一个类名(字符串)。举个例子,以下的例子将会创建一个 org.apache.storm.kafka.StringScheme 类的实例作为关键字 "stringScheme" 的引用。这里我们假设这个类 org.apache.storm.kafka.StringScheme 有一个默认的构造函数。
components:
- id: "stringScheme"
className: "org.apache.storm.kafka.StringScheme"
(假设有默认的构造函数)
上面提到的定义的组件使用的是class类采用的是默认构造函数,如果需要自己构造则采用如下的方法
自定义构造函数
通过在yaml文件中增加constructorArgs的方法来定义构造函数。
- id: "zkHosts"
className: "org.apache.storm.kafka.ZkHosts"
constructorArgs:
- "localhost:2181"
(通过调用一个把单个字符串“localhost:2181”作为参数传递给构造函数来创建一个对象)
contructorArgs 是一个列表,其元素是对象。这个列表会被传递给类的构造函数们。上面的例子就是将zk的配置用参数的方式传入到class的构造函数中。
引用
每一个组件实例都通过一个唯一的id可悲其他组件重复使用。为了引用一个已存在的组件,你需要在使用 ref 这个标签的时候指明这个组件的id。
在以下的例子中,一个名为的组件被创建,之后将被作为另一个组件的构造函数的参数被引用
components:
- id: "stringScheme"
className: "org.apache.storm.kafka.StringScheme"
- id: "stringMultiScheme"
className: "org.apache.storm.spout.SchemeAsMultiScheme"
constructorArgs:
- ref: "stringScheme" # component with id "stringScheme" must be declared above.
属性
除去允许在调用构造函数的时候传进不同的参数,Flux同样允许在配置组件的时候使用被声明为 public 的类似JavaBean的setter方法和域
- id: "spoutConfig"
className: "org.apache.storm.kafka.SpoutConfig"
constructorArgs:
# brokerHosts
- ref: "zkHosts"
# topic
- "myKafkaTopic"
# zkRoot
- "/kafkaSpout"
# id
- "myId"
properties:
- name: "ignoreZkOffsets"
value: true
- name: "scheme"
ref: "stringMultiScheme"