Flink CDC: Syncing Data from MongoDB to Elasticsearch (ES), New Version

Posted by 极之夜


Flink CDC: Syncing Data from MongoDB to Elasticsearch (ES)

The DataStream API approach

There are plenty of examples online that sync data with Flink SQL, but with messy source data that approach kept failing with errors that were hard to pin down. I was forced to switch to the DataStream API so the data could be cleaned up before being written out. Oddly, the material I could find on the API approach was sparse and incomplete, so I felt my way through it; having finally finished the job and learned a lot, I am sharing the result here.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cece</groupId>
    <artifactId>Mongo-ES</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12</scala.version>
    </properties>
    <dependencies>







        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>






        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.11.0</version>
        </dependency>




        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>

<!--         https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <!-- Factory classes from connector and format dependencies would otherwise overwrite each other when packaging; the ServicesResourceTransformer merges them. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>
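With the shade plugin configured as above, running mvn clean package builds a fat jar (with Maven's defaults it lands at target/Mongo-ES-1.0.jar), which can then be submitted with the Flink CLI, for example: flink run -c <your-package>.FlinkCdcSyn_API target/Mongo-ES-1.0.jar. The main-class path depends on which package you put the classes below into.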

Config class

public class Config {
    public static final String MONGODB_URL = "1xxx";
    public static final String MONGODB_USER = "sxx";
    public static final String MONGODB_PWD = "xx";

    public static final String MONGODB_DATABASE = "xx";
    public static final String MONGODB_COLLECTION = "xx";

    public static final String ES_URL = "xx";
    public static final int ES_PORT = xx;
    public static final String ES_USER = "xxx";
    public static final String ES_PWD = "xxxx";
    public static final String ES_INDEX = "xxxx";

    public static final int BUFFER_TIMEOUT_MS = 100;
    public static final int CHECKPOINT_INTERVAL_MS = 3 * 60 * 1000;
    public static final int CHECKPOINT_TIMEOUT_MS = 1 * 60 * 1000;
    public static final int CHECKPOINT_MIN_PAUSE_MS = 1 * 60 * 1000;
}
Main program

// Assumed imports; the original post omits them. The static import of the Config
// constants assumes Config lives in the same package (e.g. com.cece).
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.cece.Config.*;

public class FlinkCdcSyn_API {

    public static void main(String[] args) throws Exception {
        // 1. Build the Flink environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
        env.setBufferTimeout(BUFFER_TIMEOUT_MS);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(10)));

        // 2. Build the SourceFunction with Flink CDC
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts(MONGODB_URL)
                .username(MONGODB_USER)
                .password(MONGODB_PWD)
                .databaseList(MONGODB_DATABASE)
                .collectionList(MONGODB_COLLECTION)
                .deserializer(new JsonDebeziumDeserializationSchema())
//                .deserializer(new CoustomParse())
                .build();

        // 3. First pass over the data: an ES keyword field must not exceed 32766 bytes
        SingleOutputStreamOperator<String> stream = env.addSource(mongoDBSourceFunction)
                .setParallelism(1)
                .name("mongo_to_es")
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        // Drop any record that is not valid JSON
                        try {
                            JSONObject obj = JSON.parseObject(s);
                            return true;
                        } catch (Exception e) {
                            System.out.println("Invalid JSON: " + s);
                            return false;
                        }
                    }
                    // Without this step ES rejects documents with
                    // "whose UTF8 encoding is longer than the max length 32766", so oversized fields are dropped
                }).map(new MapFunction<String, String>() {
                    @Override
                    public String map(String s) throws Exception {
                        JSONObject obj = JSON.parseObject(s);
                        String str = obj.getString("operationType");

                        if ("insert".equals(str) || "update".equals(str)) {
                            JSONObject obj1 = obj.getJSONObject("fullDocument");
                            if (obj1.toString().getBytes("utf-8").length > 36000) {

                                Set<Map.Entry<String, Object>> entries = obj1.entrySet();
                                Iterator<Map.Entry<String, Object>> iterator = entries.iterator();

                                while (iterator.hasNext()) {
//                                    String s1 = iterator.next().toString();
//                                    System.out.println("iterator value: " + s1);
                                    // Remove any single field larger than 30000 bytes
                                    if (iterator.next().toString().getBytes("utf-8").length > 30000) {
                                        iterator.remove();
                                    }
                                }
                                obj.fluentPut("fullDocument", obj1.toString());
                            }
                        }

                        return obj.toString();
                    }
                });

        // 4. Handle insert/update/delete separately and sink to Elasticsearch
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(ES_URL, ES_PORT, "http"));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
                httpHosts, new ElasticsearchSinkFunction<String>() {

            public ActionRequest createIndexRequest(String element) {

                JSONObject obj = JSON.parseObject(element);
                //  System.out.println("create:" + obj.toString());
                String str = obj.getString("operationType");
                // Map<String, String> json = new HashMap<>();
                String id = null;

                // The Mongo _id is either an ObjectId ({"$oid": "..."}) or a plain string
                try {
                    id = obj.getJSONObject("documentKey").getJSONObject("_id").getString("$oid");
                } catch (Exception e) {
                    try {
                        id = obj.getJSONObject("documentKey").getString("_id");
                    } catch (Exception ex) {
                        System.out.println("Unexpected documentKey format: " + obj);
                    }
                }
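The original post is cut off at this point. Below is a minimal sketch, not the author's code, of how the sink function and job submission might be completed: the IndexRequest built from fullDocument, the process() dispatch on operationType, the DeleteRequest for deletes, the basic-auth RestClientFactory, and the final addSink/execute are all assumptions, using only APIs already pulled in by the pom above (it additionally needs imports for RuntimeContext, RequestIndexer, IndexRequest, Requests, XContentType, and the Apache HttpClient auth classes AuthScope, UsernamePasswordCredentials, CredentialsProvider, BasicCredentialsProvider).

                // --- Sketch continues from here (assumed, not from the original post) ---
                // Index the (possibly trimmed) fullDocument under the Mongo _id
                return Requests.indexRequest()
                        .index(ES_INDEX)
                        .id(id)
                        .source(obj.getString("fullDocument"), XContentType.JSON);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                JSONObject obj = JSON.parseObject(element);
                String op = obj.getString("operationType");

                if ("insert".equals(op) || "update".equals(op)) {
                    indexer.add((IndexRequest) createIndexRequest(element));
                } else if ("delete".equals(op)) {
                    // Assumes ObjectId keys; reuse the try/catch above for plain string _id values
                    String id = obj.getJSONObject("documentKey").getJSONObject("_id").getString("$oid");
                    indexer.add(Requests.deleteRequest(ES_INDEX).id(id));
                }
            }
        });

        // Flush every request immediately; raise this for throughput in production
        esSinkBuilder.setBulkFlushMaxActions(1);

        // Basic auth against the ES cluster, using the credentials from Config
        esSinkBuilder.setRestClientFactory(restClientBuilder ->
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY,
                            new UsernamePasswordCredentials(ES_USER, ES_PWD));
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }));

        stream.addSink(esSinkBuilder.build()).name("es_sink");

        env.execute("FlinkCdcSyn_API");
    }
}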

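For reference, the commented-out .deserializer(new CoustomParse()) line in the source builder points at a custom deserializer that the original post never shows. A bare-bones skeleton of what such a class could look like with the flink-connector-mongodb-cdc 2.2 API is sketched below; the class name comes from the commented-out line, and the body simply forwards the raw record, so it is purely illustrative.

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative skeleton only: a real implementation would unpack the record's value Struct
// (operationType, documentKey, fullDocument, ...) into whatever JSON shape the job expects.
public class CoustomParse implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        // Forward the raw change-stream record as a string; replace with real parsing logic
        out.collect(String.valueOf(record.value()));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}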