在 Spark 中,将数据集写入数据库时​​,保存操作需要一些预先假定的时间

Posted

技术标签:

【中文标题】在 Spark 中,将数据集写入数据库时​​,保存操作需要一些预先假定的时间【英文标题】:In Spark,while writing dataset into database it takes some pre-assumed time for save operation 【发布时间】:2018-08-23 07:09:41 【问题描述】:

我运行了如下所述的 spark-submit 命令,它执行从 DB 加载数据集、处理,并在最后阶段将多个数据集推送到 Oracle DB。

./spark-submit --class com.sample.Transformation --conf spark.sql.shuffle.partitions=5001 
    --num-executors=40 --executor-cores=1 --executor-memory=5G 
    --jars /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-api-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/drools-core-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/drools-compiler-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-maven-support-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-internal-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/xstream-1.4.10.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-commons-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/ecj-4.4.2.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/mvel2-2.4.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-commons-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-api-7.7.0.Final.jar 
    --driver-class-path /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar 
    --master spark://10.180.181.41:7077 "/scratch/rmbbuild/spark_ormb/POC-jar/Transformation-0.0.1-SNAPSHOT.jar" 
        > /scratch/rmbbuild/spark_ormb/POC-jar/logs/logs12.txt

但是,在将数据集写入数据库时​​需要一些预先假定的时间,不知道为什么在开始写入过程之前会花费这么长时间。 附上清晰突出我面临的问题的屏幕截图。 在注释掉解决方案之前,请先浏览屏幕截图。 Spark 仪表板阶段屏幕截图:

如果我们看一下屏幕截图,我强调了大约 10 分钟的时间,这是在每个数据集写入数据库之前消耗的。 即使我将批量大小更改为 100000,如下所示:

outputDataSetforsummary.write().mode("append").format("jdbc").option("url", connection)
    .option("batchSize", "100000").option("dbtable", CI_TXN_DTL).save();

所以,如果有人能解释为什么每次都消耗这个预写时间,以及如何避免这个时间。

我附上程序的更多描述代码。

   public static void main(String[] args) 

        SparkConf conf = new
            //  SparkConf().setAppName("Transformation").setMaster("local");
        SparkConf().setAppName("Transformation").setMaster("spark://xx.xx.xx.xx:7077");
        String connection = "jdbc:oracle:thin:ABC/abc@//xx.x.x.x:1521/ABC";
    
        // Create Spark Context
        SparkContext context = new SparkContext(conf);
        // Create Spark Session
        SparkSession sparkSession = new SparkSession(context);
        Dataset<Row> txnDf  = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_TXN_DETAIL_STG).load();
        //Dataset<Row> txnDf  = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_TXN_DETAIL_STG").load();
        Dataset<Row> newTxnDf  = txnDf.drop(ACCT_ID);
        
        Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_ACCT_NBR).load();
        //  Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_ACCT_NBR").load();

        Dataset<Row> joined = newTxnDf.join(accountDf, newTxnDf.col(ACCT_NBR).equalTo(accountDf.col(ACCT_NBR))
                .and(newTxnDf.col(ACCT_NBR_TYPE_CD).equalTo(accountDf.col(ACCT_NBR_TYPE_CD))), "inner");
        Dataset<Row> finalJoined = joined.drop(accountDf.col(ACCT_NBR_TYPE_CD)).drop(accountDf.col(ACCT_NBR))
                .drop(accountDf.col(VERSION)).drop(accountDf.col(PRIM_SW));

        
        initializeProductDerivationCache(sparkSession,connection);
      
        
        ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);
        
        ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);
        
        ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);
        
    
        
        Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
        Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);
        Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),
                                                            broadcastVarForUserID.value()),encoder);
        
    
        Dataset<RuleParamsBean> filteredDS = validateDataset.filter(validateDataset.col(BO_STATUS_CD).notEqual(TFMAppConstants.TXN_INVALID));
        //For formatting the data to be inserted in table -->   Dataset<Row>finalvalidateDataset = validateDataset.select("ACCT_ID");
        

    
        Encoder<TxnDetailOutput>txndetailencoder = Encoders.bean(TxnDetailOutput.class);
        Dataset<TxnDetailOutput>txndetailDS =validateDataset.map(ruleParamsBean ->outputfortxndetail(ruleParamsBean),txndetailencoder );
        
    
        
        
        KieServices ks = KieServices.Factory.get();
        KieContainer kContainer = ks.getKieClasspathContainer();
        ClassTag<KieBase> classTagTest = scala.reflect.ClassTag$.MODULE$.apply(KieBase.class);
        Broadcast<KieBase> broadcastRules = context.broadcast(kContainer.getKieBase(KIE_BASE), classTagTest);

        Encoder<PritmRuleOutput> outputEncoder = Encoders.bean(PritmRuleOutput.class);
        Dataset<PritmRuleOutput> outputDataSet = filteredDS.flatMap(rulesParamBean -> droolprocesMap(broadcastRules.value(), rulesParamBean), outputEncoder);

        Dataset<Row>piParamDS1 =outputDataSet.select(PRICEITEM_PARM_GRP_VAL);
        Dataset<Row> piParamDS = piParamDS1.withColumnRenamed(PRICEITEM_PARM_GRP_VAL, PARM_STR);

        priceItemParamGrpValueCache.createOrReplaceTempView("temp1");
        Dataset<Row>piParamDSS = piParamDS.where(queryToFiltertheDuplicateParamVal);
        Dataset<Row> priceItemParamsGrpDS = piParamDSS.select(PARM_STR).distinct().withColumn(PRICEITEM_PARM_GRP_ID, functions.monotonically_increasing_id());
        Dataset<Row> finalpriceItemParamsGrpDS = priceItemParamsGrpDS.withColumn(PARM_COUNT, functions.size(functions.split(priceItemParamsGrpDS.col(PARM_STR),TOKENIZER)));
        finalpriceItemParamsGrpDS.persist(StorageLevel.MEMORY_ONLY());
        finalpriceItemParamsGrpDS.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP_K).option("batchSize", "1000").save();

            
        
        Dataset<Row> PritmOutput = outputDataSet.join(priceItemParamsGrpDS,outputDataSet.col(PRICEITEM_PARM_GRP_VAL).equalTo(priceItemParamsGrpDS.col(PARM_STR)),"inner");
        Dataset<Row> samplePritmOutput = PritmOutput.drop(outputDataSet.col(PRICEITEM_PARM_GRP_ID))
                .drop(priceItemParamsGrpDS.col(PARM_STR));

        priceItemParamsGrpDS.createOrReplaceTempView(PARM_STR);
        Dataset<Row> priceItemParamsGroupTable =sparkSession.sql(FETCH_QUERY_TO_SPLIT);
        Dataset<Row> finalpriceItemParamsGroupTable = priceItemParamsGroupTable.selectExpr("PRICEITEM_PARM_GRP_ID","split(col, '=')[0] as PRICEITEM_PARM_CD ","split(col, '=')[1] as PRICEITEM_PARM_VAL");
        finalpriceItemParamsGroupTable.persist(StorageLevel.MEMORY_ONLY());
        finalpriceItemParamsGroupTable.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP).option("batchSize", "1000").save();

【问题讨论】:

根据您的机器配置增加内核数,更新应该在多个内核中的分区数 由于缺少以下信息,无法解决您的问题:+ 输入大小。 + 你在做什么计算。 + 你的火花配置。但是让我们谈谈我从屏幕截图中看到的内容: + 看起来你有 3 个动作。因此,您应该考虑使用持久解决方案来避免在每个操作中重新加载数据库中的数据。 (在内存或磁盘/两者上)+我可以看到你在真正做一些繁重的计算任务之前调用了persist。 @tauitdnmd 您需要什么样的信息,请您详细说明一下?我正在将DB中的10M数据加载到数据集中,将1M数据加载到另一个数据集中,然后执行各种地图,平面图操作。最后在完成上述操作后将数据集推送到数据库中。如果需要更多信息,请在下面评论。跨度> 正如我所问的:你是在调用 3 个动作吗?对?根据您的输入大小(总共 11 MB),它真的很小。但我可以看到 shuffle 读取 ~ 1GB。那么你能把你的代码贴在这里吗? 在第 7 阶段调用 'save' 之后。你是否重新使用数据框进行其他计算?如果是,你是否应该考虑坚持下去。但我相信你设置的分区数太高了。让我们先尝试 200,然后再增加一点 【参考方案1】:

它会在每个write to db 操作中一次又一次地重新加载整个数据并加入数据帧。

请添加validateDataset.persist(StorageLevel.MEMORY_ONLY()) - (您应该根据您的数据框大小自行考虑在内存或磁盘上或在mem_and_disk上。它是否适合内存)

例如:

 Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),broadcastVarForUserID.value()),encoder)
.persist(StorageLevel.MEMORY_ONLY());

【讨论】:

这个参数应该设置成什么? ./spark-submit --class com.sample.Transformation --conf spark.sql.shuffle.partitions=5001 --num-executors=40 --executor-cores=1 --executor-memory=5G ...作为我有两台 88 核 Worker 机器,512 GB 内存和 1 台 Master of 4cores 和 8 GB 内存。你能评论一下这个值吗

以上是关于在 Spark 中,将数据集写入数据库时​​,保存操作需要一些预先假定的时间的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 jdbc 将 spark 数据集写入数据库

如何在写入时强制数据集匹配其模式?

从db中提取多列数据并使用spark写入文件?

使用 phoenix 连接器将 Spark 数据帧写入 Hbase

Spark Distinct算子写入MySql TopN 性能分析

将 Spark 数据集转换为 JSON 并写入 Kafka Producer