在 apache spark 中创建存储桶

Posted

技术标签:

【中文标题】在 apache spark 中创建存储桶【英文标题】:Creating buckets in apache spark 【发布时间】:2016-09-12 07:38:01 【问题描述】:

我想将数据存储在 spark 中,以使相差 5 秒或更短的时间戳与相应的数据一起落入一个 5 秒的存储桶中。同样,下一组 5 秒存储桶包含剩余的日志。 (这样我就可以汇总存储桶中的数据)。我的日志:

1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt
1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg
1472120397.633 HTTP GEO er.abff.kagsf.weyfh.ajfg
1472120397.261 HTTP GEO er.laffg.ayhrff.agyfr.yawr
1472120394.328 HTTP GEO er.qfryf.aqwruf.oiuqwr.agsf
1472120393.737 HTTP GEO er.aysf.aouf.ujaf.casf
.
.
.

我仍然无法弄清楚如何在火花中做到这一点。

时间戳为1472120400.107,1472120399.999,1472120397.633,1472120397.261等的日志落入一个bucket,下一组落入下一个bucket,以此类推。

输出:

时间戳为 1472120400.107,1472120399.999,1472120397.633,1472120397.261 的所有日志行都将保存在内存中(一个桶),以便进一步处理,例如查找整个桶的计数。同样,下一个桶。

【问题讨论】:

你的预期输出是什么? “桶”是什么意思? 这不过是对日志进行分区。分区的日志组形成存储桶。 【参考方案1】:

只需将时间戳除以您要创建的粒度即可。在 PairRDD 中将 bin 编号作为 key,其中 data 是输入,然后是 reduceByKey。

我将在Scala中编写代码示例,基本上将其转换为python是微不足道的我想说明这一点。

val l5 = List("1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt", "1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg") 
val l5RDD = sc.parallelize(l5) //input as RDD
val l5tmp = l5RDD.map(item => item.split(" ")) //Split the sentence
val l5tmp2 = l5tmp.map(item => ((item(0).toDouble/3600000).toInt, List(item))) //Map the data to a bin (in the key) according to the wanted granularity
val collected = l5tmp2.reduceByKey(_ ++ _) //Collect the lists to create the bins of data
collected.collect().foreach(println) //Prints (408,List([Ljava.lang.String;@2c6aed22, [Ljava.lang.String;@e322ec9)) - means that both entries collected to a bin named 408

【讨论】:

这部分代码我没看懂:val collected = l5tmp2.reduceByKey(_ ++ _)。 ++ 是什么? @kaks 您需要定义如何将项目收集在一起,在这种情况下,我将项目保存在列表中,因此为了将列表添加在一起,我使用 ++。每个 List 将包含所有应该落入 bin 的项目 你能告诉我们如何在 python 中编写它吗?我很难过。 你能告诉我如何用python编写它吗?我试过 reduceByKey(add) 但显示 add 没有定义

以上是关于在 apache spark 中创建存储桶的主要内容,如果未能解决你的问题,请参考以下文章

为啥 terraform 会在 s3 存储桶中创建一个空目录?

使用 CloudFormation 在 S3 存储桶中创建 Lambda 通知

为啥此脚本无法在 s3 中创建存储桶?

在 Apache Spark 中创建 DAG

如何在 gsutil 中创建一个存储桶,用于在 cloudsql 中上传 sql 转储文件。?

在 Terraform 中创建 S3 存储桶通知时出错