Flink Study Notes: Setting Up a Flink Development Environment and Running an Example (Word Count)
Posted by 别呀
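I. IntelliJ IDEA Environment Setup
1. Create a Maven Project
1.1 Development Environment
Maven and a JDK (the pom.xml below targets Java 1.8)
1.2 POM Configuration
Compiler Configuration
Add the following to pom.xml: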
<properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
</properties>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
Flink Dependency
Add the following to pom.xml:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
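IDEA then imports the dependencies automatically.
1.3 Logging Configuration
log4j.properties
Create a log4j.properties file under resources and add the following (this is the content of log4j-console.properties from the conf directory of the Flink installation on Linux):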
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console
# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
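1.4 Add Scala Support
Create a scala directory under main and mark it as a source root.
Then add Scala support to the project.
II. Example: Running the Flink Word Count Locally
Code: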
package demo

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val host = "node110"
    val port = 9999
    val windowSeconds = 5

    // get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create the data source: one record per line read from the socket
    val source = env.socketTextStream(host, port)

    // lower-case each line, split on runs of non-word characters, drop empty tokens,
    // then count each word per tumbling window of windowSeconds seconds
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(windowSeconds))
      .sum(1)

    // add the sink: print the results to stdout
    counts.print()

    // execute
    env.execute("Window Stream Word Count with parameters")
  }
}
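To see what the flatMap / map / sum chain computes for the lines that arrive within one window, here is a minimal plain-Scala sketch with no Flink dependency. The object name WindowCountSketch and its function names are made up for illustration, and the sketch works on an in-memory list, whereas Flink aggregates per key and per window as records arrive.

```scala
object WindowCountSketch {
  // Same tokenization as the flatMap step: lower-case the line, split on runs of
  // non-word characters, and drop the empty token that a leading separator produces.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Counts every word across all lines that fell into one window.
  def countWindow(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(tokenize).groupBy(identity).map { case (word, hits) => (word, hits.size) }

  def main(args: Array[String]): Unit = {
    // Two sample lines standing in for what was typed during one window.
    val window = Seq("Hello Flink", "hello, world!")
    countWindow(window).toSeq.sortBy(_._1).foreach(println)
  }
}
```

For the two sample lines this prints (flink,1), (hello,2) and (world,1), one per line. The nonEmpty filter matters when a line starts with punctuation: ", hi".split("\\W+") yields an empty first element.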
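Test:
On the Linux host that the program connects to (node110 in the code above), start a listener before launching the program: nc -lk 9999
Each line typed into that session is sent to the job.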
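If nc is not available on the machine that should feed the socket, a small stand-in can be written in Scala. The following is only a sketch under assumptions: the name LineServerSketch is hypothetical, port 9999 is taken from the code above, and unlike nc -lk it serves a single client and then exits.

```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.StdIn

object LineServerSketch {
  // Waits for one client on the given server socket and forwards each line to it,
  // terminated by "\n" so the receiver sees one record per line.
  def serve(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream)
    try {
      lines.foreach { line =>
        out.print(line + "\n")
        out.flush() // send each line immediately instead of buffering
      }
    } finally {
      out.close()
      client.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    // Forward stdin line by line until end of input (Ctrl-D on Linux).
    try serve(server, Iterator.continually(StdIn.readLine()).takeWhile(_ != null))
    finally server.close()
  }
}
```

Start it before the Flink program, then type lines into its console. The host value in WindowWordCount has to point at the machine running it.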
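III. Running on a Flink Cluster
3.1 Code Changes
Replace the hard-coded host, port, and window length in main with command-line arguments: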
if (args.length != 3) {
  println("Usage: WindowWordCount <socketHost> <socketPort> <windowSeconds>")
  System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val windowSeconds = args(2).toInt
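The check above only validates the argument count; args(1).toInt still throws a NumberFormatException when the port is not a number. One possible hardening, offered as a sketch and not as part of the original program, parses the values with scala.util.Try and prints the usage line on any bad input. ArgsSketch, JobArgs and parse are hypothetical names.

```scala
import scala.util.Try

object ArgsSketch {
  // Hypothetical holder for the three job parameters.
  final case class JobArgs(host: String, port: Int, windowSeconds: Int)

  val usage = "Usage: WindowWordCount <socketHost> <socketPort> <windowSeconds>"

  // Right(args) when exactly three arguments are given and both numbers are valid,
  // Left(usage) otherwise.
  def parse(args: Array[String]): Either[String, JobArgs] = args match {
    case Array(host, portStr, windowStr) =>
      val parsed = for {
        port <- Try(portStr.toInt).toOption if port > 0 && port <= 65535
        window <- Try(windowStr.toInt).toOption if window > 0
      } yield JobArgs(host, port, window)
      parsed.toRight(usage)
    case _ => Left(usage)
  }

  def main(args: Array[String]): Unit = parse(args) match {
    case Right(job) =>
      println(s"host=${job.host} port=${job.port} window=${job.windowSeconds}s")
    case Left(message) =>
      println(message)
      sys.exit(1)
  }
}
```

In WindowWordCount, the Right branch would carry on with building the job and the Left branch would exit as before.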
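3.2 Package and Upload the Program
3.3 Run
First open one session (open it before running the job; otherwise the run prints a pile of error output)
Then open a second session
Enter data:
View in the web UI: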
IV. DataSet API Implementation (Word Count)
Data file input.txt:
I am a student
I love the world
Code:
package demo

// The DataSet (batch) API lives in org.apache.flink.api.scala; this import brings in
// ExecutionEnvironment together with the implicit TypeInformation support.
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // create the data source
    val source = env.readTextFile("D:\\java test\\flink_test\\src\\main\\resources\\input.txt")

    // lower-case each line, split on runs of non-word characters, drop empty tokens,
    // then group by the word (field 0) and sum the counts (field 1)
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .groupBy(0)
      .sum(1)

    // add the sink
    counts.writeAsText("D:\\java test\\flink_test\\target\\output00")

    // execute
    env.execute("Batch Word Count")
  }
}
Result:
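As a cross-check of what the job should count for the two lines of input.txt, the same logic can be reproduced with plain Scala collections. This is a sketch and not the output of the Flink job: BatchCountSketch and its function names are hypothetical, and the order and file layout of Flink's actual output may differ.

```scala
object BatchCountSketch {
  // The two lines of input.txt shown above.
  val inputLines: Seq[String] = Seq("I am a student", "I love the world")

  // Mirrors flatMap + map: one (word, 1) pair per token.
  def toPairs(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(line => line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq)
      .map(word => (word, 1))

  // Mirrors groupBy(0).sum(1): add up field 1 for each distinct value of field 0.
  def sumByWord(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.foldLeft(Map.empty[String, Int]) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0) + n)
    }

  def main(args: Array[String]): Unit =
    sumByWord(toPairs(inputLines)).toSeq.sortBy(_._1).foreach(println)
}
```

Sorted by word, this prints (a,1), (am,1), (i,2), (love,1), (student,1), (the,1) and (world,1); "i" is the only word that occurs twice.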