Reading multiple Parquet files from multiple partitions
I am trying to read multiple Parquet files from multiple partitions via PySpark and concatenate them into one large dataframe. The files look like this:
hdfs dfs -ls /data/customers/odysseyconsultants/logs_ch_blade_fwvpn
Found 180 items
drwxrwxrwx - impala impala 0 2018-03-01 10:31 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/_impala_insert_staging
drwxr-xr-x - impala impala 0 2017-08-23 17:55 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170822
drwxr-xr-x - impala impala 0 2017-08-24 05:57 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170823
drwxr-xr-x - impala impala 0 2017-08-25 06:00 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170824
drwxr-xr-x - impala impala 0 2017-08-26 06:04 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170825
Each partition contains one or more Parquet files, e.g.
hdfs dfs -ls /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170822
Found 1 items
-rw-r--r-- 2 impala impala 72252308 2017-08-23 17:55 /data/customers/odysseyconsultants/logs_ch_blade_fwvpn/cdateint=20170822/5b4bb1c5214fdffd-cc8dbcf600000008_1393229110_data.0.parq
What I want to create is a generic function that takes from and to parameters, then loads and concatenates all Parquet files in that date range into one large dataframe.
I can build the list of paths to read:
def read_files(table, from1, to):
    s1 = ', '.join('/data/customers/odysseyconsultants/' + table + '/' + 'cdateint=' + str(i)
                   for i in range(from1, to + 1))
    return s1.split(', ')
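As an aside, iterating over raw integers like this only yields valid dates within a single month; crossing a month boundary produces paths such as cdateint=20170532. A sketch of a date-aware alternative, stepping with `timedelta` (the helper name `partition_paths` is hypothetical, and the base directory is taken from the listing above):

```python
from datetime import date, timedelta

def partition_paths(table, start, end):
    """Build one cdateint=YYYYMMDD path per calendar day, inclusive.

    Stepping with timedelta never produces invalid dates such as
    20170532, unlike iterating over the raw integers.
    """
    d1 = date(int(start[0:4]), int(start[4:6]), int(start[6:8]))
    d2 = date(int(end[0:4]), int(end[4:6]), int(end[6:8]))
    base = '/data/customers/odysseyconsultants/' + table + '/cdateint='
    return [base + (d1 + timedelta(days=n)).strftime('%Y%m%d')
            for n in range((d2 - d1).days + 1)]

# Crossing the Aug/Sep boundary yields 4 valid paths, not 73.
print(partition_paths('logs_ch_blade_fwvpn', '20170830', '20170902'))
```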
If I try to read the files like this, I get an exception:
for i in read_files('logs_ch_blade_fwvpn', 20170506, 20170510):
    sqlContext.read.parquet(i).show()
And if I try to read them all at once,
x = read_files('logs_cs_blade_fwvpn', 20180109, 20180110)
d1 = sqlContext.read.parquet(*x)
I get the error:
pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://nameservice1/data/customers/odysseyconsultants/logs_cs_blade_fwvpn/cdateint=20180109;'
Answer
How about using the directory name as a partition column? For example:
from pyspark.sql.functions import col

table = 'logs_ch_blade_fwvpn'
sqlContext.read.parquet('/data/customers/odysseyconsultants/' + table) \
    .where(col('cdateint').between('20170822', '20170825')).show()
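This filter on the string column works because fixed-width YYYYMMDD strings sort lexicographically in the same order as the dates they encode, so Spark can prune partitions by comparing the directory suffixes directly. A quick sanity check in plain Python (Spark's `between` is inclusive on both ends, as is the comparison below):

```python
# Fixed-width YYYYMMDD strings compare lexicographically in date order,
# which is why a string between() filter on cdateint selects exactly
# the partitions in the requested range.
partitions = ['20170822', '20170823', '20170824', '20170825', '20180109']
selected = [p for p in partitions if '20170822' <= p <= '20170825']
print(selected)  # ['20170822', '20170823', '20170824', '20170825']
```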
Another answer
This is one way to do it, though I am open to alternatives:
import subprocess
from datetime import date, timedelta
from pyspark.sql import SQLContext

def read_data(customer, table, start_date, end_date):
    def run_cmd(args_list):
        # Run a Linux command and capture its return code and output
        print('Running system command: {0}'.format(' '.join(args_list)))
        proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        s_output, s_error = proc.communicate()
        return proc.returncode, s_output, s_error

    # Generate the list of dates whose partitions should be read
    d1 = date(int(start_date[0:4]), int(start_date[4:6]), int(start_date[6:8]))
    d2 = date(int(end_date[0:4]), int(end_date[4:6]), int(end_date[6:8]))
    dates = [d1 + timedelta(days=x) for x in range((d2 - d1).days + 1)]

    # Find the parquet files under each partition directory
    files = []
    for i in dates:
        path = '/data/customers/' + customer + '/' + table + '/cdateint=' + str(i).replace('-', '')
        (ret, out, err) = run_cmd(['hdfs', 'dfs', '-find', path, '-name', '*.parq'])
        # communicate() returns bytes on Python 3, so decode before splitting
        files.append(out.decode().split('\n'))

    # Load each file and union it into a single dataframe
    df = None
    for i in files:
        for j in i:
            print(j)
            if len(j) > 0:
                df_temp = sqlContext.read.parquet(j)
                df = df_temp if df is None else df.union(df_temp)
    return df
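The `len(j) > 0` guard exists because each captured `hdfs dfs -find` output ends with a newline, so splitting on `'\n'` leaves a trailing empty string per partition. A sketch of flattening and filtering the lists up front (the sample outputs are hypothetical); the cleaned list could then be passed to Spark in one call as `sqlContext.read.parquet(*paths)` instead of unioning inside the loop:

```python
# Hypothetical captured stdout of `hdfs dfs -find <dir> -name '*.parq'`
# for two partition directories; each chunk ends with a newline, so a
# plain split('\n') yields a trailing empty string.
outputs = [
    '/data/customers/acme/logs/cdateint=20170822/data.0.parq\n',
    '/data/customers/acme/logs/cdateint=20170823/data.0.parq\n'
    '/data/customers/acme/logs/cdateint=20170823/data.1.parq\n',
]

# Flatten the per-partition lists and drop the empties in one pass.
paths = [p for out in outputs for p in out.split('\n') if p]
print(len(paths))  # 3
```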