从0 到1Flink的成长之路

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0 到1Flink的成长之路相关的知识,希望对你有一定的参考价值。

 

 

 

一、基于集合的Source

API

一般用于学习测试时模拟数据使用
1.env.fromElements(可变参数);
2.env.fromColletion(各种集合);
3.env.generateSequence(开始,结束);

代码演示:

package xx.xxxxx.flink.source;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import java.util.Arrays;
/**
* DataSet API 批处理中数据源:基于集合Source
* 1.env.fromElements(可变参数);
* 2.env.fromColletion(各种集合);
* 3.env.generateSequence(开始,结束);
*/
public class BatchSourceCollectionDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 数据源-source
// 方式一:可变参数列表
DataSource<String> dataSet01 = env.fromElements("hadoop", "flink", "spark");
dataSet01.printToErr();
// 方式二:集合,如列表List
DataSource<String> dataSet02 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
dataSet02.printToErr();
// 方式三:序列
DataSource<Long> dataSet03 = env.generateSequence(1, 10);
dataSet03.printToErr();
}
}

二、基于文件的Source

API

1.env.readTextFile(本地文件/HDFS文件); //压缩文件也可以
2.env.readCsvFile[泛型]("本地文件/HDFS文件")
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);//设置是否递归读取目录
3.env.readTextFile("目录").withParameters(parameters);

代码演示:

package xx.xxxxx.flink.source;
import lombok.Data;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
/**
* DataSet API 批处理中数据源:基于文件Source
* 1.env.readTextFile(本地文件/HDFS文件); //压缩文件也可以
* 2.env.readCsvFile[泛型]("本地文件/HDFS文件")
* Configuration parameters = new Configuration();
* parameters.setBoolean("recursive.file.enumeration", true);//设置是否递归读取目录
* 3.env.readTextFile("目录").withParameters(parameters);
*/
public class BatchSourceFileDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 数据源-source
// 文本文件
DataSource<String> dataSet01 = env.readTextFile("datas/wordcount.data");
dataSet01.printToErr();
// CSV 文件
DataSource<Rating> dataSet02 = env
.readCsvFile("datas/u.data")
.fieldDelimiter("\\t")
.pojoType(Rating.class, "userId", "userId", "rating", "timestamp");
dataSet02.printToErr();
// 目录
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSource<String> dataSet03 = env
.readTextFile("datas/subDatas")
.withParameters(parameters);
dataSet03.printToErr();
}
@Data
public static class Rating{
public Integer userId ;
public Integer movieId ;
public Double rating ;
public Long timestamp ;
}
}

以上是关于从0 到1Flink的成长之路的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(十六)

从0到1Flink的成长之路(二十)-案例:时间会话窗口

从0到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路(十三)