是否可以使用火花流传输数据库表数据

Posted

技术标签:

【中文标题】是否可以使用火花流传输数据库表数据【英文标题】:is it possible to stream a database table data using spark streaming 【发布时间】:2020-05-07 09:48:22 【问题描述】:

尝试流式传输 SQLServer 表数据。因此,创建了一个带有主类的简单 java 程序。创建了一个 sparkconf 并使用它,启动了一个 JavaStreamingContext 并从中检索 SparkContext。使用 Spark API 的 JdbcRDD 和 JavaRDD 从数据库接收数据并启动一个 inputQueue,然后准备 JavaInputDStream。所以完成了先决条件并启动了JavaStreamingContext。因此,我正在获取我在准备 inputQueue 时收到的第一组数据,但没有获取更多流的数据。

package com.ApacheSparkConnection.ApacheSparkConnection;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import com.infosys.himi.maskit.algorithms.encryptiondecryption.EncryptionARC4;
import com.infosys.maskit.common.util.ConfigParams;

import scala.Tuple2;
import scala.reflect.ClassManifestFactory$;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

public class MainSparkConnector 

    public static void main(String[] args) throws Exception 

        String dbtableQuery = "SELECT TOP 10 AGENT_CODE,AGENT_NAME,WORKING_AREA,COMMISSION,PHONE_NO,COUNTRY FROM dbo.AGENTS where AGENT_CODE >= ? and AGENT_CODE <= ?";

        String host = "XXXXXXXXX";
        String databaseName = "YYYY";
        String user = "sa";
        String password = "XXXXXX@123";

        long previewSize = 0; 

        Instant start = Instant.now();

        SparkConf sparkConf = new SparkConf().setAppName("SparkJdbcDs")
                .setMaster("local[4]")
                .set("spark.driver.allowMultipleContexts", "true");

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        JavaSparkContext javaSparkContext  =  javaStreamingContext.sparkContext();
        SparkContext sparkContext = javaSparkContext.sc(); 

        String url = "jdbc:sqlserver://" + host + ":1433;databaseName=" + databaseName;
        String driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; 

        DbConnection dbConnection = new DbConnection(driver, url, user, password);

        JdbcRDD<Object[]> jdbcRDD =
                new JdbcRDD<Object[]>(sparkContext, dbConnection, dbtableQuery, 0,
                              100000, 10, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));

        JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));

        List<String> employeeFullNameList = javaRDD.map(new Function<Object[], String>() 
            @Override
            public String call(final Object[] record) throws Exception 
                String rec = "";
                for(Object ob : record) 
                    rec = rec + " " + ob;
                
                return rec;
            
        ).collect();

        JavaRDD<String> javaRDD1 = javaStreamingContext.sparkContext().parallelize(employeeFullNameList);
        Queue<JavaRDD<String>> inputQueue = new LinkedList<JavaRDD<String>>();

        inputQueue.add(javaRDD1);

        JavaInputDStream<String> javaDStream = javaStreamingContext.queueStream(inputQueue, true);
        System.out.println("javaDStream.print()");
        javaDStream.print();
        javaDStream.foreachRDD( rdd-> 
            System.out.println("rdd.count() : "+ rdd.count());
            rdd.collect().stream().forEach(n-> System.out.println("item of list: "+n));
        );
        javaStreamingContext.start();

        System.out.println("employeeFullNameList.size() : "+employeeFullNameList.size());

        javaStreamingContext.awaitTermination();
    

    static class DbConnection extends AbstractFunction0<Connection> implements Serializable 

        private String driverClassName;
        private String connectionUrl;
        private String userName;
        private String password;

        public DbConnection(String driverClassName, String connectionUrl, String userName, String password) 
            this.driverClassName = driverClassName;
            this.connectionUrl = connectionUrl;
            this.userName = userName;
            this.password = password;
        

        public Connection apply() 
            try 
                Class.forName(driverClassName);
             catch (ClassNotFoundException e) 
                System.out.println("Failed to load driver class" +e);
            

            Properties properties = new Properties();
            properties.setProperty("user", userName);
            properties.setProperty("password", password);

            Connection connection = null;
            try 
                connection = DriverManager.getConnection(connectionUrl, properties);
             catch (SQLException e) 
                System.out.println("Connection failed"+ e);
            

            return connection;
        
    

    static class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable 

        public Object[] apply(ResultSet row) 
            return JdbcRDD.resultSetToObjectArray(row);
        
    
````
Please let me know if am in wrong direction

【问题讨论】:

您是否还希望在 Spark 流式传输作业启动后流式传输对数据库所做的后续更改? 希望在 spark 流式传输作业开始后再次流式传输整个数据,因此如果有任何更改,那么无论如何它必须与整个数据一起显示。 【参考方案1】:

通过 Spark Streaming 流式处理 RDBMS 对初始数据的快照很容易,但没有直接的方法可以获取数据库中发生的尾随变化。

更好的解决方案是通过 Debezium SQL Server 连接器

Debezium 的 SQL Server 连接器可以监控和记录行级 SQL Server 数据库架构的变化。

您需要设置一个 Kafka 集群 为 SQL 服务器启用 CDC

SQL Server CDC 并非旨在存储数据库更改的完整历史记录。因此,Debezium 有必要建立当前数据库内容的基线并将其流式传输到 Kafka。这是通过称为快照的过程实现的。

默认情况下(快照模式初始),连接器将在第一次启动时执行数据库的初始一致快照(这意味着要根据连接器的过滤器配置捕获任何表中的结构和数据)。

每个快照都包含以下步骤:

Determine the tables to be captured

Obtain a lock on each of the monitored tables to ensure that no structural changes can occur to any of the tables. The level of the lock is determined by snapshot.isolation.mode configuration option.

Read the maximum LSN ("log sequence number") position in the server’s transaction log.

Capture the structure of all relevant tables.

Optionally release the locks obtained in step 2, i.e. the locks are held usually only for a short period of time.

Scan all of the relevant database tables and schemas as valid at the LSN position read in step 3, and generate a READ event for each row and write that event to the appropriate table-specific Kafka topic.

Record the successful completion of the snapshot in the connector offsets.

读取变更数据表

在第一次启动时,连接器会对捕获的表的结构进行结构快照,并将此信息保存在其内部数据库历史主题中。然后连接器为每个源表识别一个更改表并执行主循环

For each change table read all changes that were created between last stored maximum LSN and current maximum LSN

Order the read changes incrementally according to commit LSN and change LSN. This ensures that the changes are replayed by Debezium in the same order as were made to the database.

Pass commit and change LSNs as offsets to Kafka Connect.

Store the maximum LSN and repeat the loop.

重新启动后,连接器将从之前停止的偏移处恢复(提交和更改 LSN)。

连接器能够在运行时检测是否为列入白名单的源表启用或禁用了 CDC,并修改其行为。

SQL Server 连接器将单个表上的所有插入、更新和删除操作的事件写入单个 Kafka 主题。 Kafka 主题的名称始终采用 serverName.schemaName.tableName 的形式,其中 serverName 是使用 database.server.name 配置属性指定的连接器的逻辑名称,schemaName 是发生操作的模式的名称,并且tableName 是发生操作的数据库表的名称。

例如,假设 SQL Server 安装具有包含四个表的清单数据库:products, products_on_hand, customers, and orders 架构 dbo。如果监控这个数据库的连接器被赋予了一个执行的逻辑服务器名称,那么连接器将在这四个 Kafka 主题上产生事件:

    fulfillment.dbo.products
    fulfillment.dbo.products_on_hand
    fulfillment.dbo.customers

    fulfillment.dbo.orders

【讨论】:

感谢您的回答,但此要求不仅限于 SQLServer,还必须处理数十亿条记录。那么还有其他最适合这个用例的方法吗? Debezium 有几乎所有数据库的连接器。如果您想实时流式传输数十亿条记录,然后是数据库更改,那么我只能建议使用 Debezium + kafka + Spark。 如果你能提供java中的任何示例链接,如果你有方便的话,那将非常有帮助? debezium.io/documentation/reference/1.1/connectors/… 更多的是在基础设施方面,安装和使用现有工具,以解决我们数据工程师不时面临的琐碎问题

以上是关于是否可以使用火花流传输数据库表数据的主要内容,如果未能解决你的问题,请参考以下文章

如何将火花流数据帧存储到 Mysql 表。?

替代递归运行Spark-submit作业

使用火花流解析事件中心消息上的 JSON

火花流到pyspark json文件中的数据帧

什么是数据块火花增量表?他们是不是还存储特定会话的数据以及如何查看这些增量表及其结构

是否可以使用 Akka Stream 从数据库表中创建“无限”流