将用户定义的对象转换为数据框并写入 RDBMS - 如何维护与数据库的映射?
Posted
技术标签:
【中文标题】将用户定义的对象转换为数据框并写入 RDBMS - 如何维护与数据库的映射?【英文标题】:Convert a user defined object into dataframe and write into RDBMS - how to maintain mapping with database? 【发布时间】:2016-12-21 19:42:12 【问题描述】:我在mysql中有如下表结构:
创建表用户( id INT NOT NULL, 名称 VARCHAR(20) 非空, 年龄 INT NOT NULL, 地址 VARCHAR(100) NOT NULL);
现在,我想编写一个 spark 流式作业,从 Kafka 读取数据,进行一些处理和过滤,然后写入“用户”表中的 RDBMS。
为此,我首先创建了表的 POJO 表示 -
@Data
class User implements Serializable
private int id;
private String name;
private int age;
private String address;
下面,我写了将rdd转换为数据帧的spark作业-
JavaDStream<User> userStream = ... // created this stream with some processing
userStream.foreachRDD(rdd ->
DataFrame df = sqlContext.createDataFrame(rdd,User.class);
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
);
现在,一旦我执行了这段代码,因为数据框是以偶然的方式形成的,并且没有与数据库模式同步。因此,它尝试在 'id' 列中插入 'address' 并以 sql 异常退出。
我无法理解如何让数据框理解数据库的架构并相应地从用户对象加载数据。有没有办法做到这一点?我认为 JavaRDD 可以映射到 JavaRDD,但是我不明白该怎么做。
另外,我相信这个 createDataFrame() API 处理使用反射(必须),因此也存在性能影响的问题。请问有没有办法维护POJO和关系数据库的映射关系,插入数据?
【问题讨论】:
【参考方案1】:这样做对我有用。
@Data
class User implements Serializable
private int id;
private String name;
private int age;
private String address;
private static StructType structType = DataTypes.createStructType(new StructField[]
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("age", DataTypes.IntegerType, false),
DataTypes.createStructField("address", DataTypes.StringType, false)
);
public static StructType getStructType()
return structType;
public Object[] getAllValues()
return new Object[]id, name, age, address;
火花工作 -
JavaDStream<User> userStream = ... // created this stream with some processing
userStream.map(e ->
Row row = RowFactory.create(e.getAllValues());
return row;
).foreachRDD(rdd ->
DataFrame df = sqlContext.createDataFrame(rdd,User.getStructType());
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
);
我认为这比前一种方法更好,因为在前一种方法中,dataframe 使用反射将 POJO 映射到它自己的数据结构中。这是一种更简洁的方法,因为我已经 Row 是 spark sql 本身的一种格式,并且我已经在 getAllValues() 和getStructType()
中的列映射如果我错了,请纠正我。
【讨论】:
以上是关于将用户定义的对象转换为数据框并写入 RDBMS - 如何维护与数据库的映射?的主要内容,如果未能解决你的问题,请参考以下文章