正确使用 flatMap

Posted

技术标签:

【中文标题】正确使用 flatMap【英文标题】:Proper use of flatMap 【发布时间】:2017-02-22 15:40:42 【问题描述】:

为什么我每次尝试我的 RDD 操作时都会收到此错误以及如何修复它?

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling 012.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

我试图找出我可以对其执行操作的最后一个 RDD 及其 ratingByUser,这表明问题出在 flatMap 中。

我想要做的是我将 CSV 与 (userID,movieID,rating) 一起使用,并且我想为每个用户 ID 创建唯一的电影 ID 组合与评分,但不同的用户可以生成相同的电影 ID 对,例如对于这个 CSV:

1,2000,5

1,2001,2

1,2002,3

2,2000,4

2,2001,1

2,2004,5

我想要 RDD:

键(2000,2001),值(5,2,1)

键(2000,2002),值(5,3,1)

键(2001,2002),值(2,3,1)

键(2000,2001),值(4,1,1)

键(2000,2004),值(4,5,1)

键(2001,2004),值(1,5,1)

# First Map function - gets line and returns key(userID) value(movieID,rating)
def parseLine(line):
  fields=line.split(",")
  userID=int(fields[0])
  movieID=int(fields[1])
  rating=int(fields[2])
  return userID, (movieID,rating)
# Function to create movie unique pairs with ratings
# all pair start with the lowest ID
# returns key (movieIDj,movieIDi) & value (rating-j,rating-i,1)
# the 1 in value is added in order to count number of ratings in the reduce

def createPairs(userRatings):
  pairs=[]
  for i1 in range(len(userRatings[1])-1):
    for i2 in range(i1+1,len(userRatings[1])):
      if userRatings[i1][0]<userRatings[1][i2][0]:
        pairs.append(((userRatings[1][i1][0],userRatings[1][i2][0]),(userRatings[1][i1][1],userRatings[1][i2][1],1)))
      else:
        pairs.append(((userRatings[1][i2][0],userRatings[1][i1][0]),(userRatings[1][i2][1],userRatings[1][i1][1],1)))
  return pairs

# Create SC object from the ratings file
lines = sc.textFile("/FileStore/tables/dvmlbdnj1487603982330/ratings.csv")
# Map lines to Key(userID),Value(movieID,rating)
movieRatings = lines.map(parseLine)
# Join all rating by same user into one key
# (UserID1,(movie1,rating1)),(UserID1,(movie2,rating2)) --> UserID1,[(movie1,rating1),(movie2,rating2)]
ratingsPerUser = movieRatings.groupByKey()
# activate createPairs func
# We use flatMap, since each user have different number of ratings --> different number pairs
pairsOfMovies = ratingsPerUser.flatMap(createPairs)

【问题讨论】:

【参考方案1】:

问题是函数传递给flatMap 而不是flatMap。 按键分组返回迭代器:

不能多次遍历 无法编入索引。

先转换为列表:

ratingsPerUser.mapValues(list).flatMap(createPairs)

【讨论】:

以上是关于正确使用 flatMap的主要内容,如果未能解决你的问题,请参考以下文章

如何正确的使用SharedPreferences

如何正确使用 Composer 安装 Laravel 扩展包

如何正确使用 Composer 安装 Laravel 扩展包

如何正确使用 Composer 安装 Laravel 扩展包

如何正确强制正确使用类方法?

如何正确使用 Composer 安装 Laravel 扩展包