Spark cartesian product

Posted 2016-08-08 11:31:48

Question:

I have to compare coordinates in order to get distances. To do that I load the data with sc.textFile() and build the cartesian product. There are about 2,000,000 lines in the text file, so 2,000,000 x 2,000,000 coordinate pairs have to be compared.

I tested the code with about 2,000 coordinates and it works fine within a few seconds. With the big file, however, it seems to stall at some point and I don't know why. The code looks like this:

from math import asin, cos, sqrt  # used in haversian_dist below
import numpy as np                # used in sort_val below

def concat(x, y):
    # merges values in reduceByKey into one flat list of (id, distance) tuples
    if isinstance(y, list) and isinstance(x, list):
        return x + y
    if isinstance(x, list) and isinstance(y, tuple):
        return x + [y]
    if isinstance(x, tuple) and isinstance(y, list):
        return [x] + y
    else:
        return [x, y]

def haversian_dist(pair):
    # each element of pair is a parsed line: [lat, globalid, lon, ...]
    lat1 = float(pair[0][0])
    lat2 = float(pair[1][0])
    lon1 = float(pair[0][2])
    lon2 = float(pair[1][2])
    p = 0.017453292519943295  # pi/180, converts degrees to radians
    a = 0.5 - cos((lat2 - lat1) * p) / 2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    print(pair[0][1])  # debug output of the first point's globalid
    return (int(float(pair[0][1])), (int(float(pair[1][1])), 12742 * asin(sqrt(a))))

def sort_val(kv):
    # kv is (globalid, list of (globalid, distance)); sort neighbours by distance
    dtype = [("globalid", int), ("distance", float)]
    a = np.array(kv[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance", kind="mergesort")
    return (kv[0], sorted_mins)


def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))  # fields are separated by ";"
    data = data.repartition(100).cache()
    data.collect()
    matrix = data.cartesian(data)            # all pairs of coordinates
    values = matrix.map(haversian_dist)
    values = values.reduceByKey(concat)      # gather all (globalid, distance) tuples per key
    values = values.map(sort_val)            # sort neighbours by distance
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))  # keep the rangeval nearest, skipping self at index 0
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))            # keep only the neighbour ids
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)

Even a file with about 15,000 entries doesn't work. I know the cartesian product leads to O(n^2) runtime. But shouldn't Spark be able to handle that? Or is something wrong? The only lead is an error message, but I don't know whether it is related to the actual problem:

16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Datenübergabe unterbrochen (broken pipe)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Comments on the question:

Can you give an example of your data? Also, if dist(u, v) == dist(v, u) and dist(u, u) == 0 (or some constant), you can cut the number of computations down to (n*(n-1))/2 pairs instead of n^2 pairs.

A line looks like this: "94.5406036377;1313316.000000000000000;32.791301727300002;5". And yes, I could use that to reduce the work, but I think it stalls even before those computations. Or can I apply this while building the cartesian product?

Could you point to the haversine distance formula you are using and explain the constants p and 12742? The distance calculation looks suspicious.

I got the formula from here: ***.com/questions/27928/… It worked fine when testing with samples.

Answer 1:

You call data.collect() in your code, which brings the entire dataset back to a single machine (the driver). Depending on the memory on that machine, 2,000,000 lines of data might not fit.
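
A minimal sketch (assuming the same sc and path as in the question's calc_matrix; not part of the original answer): if the collect() was only meant to materialize the cache, an action such as count() does that without shipping every row to the driver:

# minimal sketch, assuming sc and path as in the question's calc_matrix
data = sc.textFile(path)
data = data.map(lambda x: x.split(";"))
data = data.repartition(100).cache()
data.count()  # materializes the cache but returns only a single number to the driver
matrix = data.cartesian(data)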

Also, I tried to reduce the number of computations by using a join instead of cartesian. (Note that I simply generated random numbers with numpy, so the format here may differ from yours. Still, the main idea is the same.)

import numpy as np
from numpy import arcsin, cos, sqrt

# suppose my data consists of latlong pairs;
# we will use the indices for pairing up values
data = sc.parallelize(np.random.rand(10, 2)).zipWithIndex()
# note: the tuple-unpacking lambdas below are Python 2 syntax
data = data.map(lambda (val, idx): (idx, val))

# generate index pairs (e.g. with 3 points [0,1,2], only the
# distances for the pairs (0,1), (0,2) and (1,2) must be computed)
idxs = range(data.count())
indices = sc.parallelize([(i, j) for i in idxs for j in idxs if i < j])

# haversine function (I took the liberty of editing some parts of it)
def haversian_dist(latlong1, latlong2):
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295  # pi/180, degrees to radians
    def hav(theta): return (1 - cos(p * theta)) / 2
    a = hav(lat2 - lat1) + cos(p * lat1) * cos(p * lat2) * hav(lon2 - lon1)
    return 12742 * arcsin(sqrt(a))

# attach the coordinates of the first point of each pair...
joined1 = indices.join(data).map(lambda (i, (j, val)): (j, (i, val)))
# ...then the coordinates of the second point
joined2 = joined1.join(data).map(lambda (j, ((i, latlong1), latlong2)): ((i, j), (latlong1, latlong2)))
haversianRDD = joined2.mapValues(lambda (x, y): haversian_dist(x, y))
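
For reference, and to answer the question from the comments about the constants (a fact check, not part of the original answer): p = 0.017453292519943295 is pi/180, the factor that converts degrees to radians, and 12742 is the Earth's diameter in kilometres (twice the mean radius of 6371 km), i.e. the 2R in the haversine distance d = 2R * asin(sqrt(a)). A quick sanity check:

import math

# p is pi/180, the degrees-to-radians conversion factor
assert abs(0.017453292519943295 - math.pi / 180) < 1e-15
# 12742 km is the Earth's diameter (2 * mean radius of 6371 km),
# i.e. the 2R factor in the haversine distance d = 2R * asin(sqrt(a))
assert 12742 == 2 * 6371

One design note on the sketch above: the list comprehension that builds indices materializes all (n*(n-1))/2 index pairs on the driver, so for very large n the pair generation itself may need to be distributed as well.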

Comments on the answer:

This seems to work to some extent; I now get the error "slurmstepd: error: Exceeded step memory limit at some point." Thanks for your reply! I hope I can solve this new error ^^
