在 PySpark 中将 Python Dict 转换为稀疏 RDD 或 DF
Posted
技术标签:
【中文标题】在 PySpark 中将 Python Dict 转换为稀疏 RDD 或 DF【英文标题】:Converting Python Dict to Sparse RDD or DF in PySpark 【发布时间】:2017-08-04 22:53:28 【问题描述】:我有一些在 Pandas 中原型化的代码,我正试图将其转换为 PySpark 代码。它使用urlparse
Python 库将通用 URI 解析为 Python 字典,将键转换为新列,然后将这些新列与原始数据连接起来。下面是一个简化的例子。在真实数据集中有 38 列,我关心的是保留所有列。
# create some sample data
df_ex = pd.DataFrame([[102,'text1',u'/some/website/page.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&jsl=161&ln=en&pc=men&dp=www.mysite.com&qfq=news/this-is-an-article&of=2&uf=1&pd=0&irt=0&md=0&ct=1&tct=0&abt=0<=792&cdn=1&lnlc=gb&tl=c=141,m=433,i=476,xm=1243,xp=1254&pi=2&&rb=0&gen=100&callback=_ate.track.hsr&mk=some,key,words,about,the,article&'],
[781,'text2',u'/libtrc/hearst-network/loader.js'],
[9001,'text3',u'/image/view/-/36996720/highRes/2/-/maxh/150/maxw/150/mypic.jpg'],
[121,'text4',u'/website/page2.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&qqd=1&pd=0&irt=0&md=0&zzct=1&tct=0&abt=0<=792&cdn=0&lnlc=gb&tl=c=414,m=32,i=41,xm=1000,xp=111&callback=_ate.track.hsr&mk=some,other,key,words,about,the,article&'],
[781,'text5',u'/libtrc/hearst-network/loader.js']],columns=['num','text','uri'])
# parse the URI to a dict using urlparse
df_ex['uri_dict'] = df_ex['uri'].apply(lambda x: dict(urlparse.parse_qsl(urlparse.urlsplit(x).query)))
# convert the parsed dict to a series
df_ex_uridict_series = df_ex['uri_dict'].apply(pd.Series)
# concatenate the parsed dict (now columns) back with original DF
df_final = pd.concat([df_ex, df_ex_uridict_series], axis=1).drop('uri_dict', axis=1)
导致看起来像这样的东西(裁剪):
结果很稀疏,但没关系。对于应用程序,我实际上更喜欢它是一种稀疏矩阵(尽管如果有一个好的替代方案,我可以确信,密集的方法)。这是我试图在 PySpark 中重新创建的结果。
到目前为止,我要做的是(在 PySpark 2.1.0 中)this(使用相同的数据)。
# urlparse library
import urlparse
# create the sample data as RDD
data = sc.parallelize([[102,'text1',u'/some/website/page.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&jsl=161&ln=en&pc=men&dp=www.mysite.com&qfq=news/this-is-an-article&of=2&uf=1&pd=0&irt=0&md=0&ct=1&tct=0&abt=0<=792&cdn=1&lnlc=gb&tl=c=141,m=433,i=476,xm=1243,xp=1254&pi=2&&rb=0&gen=100&callback=_ate.track.hsr&mk=some,key,words,about,the,article&'],[781,'text2',u'/libtrc/hearst-network/loader.js'],[9001,'text3',u'/image/view/-/36996720/highRes/2/-/maxh/150/maxw/150/mypic.jpg'],[121,'text4',u'/website/page2.json?ovpevu&colc=1452802104103&si=569800363b029b74&rev=v4.1.2-wp&qqd=1&pd=0&irt=0&md=0&zzct=1&tct=0&abt=0<=792&cdn=0&lnlc=gb&tl=c=414,m=32,i=41,xm=1000,xp=111&callback=_ate.track.hsr&mk=some,other,key,words,about,the,article&'],[781,'text5',u'/libtrc/hearst-network/loader.js']])
# simple map to parse the uri
uri_parsed = data.map(list).map(lambda x: [x[0],x[1],urlparse.parse_qs(urlparse.urlsplit(x[2]).query)])
这让我非常接近,在 RDD 的每个“行”中嵌套了一个 python dict。像这样:
In [187]: uri_parsed.take(3)
Out[187]:
[[102,
'text1',
u'abt': [u'0'],
u'callback': [u'_ate.track.hsr'],
u'cdn': [u'1'],
u'colc': [u'1452802104103'],
u'ct': [u'1'],
u'dp': [u'www.mysite.com'],
u'gen': [u'100'],
u'irt': [u'0'],
u'jsl': [u'161'],
u'ln': [u'en'],
u'lnlc': [u'gb'],
u'lt': [u'792'],
u'md': [u'0'],
u'mk': [u'some,key,words,about,the,article'],
u'of': [u'2'],
u'pc': [u'men'],
u'pd': [u'0'],
u'pi': [u'2'],
u'qfq': [u'news/this-is-an-article'],
u'rb': [u'0'],
u'rev': [u'v4.1.2-wp'],
u'si': [u'569800363b029b74'],
u'tct': [u'0'],
u'tl': [u'c=141,m=433,i=476,xm=1243,xp=1254'],
u'uf': [u'1']],
[781, 'text2', ],
[9001, 'text3', ]]
这些值包含列表,但这很好。它们可以保留为列表。
此时我想做的是从 dict 中解析出键/值对(就像在 Pandas 中一样)以从键中创建新列,然后放置值(或值列表,在这种情况下) 在 RDD 中。
我尝试过的一些事情:
走向成熟的 PySpark DF:编写了一个 UDF 并使用with_column
在 DF 中创建一个新列。这可行,但它给了我整个 dict 作为单个字符串(没有键和值在引号中)。我没有试图推进这一点并添加引号(认为有更好的方法)。
拆分原始DF:首先使用monotonically_increasing_id()
为每个DF行分配一个唯一ID,拆分出两列(新ID和URI),将拆分转换为RDD,然后进行解析。这将让我重新加入(使用 ID),但它没有帮助创建我想要的“稀疏矩阵”。
我还想到,这些技术(使用带有 Hive 数据存储的 Spark v2.1.0)可能不是表示此类数据的正确底层技术。也许无模式数据存储会更好。但是,我现在受限于使用 Spark 和 Hive 作为数据存储。
任何帮助将不胜感激!
【问题讨论】:
【参考方案1】:我最近正在研究一个类似的问题,即解析包含由“=”分隔的键值对的字符串,其中可能的键事先不知道。
我不确定这是否是最有效的解决方案,但我想出了一个解决方案,通过 rdd 运行几次以发现和处理任意标签。
首先,解析出标记行的 URL 和 num-text 对:
def urlparsefn(url):
return urlparse.parse_qs(urlparse.urlsplit(url).query)
# parse the uri to a dictionary
uri_parsed = data.map(lambda x: (x[0],x[1],urlparsefn(x[2])))
然后您可以通过提取每个 uri 字典的唯一键来提取所有不同的标签,然后使用 Python set
将它们聚合在一起,这样您就可以轻松删除重复项。
# We need to discover all the unique keys before we will know which columns our data frame will have
combOp = (lambda x, y: x.union(y))
possible_keys_set = uri_parsed.map(lambda x: set(x[2].keys())).aggregate(set(), combOp, combOp)
possible_keys = sorted(list(possible_keys_set)) # sets have no order, this will give us alphabetical order in the final dataframe
现在我们有了所有可能的唯一键,我们可以提取不同的数字和标记行的文本,然后确保每个字典都包含所有标签,对不存在的元素使用一些占位符文本在特定的 uri 字典中。然后,您可以使用 Python 中的关键字参数构建一个 rdd 行。
def attrmap(urirow, possible_keys):
# Extract the 3 parts of the uri tuple
num = urirow[0]
text = urirow[1]
uridict = urirow[2]
# Assign the known fields identifying the row
uridict['num'] = num
uridict['text'] = text
# Run through the possible keys, add a placeholder for any keys not present in the row
for key in possible_keys:
if key not in uridict:
uridict[key] = 'N/A' # Some place holder for values in the list of possible keys, but not in the current uri dictionary
else:
uridict[key] = uridict[key][0] # All the lists only have 1 item, so just extract the item
return uridict
# Create an rdd of Row type, using the dictionary as kwargs
uri_allkeys = uri_parsed.map(lambda x: Row(**attrmap(x, possible_keys)))
最后一件事是根据 num、text 和所有提取的可能列构建新数据框的架构。
# Create an item in the schema for the known fields, and each possible key
df_schema = StructType()
for possible_key in ['num','text']+possible_keys:
df_schema.add(possible_key, StringType(), True)
# Use the new schema and rdd of rows to create the dataframe
uri_parsed_df = spark.createDataFrame(uri_allkeys, df_schema)
这应该为数据框提供任意列。希望这会有所帮助!
【讨论】:
以上是关于在 PySpark 中将 Python Dict 转换为稀疏 RDD 或 DF的主要内容,如果未能解决你的问题,请参考以下文章