Flink CDC: Syncing Data from MongoDB to Elasticsearch (ES), New Version
Posted by 极之夜
Flink CDC: Syncing Data from MongoDB to Elasticsearch (ES)
The DataStream API approach
There are plenty of Flink SQL examples online for this kind of sync job, but with messy source data they kept failing with errors for no obvious reason. Out of necessity I switched to the DataStream API and cleaned the data before writing it out. Strangely, I could find very little material on the API approach, and what I did find was incomplete, so I had to feel my way through. In the end the job got done and I learned a lot, which is why I am sharing it here.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cece</groupId>
<artifactId>Mongo-ES</artifactId>
<version>1.0</version>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.11.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<!-- The ServicesResourceTransformer merges META-INF/services files, so the factory files registered by connectors and formats do not overwrite each other. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Configuration class
public class Config {

    public static final String MONGODB_URL = "1xxx";
    public static final String MONGODB_USER = "sxx";
    public static final String MONGODB_PWD = "xx";
    public static final String MONGODB_DATABASE = "xx";
    public static final String MONGODB_COLLECTION = "xx";

    public static final String ES_URL = "xx";
    public static final int ES_PORT = xx;
    public static final String ES_USER = "xxx";
    public static final String ES_PWD = "xxxx";
    public static final String ES_INDEX = "xxxx";

    public static final int BUFFER_TIMEOUT_MS = 100;
    public static final int CHECKPOINT_INTERVAL_MS = 3 * 60 * 1000;
    public static final int CHECKPOINT_TIMEOUT_MS = 1 * 60 * 1000;
    public static final int CHECKPOINT_MIN_PAUSE_MS = 1 * 60 * 1000;
}
Main program
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;

import java.util.*;
// plus a static import (or copy) of the constants defined in the Config class above

public class FlinkCdcSyn_API {

    public static void main(String[] args) throws Exception {
        // 1. Build the Flink environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        env.setBufferTimeout(BUFFER_TIMEOUT_MS);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(10)));

        // 2. Build the SourceFunction with Flink CDC
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts(MONGODB_URL)
                .username(MONGODB_USER)
                .password(MONGODB_PWD)
                .databaseList(MONGODB_DATABASE)
                .collectionList(MONGODB_COLLECTION)
                .deserializer(new JsonDebeziumDeserializationSchema())
                // .deserializer(new CoustomParse())
                .build();
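Each element emitted by JsonDebeziumDeserializationSchema is one change event serialized as a JSON string. The filtering, mapping and sinking below rely only on three of its fields, roughly shaped as follows (illustrative values, simplified; the exact envelope depends on the connector version):

{
  "operationType": "insert",
  "fullDocument": { "_id": { "$oid": "..." }, "field1": "...", "field2": "..." },
  "documentKey": { "_id": { "$oid": "..." } }
}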
        // 3. First pass over the data: an Elasticsearch keyword value may not exceed 32766 bytes
        SingleOutputStreamOperator<String> stream = env.addSource(mongoDBSourceFunction)
                .setParallelism(1)
                .name("mongo_to_es")
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        try {
                            // keep only records that are valid JSON, drop everything else
                            JSONObject obj = JSON.parseObject(s);
                            return true;
                        } catch (Exception e) {
                            System.out.println("malformed JSON: " + s);
                            return false;
                        }
                    }
                })
                // without this step ES rejects documents with "whose UTF8 encoding is longer than the max length 32766";
                // drop any field that is too large
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String s) throws Exception {
                        JSONObject obj = JSON.parseObject(s);
                        String str = obj.getString("operationType");
                        if ("insert".equals(str) || "update".equals(str)) {
                            JSONObject obj1 = obj.getJSONObject("fullDocument");
                            if (obj1.toString().getBytes("utf-8").length > 36000) {
                                Set<Map.Entry<String, Object>> entries = obj1.entrySet();
                                Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
                                while (iterator.hasNext()) {
                                    // String s1 = iterator.next().toString();
                                    // System.out.println("iterator entry: " + s1);
                                    if (iterator.next().toString().getBytes("utf-8").length > 30000) {
                                        iterator.remove();
                                    }
                                }
                                obj.fluentPut("fullDocument", obj1.toString());
                            }
                        }
                        return obj.toString();
                    }
                });
        // 4. Sink to Elasticsearch, handling insert/update/delete separately
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(ES_URL, ES_PORT, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
                httpHosts, new ElasticsearchSinkFunction<String>() {

            public ActionRequest createIndexRequest(String element) {
                JSONObject obj = JSON.parseObject(element);
                // System.out.println("create:" + obj.toString());
                String str = obj.getString("operationType");
                // Map<String, String> json = new HashMap<>();
                String id = null;
                try {
                    id = obj.getJSONObject("documentKey").getJSONObject("_id").getString("$oid");
                } catch (Exception e) {
                    try {
                        id = obj.getJSONObject("documentKey").getString("_id");
                    } catch (Exception ex) {
                        System.out.println("unexpected documentKey format: " + obj);
                    }
                }