在 spark 序列化中使用 maxmind geoip

Posted

技术标签:

【中文标题】在 spark 序列化中使用 maxmind geoip【英文标题】:using maxmind geoip in spark serialized 【发布时间】:2015-05-06 11:13:48 【问题描述】:

我正在尝试将 MaxMind GeoIP API 用于 scala-spark,它位于 https://github.com/snowplow/scala-maxmind-iplookups。我使用标准加载文件:

val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)

我有一个包含时间和 IP 地址的基本 csv 文件:

val sweek1 = week1.mapline=> IP(parse(line)).collect
  case Some(ip) => 
    val ipadress = ipdetect(ip.ip)
    (ip.time, ipadress)
    

函数ipdetect基本定义为:

def ipdetect(a:String)=
  ipLookups.performLookups(a)._1 match
    case Some(value) => value.toString
    case _ => "Unknown"
  

当我运行这个程序时,它提示“任务不可序列化”。所以我读了几篇文章,似乎有几种方法可以解决这个问题。

1, a wrapper 2、使用SparkContext.addFile(跨集群分发文件)

但我无法弄清楚其中任何一个是如何工作的,我尝试了包装器,但我不知道如何以及在哪里调用它。 我尝试了 addFile,但它返回一个单元而不是字符串,我假设您需要以某种方式管道二进制文件。所以我不确定现在该怎么做。非常感谢任何帮助

所以我已经能够通过使用 mapPartitions 对其进行某种程度的序列化并遍历每个本地分区,但我想知道是否有更有效的方法来做到这一点,因为我的数据集在数百万范围内

【问题讨论】:

【参考方案1】:

假设您的 csv 文件每行包含一个 IP 地址,例如,您希望将每个 IP 地址映射到一个城市。

import com.snowplowanalytics.maxmind.iplookups.IpLookups

val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)

def parseIP(ip:String, ipLookups: IpLookups): String = 
  val lookupResult = ipLookups.performLookups(ip)
  val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")


val logs = sc.textFile("path/to/your.csv")
             .mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)

其他ip转换请参考Scala MaxMind IP Lookups。 此外,mapWith 似乎已被弃用。请改用mapPartitionsWithIndex

【讨论】:

很好的解决方案,我不知道如何在节点之间分配数据,所以我不得不通过在 master 中运行一个非常低效的循环来解决。顺便说一句,您如何编写 .dat 文件的完整路径?我试过“file:///home/hadoop/geoip.dat”它没有工作(但只是“geoip.dat”工作) 之前我也卡在addFile的路径格式中,但是我发现addFile似乎接受和sc.textFile一样的路径格式。因此,正如this guide 提到的,您可以尝试“/my/directory/geoip.dat”而不指定“file:///”。 @GameOfThrows 嗨,您能分享一下 maxmind geoip 的设置指南吗?我已经尝试了很多东西,但无法弄清楚。非常感谢!

以上是关于在 spark 序列化中使用 maxmind geoip的主要内容,如果未能解决你的问题,请参考以下文章

在序列化数据中找不到必填字段“uncompressed_pa​​ge_size”!镶木地板

在Spark中使用Kryo序列化

MaxMind 的 locid 和 GeoNames id 之间是不是有任何映射?

无法使用 spark kryo 序列化

Maxmind GeoLite2纯javascript / html示例

在 Spark Scala 中使用自定义数据框类时任务不可序列化