spark dataframe 字段可以有几种数据类型

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark dataframe 字段可以有几种数据类型相关的知识,希望对你有一定的参考价值。

参考技术A import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode

object SimpleDemo extends App
val sc = new SparkContext("local[*]", "test")
val sqlc = new SQLContext(sc)
val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
val tableName = "tbaclusterresult"

//把数据转化为DataFrame,并注册为一个表
val df = sqlc.read.json("G:/data/json.txt")
df.registerTempTable("user")
val res = sqlc.sql("select * from user")
println(res.count() + "---------------------------")
res.collect().map row =>

println(row.toString())



//从MYSQL读取数据
val jdbcDF = sqlc.read
.options(Map("url" -> driverUrl,
// "user" -> "root",
// "password" -> "root",
"dbtable" -> tableName))
.format("jdbc")
.load()
println(jdbcDF.count() + "---------------------------")
jdbcDF.collect().map row =>

println(row.toString())



//插入数据至MYSQL
val schema = StructType(
StructField("name", StringType) ::
StructField("age", IntegerType)
:: Nil)

val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
("com", 40), ("bt", 33), ("www", 23))).
map(item => Row.apply(item._1, item._2))
import sqlc.implicits._
val df1 = sqlc.createDataFrame(data1, schema)
// df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

//DataFrame类中还有insertIntoJDBC方法,调用该函数必须保证表事先存在,它只用于插入数据,函数原型如下:
//def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

//插入数据到MYSQL
val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
data.foreachPartition(myFun)

case class Blog(name: String, count: Int)

def myFun(iterator: Iterator[(String, Int)]): Unit =
var conn: Connection = null
var ps: PreparedStatement = null
val sql = "insert into blog(name, count) values (?, ?)"
try
conn = DriverManager.getConnection(driverUrl, "root", "root")
iterator.foreach(data =>
ps = conn.prepareStatement(sql)
ps.setString(1, data._1)
ps.setInt(2, data._2)
ps.executeUpdate()
)
catch
case e: Exception => e.printStackTrace()
finally
if (ps != null)
ps.close()

if (conn != null)
conn.close()



本回答被提问者采纳

Spark SQL DataFrame - 异常处理

在我们的应用程序中,我们的大多数代码只是在filter上应用group byaggregateDataFrame操作,并将DF保存到Cassandra数据库。

与下面的代码一样,我们有几种方法可以在不同数量的字段上执行相同类型的操作[filter, group by, join, agg]并返回DF并将其保存到Cassandra表中。

示例代码是:

 val filteredDF = df.filter(col("hour") <= LocalDataTime.now().getHour())
.groupBy("country")
.agg(sum(col("volume")) as "pmtVolume")

saveToCassandra(df)

def saveToCassandra(df: DataFrame) {
    try {
        df.write.format("org.apache.spark.sql.cassandra")
        .options(Map("Table" -> "tableName", "keyspace" -> keyspace)
        .mode("append").save()
    }
    catch {
        case e: Throwable => log.error(e)
    }
}

由于我通过将DF保存到Cassandra来调用该操作,我希望我只需要根据this线程处理该行的异常。

如果我得到任何异常,我可以默认在Spark详细日志中看到异常。

我是否必须真正围绕过滤器,使用Trytry , catch?进行分组

我没有看到有关异常处理的Spark SQL DataFrame API示例的任何示例。

我如何在Try方法上使用saveToCassandra?它返回Unit

答案

你真的不需要用filtergroup byTry包围trycatch代码。由于所有这些操作都是转换,因此在对它们执行操作之前它们不会执行,例如saveToCassandra

但是,如果在过滤,分组或聚合数据帧时发生错误,saveToCassandra函数中的catch子句将记录它,因为正在执行操作。

另一答案

在try catch中包含懒惰的DAG是没有意义的。 您需要在Try()中包装lambda函数。 不幸的是,AFAIK无法在DataFrames中进行行级异常处理。

您可以使用RDD或DataSet,如spache spark exception handling下面这篇文章的回答中所述

以上是关于spark dataframe 字段可以有几种数据类型的主要内容,如果未能解决你的问题,请参考以下文章

spark知识体系04-SQL,DataFrames,DateSets

Spark SQL, DataFrames and Datasets 指南

动态和可配置地更改几种 Spark DataFrame 列类型

[Spark2.0]Spark SQL, DataFrames 和Datasets指南

如何使用新列对 Spark dataFrame 中的字符串字段进行 JSON 转义

spark DataFrame的创建几种方式和存储