python+spark code snippet
Preface: This article was compiled by the editors of 小常识网 (cha138.com). It covers python+spark code snippets, and we hope it is a useful reference.
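The goal is to process strings like the following:

time^B1493534543940^Aid^B02CD^Aasr^B叫爸爸^Anlp^B{"domain":"com.abc.system.chat","intent":"chat","slots":{"tts":"爸爸","asr":"叫爸爸"},"voice":"叫爸爸","confidence":1.0,"cloud":false,"posStart":0,"posEnd":0}^Adomain^Bcom.abc.chat^Aintent^Bchat

Here ^A and ^B are the control characters \u0001 (Ctrl-A) and \u0002 (Ctrl-B) in caret notation: ^A separates the key/value pairs, and ^B separates each key from its value. The job takes the date from `time` (a millisecond timestamp) and the value of `id`, counts the records for each date|id pair, and sorts the pairs by count in descending order.

The Python version of the Spark code is as follows:

```python
from operator import add
import time

from pyspark import SparkContext

# In the pyspark shell `sc` already exists; getOrCreate() reuses it there
# and creates a new context when this runs as a standalone script.
sc = SparkContext.getOrCreate()


def getInfo(line, sep1, sep2):
    """Turn one log line into a 'YYYY-MM-DD|id' key."""
    thedate = 'today'    # fallback when the line has no time field
    sn = 'default'       # fallback when the line has no id field
    if line is not None:
        fields = line.split(sep1)          # ^A separates key/value pairs
        if len(fields) > 1:
            for field in fields:
                kv = field.split(sep2)     # ^B separates key from value
                if len(kv) == 2:
                    if kv[0] == 'time':
                        # milliseconds -> seconds, formatted in the local time zone
                        timestamp = int(kv[1]) / 1000
                        time_local = time.localtime(timestamp)
                        thedate = time.strftime("%Y-%m-%d", time_local)
                    if kv[0] == 'id':
                        sn = kv[1]
    return thedate + "|" + sn


rdd1 = sc.textFile("/Users/zhangzhenghai/example.log")
# (date|id, 1) for every line
rdd2 = rdd1.map(lambda x: (getInfo(x, '\u0001', '\u0002'), 1))
# sum the 1s for each key
rdd3 = rdd2.reduceByKey(add)
# swap to (count, key), sort by count descending, then swap back
rdd4 = rdd3.map(lambda x: (x[1], x[0]))
rdd5 = rdd4.sortByKey(False)
rdd6 = rdd5.map(lambda x: (x[1], x[0]))
print(rdd6.collect())
```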
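To check the parsing on a sample line without a Spark installation, the same logic can be sketched in plain Python. This is not the author's code: `parse_record`, `count_by_day_and_id`, `FIELD_SEP`, `KV_SEP` and `sample` are hypothetical names, `collections.Counter` stands in for `reduceByKey(add)`, and the date is computed in UTC (an assumption; the original uses the machine's local time zone) so the output is reproducible.

```python
from collections import Counter
from datetime import datetime, timezone

FIELD_SEP = "\x01"  # ^A: separates key/value pairs
KV_SEP = "\x02"     # ^B: separates a key from its value


def parse_record(line, sep1=FIELD_SEP, sep2=KV_SEP):
    """Hypothetical helper mirroring getInfo: return 'YYYY-MM-DD|id'.

    Assumption: the date is taken in UTC rather than local time.
    """
    thedate, sn = "today", "default"
    fields = line.split(sep1) if line is not None else []
    if len(fields) > 1:  # like the original, single-field lines keep the defaults
        for field in fields:
            kv = field.split(sep2)
            if len(kv) != 2:
                continue
            key, value = kv
            if key == "time":
                seconds = int(value) / 1000  # milliseconds -> seconds
                thedate = datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d")
            elif key == "id":
                sn = value
    return f"{thedate}|{sn}"


def count_by_day_and_id(lines):
    """Plain-Python stand-in for map -> reduceByKey(add) -> sort by count desc."""
    counts = Counter(parse_record(line) for line in lines)
    # sorted() is stable even with reverse=True, so ties keep first-seen order
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)


sample = FIELD_SEP.join([
    "time" + KV_SEP + "1493534543940",
    "id" + KV_SEP + "02CD",
    "asr" + KV_SEP + "叫爸爸",
    "domain" + KV_SEP + "com.abc.chat",
    "intent" + KV_SEP + "chat",
])

print(parse_record(sample))  # 2017-04-30|02CD
# the third line has only one field, so it falls back to the defaults
print(count_by_day_and_id([sample, sample, "id" + KV_SEP + "FFFF"]))
# [('2017-04-30|02CD', 2), ('today|default', 1)]
```

In PySpark itself, the swap / `sortByKey(False)` / swap sequence can be collapsed into a single call, `rdd3.sortBy(lambda kv: kv[1], ascending=False)`, since `RDD.sortBy` takes a key function and an `ascending` flag.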
The above is for learning and reference only.