用开始和结束分隔符分隔多行记录

Posted

技术标签:

【中文标题】用开始和结束分隔符分隔多行记录【英文标题】:Separate multi line record with start and end delimiter 【发布时间】:2017-11-21 06:33:12 【问题描述】:

我有一个这样的文件(我提供给你示例数据,但是文件很大):

QQ
1
2
3
ZZ
b
QQ
4
5
6
ZZ
a
QQ
9
8
23

我想在 QQ 和 ZZ 之间读取数据,所以我希望数据框应该是这样的:

[1,2,3]
[4,5,6]
[9,8]

我尝试过的代码如下,但是对于大数据来说这是失败的。

from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext

path ="/tmp/Poonam.Raskar/Sample.txt"
sc =SparkContext()
sqlContext = SQLContext(sc)
sc.setLogLevel("ERROR")
textFile = sc.textFile(path)

wi = textFile.zipWithIndex()
startPos = wi.filter(lambda x: x[0].startswith('QQ')).map(lambda (key,index) : index).collect()
endPos = wi.filter(lambda x: x[0].startswith('ZZ')).map(lambda (key,index) : index).collect()
finalPos =zip(startPos,endPos)
dtlRow =[]

for pos in finalPos:
        #print(pos)
        #print(wi.filter())
        dtlRow1 = [[wi.filter(lambda x: x[1]==1).map(lambda (key,index) : key ,).collect() for i in range(pos[0],pos[1])]]  #Required option for collect...program is taking long time while executing this statement
        #print(dtlRow1)
        dtlRow.append(dtlRow1)


cSchema = StructType([StructField("DataFromList", ArrayType(StringType()))])
df = sqlContext.createDataFrame(dtlRow,schema=cSchema)
print(df.show())

【问题讨论】:

您的意思是它适用于您提供的示例数据?你所说的“失败”是什么意思?代码在哪里,确切的错误是什么? 是的,代码适用于样本数据,但对于大数据,计算需要无限时间。 【参考方案1】:

我怀疑你的方法的大数据问题是你有一个收集 rdd 的中间步骤,它不会扩展。这是使用 rdd/dataframe 的一种方式:

# get a DF with a rownumber
lst=['QQ', '1', '2', '3', 'ZZ', 'b', 'QQ', '4', '5', '6', 'ZZ', 'a', 'QQ', '9', '8', '23']
df=sc.parallelize(lst).zipWithIndex()\
  .map(lambda (x,i): Row(**'col': x, 'rownum': i)).toDF()

# hack to count cumulative occurrences of QQ
winspec=Window.partitionBy().orderBy('rownum')
df=df.withColumn('QQ_indicator', f.expr("case when col='QQ' then 1 else 0 end"))
df=df.withColumn('QQ_indicator_cum', f.sum('QQ_indicator').over(winspec))

# ditto for ZZ
df=df.withColumn('ZZ_indicator', f.expr("case when col='ZZ' then 1 else 0 end"))
df=df.withColumn('ZZ_indicator_cum', f.sum('ZZ_indicator').over(winspec))

df.filter("QQ_indicator_cum=ZZ_indicator_cum+1 and not(col='QQ')")\
  .groupby('QQ_indicator_cum')\
  .agg(f.collect_list('col').alias('result'))\
  .select('result')\
  .show(3)

【讨论】:

@***.com/users/8671053/ags29: 是否可以添加 col.startwith 代替 ("case when col='QQ' then 1 else 0 end") 。因为在我的文件数据中以 QQ123、QQ456、QQ789 开头

以上是关于用开始和结束分隔符分隔多行记录的主要内容,如果未能解决你的问题,请参考以下文章

C语言中多行注释的用法

C语言中多行注释的用法

获取两个不同的开始和结束分隔符之间的所有子字符串

读取由空格分隔的值的多行文本

在 Hive 中使用分隔符连接多行

回顾之前知识: 注释