如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较

Posted

技术标签:

【中文标题】如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较【英文标题】:How to compare a specific part of one (Ip adress) with other ip adreess in another column in RDD python pyspark without using collect() and for loop 【发布时间】:2019-07-08 15:47:09 【问题描述】:

我有两个 IP 地址列表,它们位于单独的 txt 文件中。 我想通过取它们的前三个字节来比较这两个数据集。

例如:

a='123.43.54.231'
b='123.43.54.50'

由于前三个字节在 a 和 b 之间是相互的,所以我想获取完整的 a (123.43.54.231)。

因为我处理的是 RDD,所以考虑到它的数据集很大,应该尽可能避免使用 collect()。实际上,我写了一个正确的代码来做我想要的。但是,我所做的包含collect(),这导致该过程非常缓慢。

Python_3.7.3

from pyspark import SparkContext, SparkConf

  if __name__ == "__main__":
  conf = SparkConf().setAppName("Big_Data_Project").setMaster("local[*]")
  sc = SparkContext(conf = conf)
  
  Ip_1= sc.textFile("Ip_1.txt")

#Ip_1='''123.34.405.123 153.74.61.65 43.34.65.123 ...... '''
#Ip_2='''123.34.321.143 153.74.61.43 43.34.65.112 ...... '''

  Ip_2= sc.textFile("Ip_2.txt")

  y=[]
  def func():
      
      for i in Ip_1.collect():
          for x in Ip_2.collect():
              d=i[:i.rfind(".")]
              h=x[:x.rfind(".")]
              if d==h:
                  y.append(i)
              else:
                  pass
      return y
  Wanted_Ip=sc.parallelize(func())
  Wanted_Ip.repartition(1).saveAsTextFile("My Ip List")

正如我解释的那样,我想获得与 Ip_2 RDD 的前三个字节匹配的 Ip_1 的 full ip_adress

153.74.61.65
43.34.65.123

我正在寻找一个不包括collect()的解决方案。

【问题讨论】:

RDD 与 collect 或 toLocalIterator 等方法之间的关系在历史上是可以理解的,但在使用 RDD 时,必须尽可能避免使用 collect 方法并不是一个普遍的事实。这种断言一般来说是不合适的。问题在于大型数据集,因为数据是在驱动程序内存级别从收集到驱动程序端的所有分区带来的。如果驱动程序可以存储这些分区中最大的一个,则可以使用 toLocalIterator 代替 cf。 github.com/.../python/pyspark/rdd.py 存储库 【参考方案1】:

你只需要生成一个加入的密钥,然后执行加入:

gen_key = lambda x : (x.rsplit('.', 1)[0], x)
Ip_1 = Ip_1.map(gen_key)
Ip_2 = Ip_2.map(gen_key)
common_ip = Ip_1.join(Ip_2)

common_ip 是一个 rdd,其中每一行都是一个 (key,value) 对,其中:

key = 3 位 ip 值 = 来自 Ip_1 和 Ip_2 的一对 IP
common_ip.collect()                                                                                                                                                                                                                           

[('123.43.54', ('123.43.54.231', '123.43.54.50'))]

如果你只想要 Ip_1 的 IP,那么你可以这样做:

common_ip.map(lambda x : x[1][0]).repartition(1).saveAsTextFile("My Ip List")

【讨论】:

你知道为什么当我写: gen_key = lambda x : (x.rsplit('.', -3)[0], x) 它不占用前三个字节!我想把它们从左到右而不是从右到左。还有另一个信息包括右侧不同数量的点。 有 2 个函数:splitrsplit(反向拆分)。数字是你要拆分多少,而不是多少字符...所以,使用x.split('.')[0]

以上是关于如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较的主要内容,如果未能解决你的问题,请参考以下文章

如何在不使用 for 循环的情况下填充二维数组?

如何在不使用 for 循环的情况下将列表中的所有项目与整数进行比较

如何在不使用 for 循环的情况下从 appsettings 文件中读取对象数组中特定键的值

在 Robot Framework 中找不到 IN 关键字。如何在不使用 IN 关键字的情况下使用 for 循环?

如何在不使用for循环的情况下合并需要提前3个月的列上的两个数据框

如何在不使用for循环的情况下从pyspark中的列表创建数据框?