如何将 csv 文件转换为镶木地板

Posted

技术标签:

【中文标题】如何将 csv 文件转换为镶木地板【英文标题】:How to convert a csv file to parquet 【发布时间】:2014-11-25 07:02:15 【问题描述】:

我是 BigData 的新手。我需要将 csv/txt 文件转换为 Parquet 格式。我搜索了很多,但找不到任何直接的方法。有什么方法可以实现吗?

【问题讨论】:

【参考方案1】:

我已经在an answer 上发布了关于如何使用 Apache Drill 来做到这一点。但是,如果您熟悉 Python,您现在可以使用 Pandas 和 PyArrow!

安装依赖项

使用pip

pip install pandas pyarrow

或使用conda:

conda install pandas pyarrow -c conda-forge

将 CSV 分块转换为 Parquet

# csv_to_parquet.py

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = '/path/to/my.tsv'
parquet_file = '/path/to/my.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        # Guess the schema of the CSV file from the first chunk
        parquet_schema = pa.Table.from_pandas(df=chunk).schema
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()

我没有将此代码与 Apache Drill 版本进行基准测试,但根据我的经验,它非常快,每秒转换数万行(这当然取决于 CSV 文件!)。


编辑:

我们现在可以使用pyarrow.csv.read_csv 将 CSV 文件直接读取到 PyArrow 表中。这可能比使用 Pandas CSV 阅读器更快,尽管它可能不太灵活。

【讨论】:

为什么不那么灵活? (抱歉,我没有使用pyarrow 的经验,只是看到你的评论很好奇) @sphoenix 我主要指的是pd.read_csvpyarrow.csv.read_csv 方法接受的参数数量。举个具体的例子,pd.read_csvsep="..."的情况可以是正则表达式,而pyarrow.csv.read_csvdelimiter="..."必须是单个字符。【参考方案2】:

[对于 Python]

Pandas 现在直接支持它。

只需使用 read_csv 将 csv 文件读入 pandas 的数据帧,然后使用 to_parquet 将该数据帧写入 parquet 文件即可。

【讨论】:

为什么要为 Java 问题提供 Python 解决方案? 因为已经有一个没有提到 to_parquet(因为它是在 0.21.0 中发布的)。认为这可能对需要基于 python 的解决方案的人有用。【参考方案3】:

您可以使用Apache Drill,如Convert a CSV File to Apache Parquet With Drill 中所述。

简而言之:

开始 Apache Drill:

$ cd /opt/drill/bin $ sqlline -u jdbc:drill:zk=local

创建 Parquet 文件:

-- 设置默认表格格式为 parquet ALTER SESSION SET `store.format`='parquet'; -- 创建一个 parquet 表,其中包含 CSV 表中的所有数据 创建表 dfs.tmp.`/stats/airport_data/` AS 选择 CAST(SUBSTR(columns[0],1,4) AS INT) `YEAR`, CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`, 列 [1] 作为“航空公司”, 列 [2] 作为“IATA_CODE”, 列[3] 为“AIRLINE_2”, 列 [4] 为“IATA_CODE_2”, 列 [5] 作为 `GEO_SUMMARY`, 列 [6] 为“GEO_REGION”, 列[7] 为“ACTIVITY_CODE”, 列 [8] 为“PRICE_CODE”, 列 [9] 作为“终端”, 列 [10] 为“BOARDING_AREA”, CAST(columns[11] AS DOUBLE) 作为`PASSENGER_COUNT` FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;

尝试从新 Parquet 文件中选择数据:

-- 从 parquet 表中选择数据 选择 * FROM dfs.tmp.`/stats/airport_data/*`

您可以通过转到http://localhost:8047/storage/dfs(来源:CSV and Parquet)来更改dfs.tmp 的位置。

【讨论】:

我确认这是实现这一目标的最佳和最简单的方法。 Apache Hive 也可以作为替代方案。【参考方案4】:

以下代码是使用 spark2.0 的示例。读取比 inferSchema 选项快得多。 Spark 2.0 转换成 parquet 文件的效率比 spark1.6 高得多。

import org.apache.spark.sql.types._
var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
df = spark.read
          .schema(df)
          .option("header", "true")
          .option("delimiter", "\t")
          .csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")

【讨论】:

【参考方案5】:

1) 可以创建外部 hive 表

create  external table emp(name string,job_title string,department string,salary_per_year int)
row format delimited
fields terminated by ','
location '.. hdfs location of csv file '

2) 另一个存储 parquet 文件的 hive 表

create  external table emp_par(name string,job_title string,department string,salary_per_year int)
row format delimited
stored as PARQUET
location 'hdfs location were you want the save parquet file'

将表一数据插入表二:

insert overwrite table emp_par select * from emp 

【讨论】:

表 emp_par 已创建为外部表。这应该已创建为普通表,否则您无法将数据插入其中。【参考方案6】:

将 csv 文件读取为 Dataframe in Apache Spark 和 spark-csv package。将数据加载到 Dataframe 后,将数据帧保存到 parquetfile。

val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("mode", "DROPMALFORMED")
      .load("/home/myuser/data/log/*.csv")
df.saveAsParquetFile("/home/myuser/data.parquet")

【讨论】:

【参考方案7】:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys

sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')

【讨论】:

【参考方案8】:

您可以使用 https://github.com/fraugster/parquet-go 项目中的 csv2parquet 工具。它比 Apache Drill 使用简单得多

【讨论】:

【参考方案9】:

我制作了一个小型命令行工具来将 CSV 转换为 Parquet:https://github.com/domoritz/csv2parquet。

【讨论】:

以上是关于如何将 csv 文件转换为镶木地板的主要内容,如果未能解决你的问题,请参考以下文章

转换为镶木地板的 csv 文件将“e0”添加到值的末尾

Hive/Bigsql Pandas 将浮点数转换为整数,使用 pyarrow 将空值转换为镶木地板文件

将 avro 转换为镶木地板(也许使用 hive?)

如果 csv 列标题包含空格,则在 spark 中将 csv 转换为 parquet 会出错

Spark SQL Java GenericRowWithSchema无法强制转换为java.lang.String

将 Parquet 转换为 CSV