将数据从 CSV 文件映射到 HDFS 上的 Hive 表时出错

Posted

技术标签:

【中文标题】将数据从 CSV 文件映射到 HDFS 上的 Hive 表时出错【英文标题】:Error while mapping the data from CSV file to a Hive table on HDFS 【发布时间】:2019-02-14 13:35:24 【问题描述】:

我正在尝试按照以下步骤将数据框加载到 Hive 表中:

    读取源表并将数据帧保存为 HDFS 上的 CSV 文件

    val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"($execQuery) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 199199).option("upperBound", 284058).option("numPartitions",10).load()
    

    按照我的 Hive 表列对列进行排序 我的配置单元表列以以下格式存在于字符串中:

    val hiveCols = col1:coldatatype|col2:coldatatype|col3:coldatatype|col4:coldatatype...col200:datatype
    val schemaList        = hiveCols.split("\\|")
    val hiveColumnOrder   = schemaList.map(e => e.split("\\:")).map(e => e(0)).toSeq
    val finalDF           = yearDF.selectExpr(hiveColumnOrder:_*)
    

    我在“execQuery”中读取的列的顺序与“hiveColumnOrder”相同,为了确保顺序,我再次使用 selectExpr 选择 yearDF 中的列

    将数据帧保存为 HDFS 上的 CSV 文件:

    newDF.write.format("CSV").save("hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/")
    

    保存数据帧后,我会从“hiveCols”中获取相同的列, 准备一个 DDL 以在同一位置创建一个配置单元表,其中的值按给定的逗号分隔 下面:

如果不存在则创建表 schema.tablename(col1 coldatatype,col2 冷数据类型,col3冷数据类型,col4冷数据类型...col200数据类型) 排 以“,”结尾的格式分隔字段 存储为文本文件

位置 'hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/';

在我将数据框加载到创建的表中后,我在这里面临的问题是当我查询表时,我在查询中得到不正确的输出。 例如:如果我在将其保存为文件之前对数据框应用以下查询:

finalDF.createOrReplaceTempView("tmpTable")
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from tmpTable where header_id=19924598 and line_num=2

我得到了正确的输出。所有值都与列正确对齐:

[19924598,2,null,null,381761.40000000000000000000,381761.4,-381761.40000000000000000000,-381761.4,0.01489610000000000000,0.014896100000000,5686.76000000000000000000,5686.76]

但在将数据框保存在 CSV 文件中后,在其上创建一个表(步骤 4)并在创建的表上应用相同的查询,我看到数据混乱且与列映射不正确:

select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from schema.tablename where header_id=19924598 and line_num=2

+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| header_id     | line_num     | debit_rate  | debit_rate_text  | credit_rate  | credit_rate_text  | activity_amount  | activity_amount_text  | exchange_rate  | exchange_rate_text  | amount_cr  | amount_cr_text  |
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| 19924598      | 2            | NULL        |                  | 381761.4    |                    | 5686.76          | 5686.76               | NULL           | -5686.76            | NULL       |                 |

所以我尝试使用不同的方法,预先创建配置单元表并将数据从数据框中插入到其中:

在上述第 4 步中运行 DDL finalDF.createOrReplaceTempView("tmpTable") spark.sql("insert into schema.table select * from tmpTable")

如果我在作业完成后运行上述选择查询,即使这种方式也会失败。 我尝试使用 refresh table schema.tablemsckrepair table schema.table 刷新表格,只是为了查看元数据是否有任何问题,但似乎没有任何问题。

谁能告诉我是什么导致了这种现象,我这里操作数据的方式有什么问题吗?

【问题讨论】:

总之,您有一些 CSV 文件,您想从它们创建 Hive 表并使用诸如 beeline 之类的 JDBC 工具对其运行查询? 是的。我确实使用直线创建了表格。有什么问题吗? 【参考方案1】:

使用 Spark 2.3.2

测试代码

您可以轻松地运行 SQL 命令并从 CSV 文件创建 Hive 表,而不是从 CSV 文件创建 Spark 数据帧然后将其注册为 Hive 表

val conf = new SparkConf
    conf
      .set("hive.server2.thrift.port", "10000")
      .set("spark.sql.hive.thriftServer.singleSession", "true")
      .set("spark.sql.warehouse.dir", "hdfs://PATH_FOR_HIVE_METADATA")
      .set("spark.sql.catalogImplementation","hive")
      .setMaster("local[*]")
      .setAppName("ThriftServer")

val spark = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()

现在使用spark 对象,您可以以 Hive 用户身份运行 SQL 命令:

spark.sql("DROP DATABASE IF EXISTS my_db CASCADE")
spark.sql("create database if not exists my_db")
spark.sql("use my_db")

使用以下代码,您可以将所有 csv_files 加载到 HDFS 目录中(或者您可以只给出一个 CSV 文件的路径):

spark.sql(
      "CREATE TABLE test_table(" +
        "id int," +
        "time_stamp bigint," +
        "user_name string) " +
        "ROW FORMAT DELIMITED " +
        "FIELDS TERMINATED BY ',' " +
        "STORED AS TEXTFILE " +
        "LOCATION 'hdfs://PATH_TO_CSV_Directory_OR_CSV_FILE' "
    )

最后将 Spark sqlContext 对象注册为 Hive ThriftServer:

HiveThriftServer2.startWithContext(spark.sqlContext)

这将在端口 10000 上创建一个 ThriftServer 端点。

INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads

现在您可以运行 beeline 并连接到 ThriftServer:

beeline> !connect jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: enter optional_username
Enter password for jdbc:hive2://localhost:10000: leave blank
Connected to: Spark SQL (version 2.3.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000>

并测试test_table表是否创建在my_db数据库下:

0: jdbc:hive2://localhost:10000> use my_db;
0: jdbc:hive2://localhost:10000> show tables ;
+-----------+-----------------------+--------------+--+
| database  |       tableName       | isTemporary  |
+-----------+-----------------------+--------------+--+
| my_db     | test_table            | false        |
+-----------+-----------------------+--------------+--+

此外,您可以使用 ThrifServer JDBC 端点创建任何其他 Hive 表(或任何 HiveQL 命令)。

以下是所需的依赖项:

 libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.8.3",
  "org.apache.hadoop" % "hadoop-common" % "2.8.3",
)

【讨论】:

Using Row format serde: org.apache.hadoop.hive.serde2.OpenCSVSerde 解决了我的问题。【参考方案2】:

我在 Hive DDL 中使用了行格式 serde:org.apache.hadoop.hive.serde2.OpenCSVSerde。这也有 ',' 作为默认分隔符,我不必提供任何其他分隔符。

【讨论】:

以上是关于将数据从 CSV 文件映射到 HDFS 上的 Hive 表时出错的主要内容,如果未能解决你的问题,请参考以下文章

将 csv 日志文件从 windows 服务器转储到 ubuntu VirtualBox/hadoop/hdfs

使用 .csv 格式的 HDFS 文件创建 Pandas DataFrame

使用flume将csv文件传输到hdfs,并将它们转换为avro

用于将文件从本地文件系统移动到 HDFS 的 Hadoop 工具 [关闭]

Hive:将hdfs中的gziped CSV作为只读加载到表中

将数据/文件从 Windows 复制到 Linux 机器或 HDFS