在 apache spark 中创建存储桶
Posted
技术标签:
【中文标题】在 apache spark 中创建存储桶【英文标题】:Creating buckets in apache spark 【发布时间】:2016-09-12 07:38:01 【问题描述】:我想将数据存储在 spark 中,以使相差 5 秒或更短的时间戳与相应的数据一起落入一个 5 秒的存储桶中。同样,下一组 5 秒存储桶包含剩余的日志。 (这样我就可以汇总存储桶中的数据)。我的日志:
1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt
1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg
1472120397.633 HTTP GEO er.abff.kagsf.weyfh.ajfg
1472120397.261 HTTP GEO er.laffg.ayhrff.agyfr.yawr
1472120394.328 HTTP GEO er.qfryf.aqwruf.oiuqwr.agsf
1472120393.737 HTTP GEO er.aysf.aouf.ujaf.casf
.
.
.
我仍然无法弄清楚如何在火花中做到这一点。
时间戳为1472120400.107,1472120399.999,1472120397.633,1472120397.261等的日志落入一个bucket,下一组落入下一个bucket,以此类推。
输出:
时间戳为 1472120400.107,1472120399.999,1472120397.633,1472120397.261 的所有日志行都将保存在内存中(一个桶),以便进一步处理,例如查找整个桶的计数。同样,下一个桶。
【问题讨论】:
你的预期输出是什么? “桶”是什么意思? 这不过是对日志进行分区。分区的日志组形成存储桶。 【参考方案1】:只需将时间戳除以您要创建的粒度即可。在 PairRDD 中将 bin 编号作为 key,其中 data 是输入,然后是 reduceByKey。
我将在Scala中编写代码示例,基本上将其转换为python是微不足道的我想说明这一点。
val l5 = List("1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt", "1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg")
val l5RDD = sc.parallelize(l5) //input as RDD
val l5tmp = l5RDD.map(item => item.split(" ")) //Split the sentence
val l5tmp2 = l5tmp.map(item => ((item(0).toDouble/3600000).toInt, List(item))) //Map the data to a bin (in the key) according to the wanted granularity
val collected = l5tmp2.reduceByKey(_ ++ _) //Collect the lists to create the bins of data
collected.collect().foreach(println) //Prints (408,List([Ljava.lang.String;@2c6aed22, [Ljava.lang.String;@e322ec9)) - means that both entries collected to a bin named 408
【讨论】:
这部分代码我没看懂:val collected = l5tmp2.reduceByKey(_ ++ _)
。 ++ 是什么?
@kaks 您需要定义如何将项目收集在一起,在这种情况下,我将项目保存在列表中,因此为了将列表添加在一起,我使用 ++。每个 List 将包含所有应该落入 bin 的项目
你能告诉我们如何在 python 中编写它吗?我很难过。
你能告诉我如何用python编写它吗?我试过 reduceByKey(add) 但显示 add 没有定义以上是关于在 apache spark 中创建存储桶的主要内容,如果未能解决你的问题,请参考以下文章
为啥 terraform 会在 s3 存储桶中创建一个空目录?
使用 CloudFormation 在 S3 存储桶中创建 Lambda 通知