python+spark程序代码片段

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python+spark程序代码片段相关的知识,希望对你有一定的参考价值。

处理如此的字符串:
time^B1493534543940^Aid^B02CD^Aasr^B叫爸爸^Anlp^B{"domain":"com.abc.system.chat","intent":"chat","slots":{"tts":"爸爸","asr":"叫爸爸"},"voice":"叫爸爸","confidence":1.0,"cloud":false,"posStart":0,"posEnd":0}^Adomain^Bcom.abc.chat^Aintent^Bchat

python版spark代码如下

from operator import add
import time


def getInfo(str, sep1, sep2):
    thedate = today
    sn = default
    if str is not None:
        fields = str.split(sep1)
        if len(fields) > 1:
            for field in fields:
                if field is not None:
                    kv = field.split(sep2)
                    if len(kv) == 2:
                        if kv[0] == time:
                            timestamp = int(kv[1]) / 1000
                            time_local = time.localtime(timestamp)
                            thedate = time.strftime("%Y-%m-%d", time_local)
                        if kv[0] == id:
                            sn = kv[1]
    if thedate is not None and sn is not None:
        res = thedate + "|" + sn
    return res

rdd1 = sc.textFile("/Users/zhangzhenghai/example.log")
rdd2 = rdd1.map(lambda x: (getInfo(x,\u0001,\u0002),1))
rdd3 = rdd2.reduceByKey(add)
rdd4 = rdd3.map(lambda x: (x[1],x[0]))
rdd5 = rdd4.sortByKey(False)
rdd6 = rdd5.map(lambda x:(x[1],x[0]))
rdd6.collect()

 

以上仅供学习参考

以上是关于python+spark程序代码片段的主要内容,如果未能解决你的问题,请参考以下文章

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

我在哪里更改此 Python 代码片段以将临时文件保存在 tmp 文件夹中?

常用python日期日志获取内容循环的代码片段

Python 向 Postman 请求代码片段

python [代码片段]一些有趣的代码#sort

微信小程序代码片段