Extract multiple columns of data from a DB and write to a file using Spark?

Posted: 2015-12-06 01:37:19

Question:

The Spark code below saves the data vertically rather than horizontally. Can someone help? How do I save the ResultSet output to a file in Spark? The username, password, and db url come from Spring framework configuration values.

Actual output (one value per line):

1
2
3

Expected (whole row on one line):

1,2,3

package com.kali.db

/**
 * Created by kalit_000 on 05/12/2015.
 */

import java.util

import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark._
import java.sql.{ResultSet, DriverManager, Connection}
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.springframework.context.support.ClassPathXmlApplicationContext
import scala.collection.mutable.ListBuffer

case class SqlMulti(driver:String,url:String,username:String,password:String,sql:String)

object SqlMultiExample {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[1]").setAppName("MultipleSqlColumns").set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    //read the application context file
    val ctx = new ClassPathXmlApplicationContext("multiplecolumns.xml")
    val DBinfo = ctx.getBean("SqlTest").asInstanceOf[SqlMulti]

    /*assign class values to variables*/
    val driver = DBinfo.driver
    val url = DBinfo.url
    val username = DBinfo.username
    val password = DBinfo.password
    val query = DBinfo.sql
    var connection: Connection = null
    val sqlquery = DBinfo.sql

    println("DB Driver:-%s".format(driver))
    println("DB Url:-%s".format(url))
    println("Username:-%s".format(username))
    println("Password:-%s".format(password))
    println("Query:-%s".format(query))


    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      val statement = connection.createStatement()
      val resultSet = statement.executeQuery(query)

      resultSet.setFetchSize(10);
      val columnnumber = resultSet.getMetaData().getColumnCount.toInt

      /*OP COLUMN NAMES*/
      for (i <- 1 to columnnumber) {
        val columnname = resultSet.getMetaData().getColumnName(i)
        println("Column Names are:- %s".format(columnname))
      }


      /*OP DATA*/
      while (resultSet.next()) {
        val list = new java.util.ArrayList[String]()
        for (i <- 1 to columnnumber) {
          list.add(resultSet.getString(i))
          //println(list)
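          // NOTE: saveAsTextFile runs once per column inside this loop, so each
          // value lands on its own output line and the same path is rewritten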
          sc.parallelize(list.toString.replace("null", "N/A")).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")
        }
      }

    } catch {
      case e: Exception => e.printStackTrace()
    }
    connection.close()
    sc.stop()
  }
}
I rewrote the code to use read.jdbc, which solved all of my requirements:

package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark._
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.sql.DataFrame
import org.springframework.context.support.ClassPathXmlApplicationContext

case class SparkSqlValueClassMPP(driver:String,url:String,username:String,password:String,table:String,opdelimeter:String,lowerbound:String,upperbound:String,numberofparitions:String,parallelizecolumn:String)

object SparkDBExtractorMPP {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkDBExtractorMPP").set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    // turn each DataFrame Row (printed like "[1,100,5]") into a plain delimited line such as "1~100~5"
    def opfile(value: DataFrame, delimeter: String): RDD[String] = {
      value.map(x => x.toString.replace("[", "").replace("]", "").replace(",", delimeter))
    }

    //read the application context file
    val ctx = new ClassPathXmlApplicationContext("sparkDBExtractorMpp.xml")
    val DBinfo = ctx.getBean("SparkSQLDBExtractorMPP").asInstanceOf[SparkSqlValueClassMPP]

    val driver = DBinfo.driver
    val url = DBinfo.url
    val username = DBinfo.username
    val password = DBinfo.password
    val table = DBinfo.table
    val opdelimeter=DBinfo.opdelimeter
    val lowerbound=DBinfo.lowerbound.toInt
    val upperbound=DBinfo.upperbound.toInt
    val numberofpartitions=DBinfo.numberofparitions.toInt
    val parallelizecolumn=DBinfo.parallelizecolumn


    println("DB Driver:-%s".format(driver))
    println("DB Url:-%s".format(url))
    println("Username:-%s".format(username))
    println("Password:-%s".format(password))
    println("Table:-%s".format(table))
    println("Opdelimeter:-%s".format(opdelimeter))
    println("Lowerbound:-%s".format(lowerbound))
    println("Upperbound:-%s".format(upperbound))
    println("Numberofpartitions:-%s".format(numberofpartitions))
    println("Parallelizecolumn:-%s".format(parallelizecolumn))

    try {
      val props = new Properties()
      props.put("user", username)
      props.put("password", password)
      props.put("driver", driver)

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      // partitioned JDBC read: Spark splits the scan into numberofpartitions
      // ranges of parallelizecolumn values between lowerbound and upperbound
      val df = sqlContext.read.jdbc(url, table, parallelizecolumn, lowerbound, upperbound, numberofpartitions, props)

      df.show(10)

      opfile(df, opdelimeter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")

    } catch {
      case e: Exception => e.printStackTrace()
    }
    sc.stop()
  }
}

To make this code highly configurable, I used the Java Spring framework:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
        "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
    <bean id="queryProps" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    </bean>

    <bean id="SparkSQLDBExtractorMPP" class="com.kali.db.SparkSqlValueClassMPP">
        <constructor-arg value="com.microsoft.sqlserver.jdbc.SQLServerDriver" />
        <constructor-arg value="jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014" />
        <constructor-arg value="admin" />
        <constructor-arg value="oracle" />
        <constructor-arg value="(select top 100 CustomerID,StoreID,TerritoryID,AccountNumber,ModifiedDate from customer ) as customer" />
        <constructor-arg value="~" />
        <constructor-arg value="1" />
        <constructor-arg value="100" />
        <constructor-arg value="8" />
        <constructor-arg value="CustomerID" />
    </bean>
</beans>

The project is now on GitHub:

https://github.com/kali786516/ScalaDB


Answer 1:

The output data is saved vertically because `list.add(resultSet.getString(i))` simply appends each column value to the list one at a time, and the save runs inside the column loop.

If you want each db row output on a single line, you should build a list where each element is the string representation of an entire db row rather than of a single column, and convert each row into a properly formatted string.
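For illustration, here is a minimal sketch of that idea; the connection details, query, and output path below are placeholders, not the poster's actual configuration. It reads the column count from the ResultSet metadata, so no column names are hard-coded, joins each row's values into one delimited string, and saves once after the loop:

import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ListBuffer
import org.apache.spark.{SparkConf, SparkContext}

object RowPerLineSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("RowPerLine"))
    // placeholder connection details; in the question these come from Spring config
    val connection: Connection =
      DriverManager.getConnection("jdbc:sqlserver://localhost;database=AdventureWorks2014", "admin", "oracle")
    try {
      val resultSet = connection.createStatement().executeQuery("select * from customer")
      val columnCount = resultSet.getMetaData.getColumnCount

      val rows = new ListBuffer[String]()
      while (resultSet.next()) {
        // one list element per db row: join all column values with the delimiter
        rows += (1 to columnCount)
          .map(i => Option(resultSet.getString(i)).getOrElse("N/A"))
          .mkString(",")
      }
      // save once, after all rows are collected, so each db row is one output line
      sc.parallelize(rows).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")
    } finally {
      connection.close()
      sc.stop()
    }
  }
}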

Comments:

Can you give an example? I don't know what the column names are, and I don't want to hard-code them; I want this code to be reusable. Is there another way?

Hi Shawn, thanks for your help. I replaced my code with a Spark SQL DataFrame, which is 100 times simpler than the traditional ResultSet approach. I'll share the code here in case anyone wants to use it.
