Storm工具及命令

Posted Jason__Zhou

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm工具及命令相关的知识,希望对你有一定的参考价值。

Storm开发

问题:

消息处理 best effort方式.
可靠性 Guaranteeing Message Processing

并行度

一个进程属于特定的topology。
进程启动一个或多个线程。
一个task认为是一个spout或者bolt实例。
默认一个executor分配一个task。
可以设置worker数量,executor数量和task数量。

编程

IDEA 运行storm-starter’s

  1. Follow the steps in storm-starter’s: Using storm-starter with IntelliJ IDEA
  2. Open Maven’s pom.xml file and remove provided line from storm dependency. This enables IntelliJ to compile storm dependency on build.
  3. Go to /src/jvm/storm/starter/, right click on ExclamationTopology file and Run ‘ExclamationTop….main()’

运行旧版程序

In Storm 1.0 , Java package naming moved from backtype.storm to org.apache.storm.
If you intend to run any topologies that used to run previous versions of Storm in Storm 1.0, you can do so by using one of two options:

You can rebuild the topology by renaming the imports of the backtype.storm package to org.apache in all of your topology code.

or

You can add config

Client.jartransformer.class = org.apache.storm.hack.StormShadeTransformer

to storm.yaml.

概念

本地模式

import org.apache.storm.LocalCluster;

LocalCluster cluster = new LocalCluster();
//关闭
cluster.shutdown();

创建topology

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3)
         .fieldsGrouping("1", new Fields("word"))
         .fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount())
         .globalGrouping("1");

Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 4);

StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());

本地debug模式,10s后停止.

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3)
         .fieldsGrouping("1", new Fields("word"))
         .fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount())
         .globalGrouping("1");

Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 4);
conf.put(Config.TOPOLOGY_DEBUG, true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();

spout the main method on spouts is nextTuple. nextTuple

可靠性
消息处理过程:
Storm offers several different levels of guaranteed message processing, includeing

  1. best effort
  2. at least once
  3. exactly once through Trident.

调度器
默认调度器和资源隔离的调度器.

  1. Pluggable scheduler
  2. Isolation Scheduler

Nimbus HA Design document,Highly Available Nimbus Design
选举和状态保存

依赖

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.0.2</version>
  <scope>provided</scope>
</dependency>

CGroup Enforcement
Integration with Resource Aware Scheduler
CGroups can be used in conjunction with the Resource Aware Scheduler. CGroups will then enforce the resource usage of workers as allocated by the Resource Aware Scheduler. To use cgroups with the Resource Aware Scheduler, simply enable cgroups and be sure NOT to set storm.worker.cgroup.memory.mb.limit and storm.worker.cgroup.cpu.limit configs.

Pacemaker
当storm变大时,zoonkeeper成为瓶颈.维护一致性时导致大量的磁盘和网络开销.
Pacemaker是简单的键值存储.维护心跳信息,心跳信息不需要存储到硬盘中,在内存中存储.
Pacemaker作为单个守护程序实例运行,使其成为潜在的单点故障。

相比zoonkeeper使用的资源更少. Gigabit networking下,the real limit is likely around 2000-3000 nodes.

以上是关于Storm工具及命令的主要内容,如果未能解决你的问题,请参考以下文章

Storm常用操作命令及WordCount

黑客工具: Storm-Breaker(访问摄像头,麦克风,获取定位)附kali linux下载命令

1 storm基本概念 + storm编程规范及demo编写

Storm安装及使用

storm 远程过程调用及client调用

Storm 快速安装及部署示例