Storm-Flux简介

Posted angelxp

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm-Flux简介相关的知识,希望对你有一定的参考价值。

Fluk

Fluk是storm中的一个框架,主要功能是简化storm 任务在管理、配置topology中的一些问题和麻烦。

产生的原因背景

在管理storm 的topolgy的过程中,我们最常做的工作就是提交jar包。如下代码所示:

public static void main(String[] args) throws Exception {
    // 返回的逻辑值用来判断我们是否在本地上运行
    // 创建必要的配置选项...
    boolean runLocal = shouldRunLocal();
    if(runLocal){
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, topology);
    } else {
        StormSubmitter.submitTopology(name, conf, topology);
    }
}

上述提交任务的代码,通常都是位于上层的java任务管理器中。同时关于topoplgy的定义也位于其中,每当任务有变动,都需要重新编译jar包,重新提交才可以。为了减少这部分工作,Fluk就将这部分工作接手了过来,改为配置的方式进行。基于此得出Fluk的主要功能点如下:

  • 安装和部署storm 拓扑采用配置的方式而不是内置的方式进行。
  • 通过使用yaml dsl 定义storm core api(spouts/bolt)
  • yaml dsl 支持对storm-kafka、storm-hdfs、storm-hbase等。

Flux 使用

在pom 文件中加入对flux的依赖,如下所示:

<!-- 在shaded jar文件中包含FLux和用户依赖包 -->
<dependencies>
    <!-- Flux include -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>flux-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Flux Wrappers include -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>flux-wrappers</artifactId>
        <version>${storm.version}</version>
    </dependency>

    <!-- 在这里添加用户依赖包... -->

</dependencies>
<!-- 创建一个包括所有依赖包的大大的jar文件 -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.4</version>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.apache.storm.flux.Flux</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

部署和运行Flux

和以前使用storm jar 提交任务的方式基本相同。只是由于当前的任务中可能已经没有了对topology的定义,所以需要定义你使用的flux的yaml文件是哪一个,以便找到对应的topolgy逻辑,如下所示:

storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml

根据以上可以看到Flux的和兴就在对yaml文件的编写配置。下面具体说明一下

yaml配置

Flux 拓扑定义

  1. 拓扑的名字及拓扑使用资源的配置如topolgy.workers
  2. 拓扑组件列表
  3. 一个DSL 拓扑定义
  • 一个spouts 列表,每一个项通过一个唯一的ID区别
  • 一个bolts列表,每一个项通过一个唯一的ID区别
  • 一个可以创建 org.apache.storm.generated.StormTopology 实例的JVM类
name: "yaml-topology"
config:
  topology.workers: 1

# spout定义
spouts:
  - id: "spout-1"
    className: "org.apache.storm.testing.TestWordSpout"
    parallelism: 1

# bolt定义
bolts:
  - id: "bolt-1"
    className: "org.apache.storm.testing.TestWordCounter"
    parallelism: 1
  - id: "bolt-2"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1

# stream定义,定义流的流向
streams:
  - name: "spout-1 --> bolt-1" #name暂时未用上(可以在logging,UI等中作为placeholder)
    from: "spout-1"
    to: "bolt-1"
    grouping:
      type: FIELDS
      args: ["word"]

  - name: "bolt-1 --> bolt2"
    from: "bolt-1"
    to: "bolt-2"
    grouping:
      type: SHUFFLE

上面是一个大的配置框架,下面说一些具体的特征配置

组件

组件从本质来说是对象实例,用来在对spouts和bolts的配置选项中获取。如果你对Spring框架很熟悉,这里的组件大概就类比于Spring中的beans。每一个组件都是可被识别的,至少是可以通过一个唯一的标识符(字符串)和一个类名(字符串)。举个例子,以下的例子将会创建一个 org.apache.storm.kafka.StringScheme 类的实例作为关键字 "stringScheme" 的引用。这里我们假设这个类 org.apache.storm.kafka.StringScheme 有一个默认的构造函数。

components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
    (假设有默认的构造函数)

上面提到的定义的组件使用的是class类采用的是默认构造函数,如果需要自己构造则采用如下的方法

自定义构造函数

通过在yaml文件中增加constructorArgs的方法来定义构造函数。

- id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"
    (通过调用一个把单个字符串“localhost:2181”作为参数传递给构造函数来创建一个对象)

contructorArgs 是一个列表,其元素是对象。这个列表会被传递给类的构造函数们。上面的例子就是将zk的配置用参数的方式传入到class的构造函数中。

引用

每一个组件实例都通过一个唯一的id可悲其他组件重复使用。为了引用一个已存在的组件,你需要在使用 ref 这个标签的时候指明这个组件的id。

在以下的例子中,一个名为的组件被创建,之后将被作为另一个组件的构造函数的参数被引用

components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme" # component with id "stringScheme" must be declared above.
属性

除去允许在调用构造函数的时候传进不同的参数,Flux同样允许在配置组件的时候使用被声明为 public 的类似JavaBean的setter方法和域

 - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "myKafkaTopic"
      # zkRoot
      - "/kafkaSpout"
      # id
      - "myId"
    properties:
      - name: "ignoreZkOffsets"
        value: true
      - name: "scheme"
        ref: "stringMultiScheme"

参考链接:http://storm.apachecn.org/releases/cn/1.1.0/flux.html

以上是关于Storm-Flux简介的主要内容,如果未能解决你的问题,请参考以下文章

Android 逆向Linux 文件权限 ( Linux 权限简介 | 系统权限 | 用户权限 | 匿名用户权限 | 读 | 写 | 执行 | 更改组 | 更改用户 | 粘滞 )(代码片段

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

C#-WebForm-★内置对象简介★Request-获取请求对象Response相应请求对象Session全局变量(私有)Cookie全局变量(私有)Application全局公共变量Vi(代码片段

react简介

react简介

在PaddlePaddle中的Notebook代码片段