shape into blocks--source code in python based on pySpark
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了shape into blocks--source code in python based on pySpark相关的知识,希望对你有一定的参考价值。
这是微博深度和广度预测的原始代码,写了大约半个月,第一个版本不是这样的,但是这个版本包含所有需要的功能。
模块化的程度也更高。找工作前一直想用python完美解决这个问题,后来发现自己的方法和硬件都有很大的局限。
这算是我第一次正儿八经地尝试在分布式计算框架下处理海量数据。
这个过程让我意识到了很多问题,也在很大程度上影响了我面试时的代码风格。
def get_basic_info(path="/home/jason/spark/weibo_predict/"):
    """Print and return every directory and input file used by the pipeline.

    Args:
        path: Base directory, including the trailing slash. Defaults to the
            Linux location the script was written for; on Windows pass e.g.
            ``"E:/spark/weibo_predict/"``.

    Returns:
        An 8-tuple of strings, in this order: train directory, test
        directory, source-code directory, training raw-weibo file, training
        repost file, test raw-weibo file, test repost file, user-relations
        file.
    """
    train_path = path + 'train/'
    test_path = path + 'test/'
    code_path = path + 'source_code/'
    print('\n训练准备文件保存路径px:%s' % train_path)
    print('\n测试准备文件保存路径py:%s' % test_path)
    print('\n代码准备文件保存路径pz:%s' % code_path)

    train_weibo_raw_path = path + "train_weibo_raw.txt"
    train_weibo_repost_path = path + "train_weibo_repost_back.txt"
    test_weibo_raw_path = path + "test_weibo_raw.txt"
    test_weibo_repost_path = path + "test_weibo_repost.txt"
    user_relations_path = path + "user_relations_back.txt"
    print("\n训练原始微博地址p1:%s" % train_weibo_raw_path)
    print("训练转发微博地址p2:%s" % train_weibo_repost_path)
    print("\n测试原始微博地址p3:%s" % test_weibo_raw_path)
    print("测试转发微博地址p4:%s" % test_weibo_repost_path)
    print("\n 用户关系地址p5:%s" % user_relations_path)

    return (
        train_path,
        test_path,
        code_path,
        train_weibo_raw_path,
        train_weibo_repost_path,
        test_weibo_raw_path,
        test_weibo_repost_path,
        user_relations_path,
    )
# Given the (raw weibo, repost) files of either the training or the test
# split, return the reduced repost mapping (repost_id_line_time_reduce) and
# the weibo-id -> user-id mapping (wid_uid_rdd).
from pyspark import SparkContext


def get_prime_rdd(train_or_test, sc, p1, p2, p3, p4):
    """Load one split and keep only the weibo ids present in both files.

    Args:
        train_or_test: ``'train'`` selects ``p1``/``p2``; ``'test'`` selects
            ``p3``/``p4``.
        sc: SparkContext used to read the text files.
        p1: Path of the training raw-weibo file.
        p2: Path of the training repost file.
        p3: Path of the test raw-weibo file.
        p4: Path of the test repost file.

    Returns:
        ``(repost_id_line_time_reduce, wid_uid_rdd)``. The first RDD maps the
        first field of each repost line to the list of that key's records,
        each record being the line's fields without the first and the last
        one. The second RDD maps field 0 to field 1 of each raw-weibo line.
        Both are restricted to keys that occur in the other RDD.
        For any other ``train_or_test`` value a message is printed and
        ``(0, 0)`` is returned.
    """
    if train_or_test == 'train':
        raw_path, repost_path = p1, p2
    elif train_or_test == 'test':
        raw_path, repost_path = p3, p4
    else:
        print("only input train or test")
        return 0, 0

    # Fields are separated by the \x01 control character.
    # The row counts computed here previously were never used and each one
    # cost a full pass over the input, so they are gone.
    wid_uid_rdd = (
        sc.textFile(raw_path)
        .map(lambda line: line.split("\001"))
        .map(lambda fields: (fields[0], fields[1]))
    )

    repost_id_line_time_reduce = (
        sc.textFile(repost_path)
        .map(lambda line: line.split("\001"))
        .map(lambda fields: (fields[0], fields[1:-1]))
        .groupByKey()
        .mapValues(list)
    )

    # a.subtractByKey(a.subtractByKey(b)) keeps the entries of a whose key is
    # also in b, i.e. an intersection by key.
    repost_id_line_time_reduce = repost_id_line_time_reduce.subtractByKey(
        repost_id_line_time_reduce.subtractByKey(wid_uid_rdd)
    )
    wid_uid_rdd = wid_uid_rdd.subtractByKey(
        wid_uid_rdd.subtractByKey(repost_id_line_time_reduce)
    )
    return repost_id_line_time_reduce, wid_uid_rdd
def get_uid_fnum_rdd(sc, p5):
    """Build an RDD of ``(user id, number of fans)`` from the relations file.

    Each line is expected to be ``<user>\\t<fan>\\x01<fan>...``.

    Args:
        sc: SparkContext used to read the file.
        p5: Path of the user-relations file.

    Returns:
        RDD of ``(user, fan_count)`` pairs, one per input line. An empty fan
        field still counts as 1 because ``"".split("\\x01")`` is ``[""]``.
    """

    def parse(line):
        # Split once instead of once per extracted column.
        fields = line.split("\t")
        return fields[0], len(fields[1].split("\x01"))

    # The unused row count (a full extra Spark job) has been removed.
    return sc.textFile(p5).map(parse)
# Version 2: count the reposts that happened up to a given time slot.
# One slot is 900 time units (presumably seconds, i.e. 15 minutes -- confirm
# against the data).


def cal_times_j(list, j):
    """Count the records whose last field, read as an int, is <= ``j * 900``.

    Args:
        list: Sequence of records; the last element of each is the time.
        j: Slot index.

    Returns:
        The cumulative number of records up to the end of slot ``j``.
    """
    # Cumulative count. For the count inside one slot only, test
    # j * 900 <= t <= (j + 1) * 900 instead.
    limit = j * 900
    return sum(1 for record in list if int(record[-1]) <= limit)


def cal_id_times_j(rdd, j):
    """Map every ``(key, records)`` pair to ``(key, cal_times_j(records, j))``."""
    return rdd.mapValues(lambda records: cal_times_j(records, j))


def generate_times_file(rdd, k, path):
    """Write the repost counts for slots ``k - 1`` and ``k`` to CSV files.

    For each ``j`` in ``(k - 1, k)`` the counts up to slot ``j + 1`` are
    written to ``<path>wid_times/wid_times_<j>.csv``, one ``key,count`` row
    per element of ``rdd``.

    Args:
        rdd: Pair RDD of ``(key, records)``.
        k: Upper slot index.
        path: Output base directory, including the trailing slash.
    """
    import csv

    for j in range(k - 1, k + 1):
        a_path = str(path) + 'wid_times/wid_times_' + str(j) + '.csv'
        rows = cal_id_times_j(rdd, j + 1).collect()
        # The context manager closes the file even if writing fails.
        with open(a_path, 'w') as out_file:
            csv.writer(out_file).writerows(rows)
# Repost depth computation.


def cal_during(list, j):
    """Return the records whose last field, read as an int, is <= ``j * 900``.

    A new outer list is returned; the records themselves are not copied.
    """
    limit = j * 900
    return [record for record in list if int(record[-1]) <= limit]


def cal_rdd_during(rdd, j):
    """Apply :func:`cal_during` with slot ``j`` to every element of ``rdd``."""
    return rdd.map(lambda records: cal_during(records, j))


def add_deep(list):
    """Extend repost chains in place and return the same list.

    Whenever the last element of chain ``i`` equals the first element of
    chain ``j``, the last element of chain ``j`` is appended to chain ``i``.
    This is a single pass over all ``(i, j)`` pairs, so the result depends on
    the order of the chains. Lists with at most one chain are returned
    untouched.
    """
    size = len(list)
    if size <= 1:
        return list
    for i in range(size):
        for j in range(size):
            # list[i][-1] is re-read every time because the chain may just
            # have grown.
            if list[i][-1] == list[j][0]:
                list[i].append(list[j][-1])
    return list


def max_deep(list):
    """Return the depth of the longest chain.

    The depth is the chain length minus one, never less than 1 for a
    non-empty input. An empty input gives 0.
    """
    if len(list) == 0:
        return 0
    return max([2] + [len(chain) for chain in list]) - 1


def ti_qu(list):
    """Drop the last element (the time) of every record, in place."""
    for i in range(len(list)):
        list[i] = list[i][:-1]
    return list


def cal_cal(all_in_one_rdd, j):
    """Compute the maximum repost depth per key up to slot ``j``.

    Args:
        all_in_one_rdd: Pair RDD of ``(key, records)``.
        j: Slot index; only records with time <= ``j * 900`` are used.

    Returns:
        Pair RDD of ``(key, depth)``.
    """

    def depth_until_j(records):
        relations = ti_qu(cal_during(records, j))  # strip the time column
        return max_deep(add_deep(relations))

    return all_in_one_rdd.mapValues(depth_until_j)


def generate_deeps_file(rdd, k, path):
    """Write the depths for slots ``k - 1`` and ``k`` to CSV files.

    For each ``j`` in ``(k - 1, k)`` the depths up to slot ``j + 1`` are
    written to ``<path>wid_deeps/wid_deeps_<j>.csv``.
    """
    import csv

    for j in range(k - 1, k + 1):
        b_path = str(path) + 'wid_deeps/wid_deeps_' + str(j) + '.csv'
        rows = cal_cal(rdd, j + 1).collect()
        # The context manager closes the file even if writing fails.
        with open(b_path, 'w') as out_file:
            csv.writer(out_file).writerows(rows)
def get_wid_fnum_rdd(uid_fnum_rdd, wid_uid_rdd, path):
    """Attach the author's fan count to every weibo and dump it to CSV.

    Note: a later definition in this file reuses the name
    ``get_wid_fnum_rdd`` with a single ``path`` argument and replaces this
    one once it has been executed.

    Args:
        uid_fnum_rdd: Pair RDD of ``(user id, fan count)``.
        wid_uid_rdd: Pair RDD of ``(weibo id, user id)``.
        path: Output base directory, including the trailing slash. The file
            written is ``<path>wid_fnum_file.csv``.

    Returns:
        Pair RDD of ``(weibo id, fan count)``. The fan count is ``None`` for
        users missing from ``uid_fnum_rdd`` (left outer join).
    """
    import csv

    uid_wid_rdd = wid_uid_rdd.map(lambda pair: (pair[1], pair[0]))
    # The join values are already (weibo id, fan count) pairs. Splitting them
    # and zipping them back evaluated the join twice for no gain.
    wid_fnum_rdd = uid_wid_rdd.leftOuterJoin(uid_fnum_rdd).values()

    c_path = str(path) + 'wid_fnum_file.csv'
    with open(c_path, "w") as wid_fnum_file:
        csv.writer(wid_fnum_file).writerows(wid_fnum_rdd.collect())
    return wid_fnum_rdd
def add_flat(list):
    """Flatten a list of lists by one level.

    Returns 0 for ``None`` (kept for compatibility) and a new flat list
    otherwise. The earlier version rebound its accumulator to the ``None``
    returned by ``list.append`` and failed on longer inputs.
    """
    if list is None:
        return 0
    return [item for sub in list for item in sub]


def clac_cover(list):
    """Return the sum of :func:`cover_value` over all users in ``list``."""
    return sum(cover_value(user) for user in list)


def cover_value(user):
    """Return the fan count of ``user``, or 0 when the user is unknown.

    Looks ``user`` up in the module-level ``list_uid_fnum`` sequence of
    ``(user id, fan count)`` entries, which is not defined in the visible
    part of this file -- presumably a collected copy of ``uid_fnum_rdd``.
    """
    for entry in list_uid_fnum:
        if user == entry[0]:
            return entry[1]
    # Fall through only after every entry has been checked.
    return 0


def flatmapvalues(x):
    """Identity function, used as the ``flatMapValues`` callback."""
    return x


def cal_sum(x):
    """Sum the items of ``x`` as ints, skipping ``None`` entries.

    ``None`` and empty inputs give 0. The previous guard
    ``x == None and len(x) == 0`` raised ``TypeError`` for ``None``.
    """
    if not x:
        return 0
    return sum(int(value) for value in x if value is not None)


def fans_cover_till_j(all_in_one_rdd, j):
    """Compute, per key, the total fan count of the users involved up to slot ``j``.

    Uses the module-level ``uid_fnum_rdd`` of ``(user id, fan count)`` pairs.

    Args:
        all_in_one_rdd: Pair RDD of ``(key, records)``.
        j: Slot index; only records with time <= ``j * 900`` are used.

    Returns:
        Pair RDD of ``(key, cover)`` with one entry per key of
        ``all_in_one_rdd``. Keys without any matched user get 0.
    """
    import re

    def users_until_j(records):
        relations = ti_qu(cal_during(records, j))  # strip the time column
        # Flatten by pulling every run of digits out of the repr. This
        # assumes user ids are purely numeric -- TODO confirm.
        tokens = re.sub(r'\D', " ", str(relations)).split()
        return list(set(tokens))  # de-duplicate users

    wid_users = all_in_one_rdd.mapValues(users_until_j)
    # One (user, key) pair per involved user, so it can be joined by user.
    uid_wid = wid_users.flatMapValues(flatmapvalues).map(
        lambda pair: (pair[1], pair[0])
    )
    wid_fnum = uid_wid.leftOuterJoin(uid_fnum_rdd).values()
    wid_cover = wid_fnum.groupByKey().mapValues(
        lambda fnums: cal_sum(list(fnums))
    )

    # Re-attach every original key so keys without users still appear.
    # cal_sum((0, cover_or_None)) collapses the join value to the cover.
    zero_per_wid = all_in_one_rdd.mapValues(lambda _: 0)
    return zero_per_wid.leftOuterJoin(wid_cover).mapValues(cal_sum)


def generate_covers_file(rdd, k, path):
    """Write the fan coverage for slots ``k - 1`` and ``k`` to CSV files.

    For each ``j`` in ``(k - 1, k)`` the coverage up to slot ``j + 1`` is
    written to ``<path>wid_covers/wid_covers_<j>.csv``.
    """
    import csv

    for j in range(k - 1, k + 1):
        c_path = str(path) + 'wid_covers/wid_covers_' + str(j) + '.csv'
        rows = fans_cover_till_j(rdd, j + 1).collect()
        # The context manager closes the file even if writing fails.
        with open(c_path, 'w') as out_file:
            csv.writer(out_file).writerows(rows)
# Driver: load the base RDDs for both splits.
# NOTE: `sc` is not defined in the visible part of this file; presumably it
# is the SparkContext supplied by the PySpark shell or notebook.
px, py, pz, p1, p2, p3, p4, p5 = get_basic_info()
uid_fnum_rdd = get_uid_fnum_rdd(sc, p5)

train_repost_id_line_time_reduce, train_wid_uid_rdd = get_prime_rdd(
    'train', sc, p1, p2, p3, p4
)
# Feature-file generation for the training split. These steps are disabled
# in the source; enable them to (re)build the CSV files.
# wid_fnum_rdd = get_wid_fnum_rdd(uid_fnum_rdd, train_wid_uid_rdd, px)
# generate_times_file(train_repost_id_line_time_reduce, 292, px)
# generate_deeps_file(train_repost_id_line_time_reduce, 292, px)
# generate_covers_file(train_repost_id_line_time_reduce, 292, px)

test_repost_id_line_time_reduce, test_wid_uid_rdd = get_prime_rdd(
    'test', sc, p1, p2, p3, p4
)
# Feature-file generation for the test split (also disabled in the source).
# test_wid_fnum_rdd = get_wid_fnum_rdd(uid_fnum_rdd, test_wid_uid_rdd, py)
# generate_times_file(test_repost_id_line_time_reduce, 16, py)
# generate_deeps_file(test_repost_id_line_time_reduce, 16, py)
# generate_covers_file(test_repost_id_line_time_reduce, 16, py)
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import SparseVector,DenseVector

# The three quantities that are generated and predicted, in feature order.
_KINDS = ('times', 'deeps', 'covers')


def get_wid_fnum_rdd(path):
    """Load ``<path>wid_fnum_file.csv`` as a key-sorted pair RDD.

    Note: this replaces the earlier three-argument function of the same name
    that writes the file.

    Returns:
        Pair RDD of ``(column 0, column 1)`` strings, sorted by key.
    """
    path = path + 'wid_fnum_file' + '.csv'
    rows = sc.textFile(path).map(lambda x: x.split(","))
    return rows.map(lambda x: (x[0], x[1])).sortByKey()


def add_keys(rdd1):
    """Pair every element of ``rdd1`` with a weibo id, by position.

    The ids are column 0 of the test ``wid_times_0.csv`` file, sorted.
    Element ``i`` of ``rdd1`` gets the ``i``-th sorted id, so ``rdd1`` must
    hold one value per id in that same order.

    Returns:
        Pair RDD of ``(id, value)``; the order is whatever the join produces.
    """
    keys = (
        sc.textFile('/home/jason/spark/weibo_predict/test/wid_times/wid_times_0.csv')
        .map(lambda x: (x.split(',')[0], x.split(',')[1]))
        .sortByKey()
        .keys()
    )
    indexed_values = rdd1.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    indexed_keys = keys.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    # The join values are already (id, value) pairs. Re-zipping them
    # evaluated the join twice.
    return indexed_keys.join(indexed_values).values()


def get_wid_x(j, path, times_or_deeps_or_covers):
    """Load the per-weibo values of one kind for slot ``j``.

    Args:
        j: Slot index.
        path: ``px`` (training data) or ``py`` (test data).
        times_or_deeps_or_covers: ``'times'``, ``'deeps'`` or ``'covers'``.

    Returns:
        Pair RDD of ``(weibo id, value)`` strings. For training data and for
        test slots 0..14 it is read from ``wid_<kind>/wid_<kind>_<j>.csv``
        and sorted by key. For test slots 15..291 it is read from the
        prediction file ``<kind>_time_data_<j>.txt`` and keyed through
        :func:`add_keys`. For an unknown kind a message is printed and 0 is
        returned.
    """
    kind = times_or_deeps_or_covers
    if kind not in _KINDS:
        print('wrong input about times_or_deeps_or_covers')
        return 0

    csv_suffix = 'wid_' + kind + '/wid_' + kind + '_' + str(j) + '.csv'
    if path == px:
        path = str(path) + csv_suffix
    elif path == py:
        if j >= 0 and j < 15:
            path = str(path) + csv_suffix
        elif j >= 15 and j <= 291:
            # From slot 15 on, the test values come from earlier predictions.
            path = ('/home/jason/spark/weibo_predict/predicts/'
                    + kind + '_time_data_' + str(j) + '.txt')
            return add_keys(sc.textFile(path))

    rows = sc.textFile(path).map(lambda x: x.split(","))
    return rows.map(lambda x: (x[0], x[1])).sortByKey()


def my_join(rdd1, rdd2):
    """Join two pair RDDs and flatten each joined value to a list of digit runs.

    Every non-digit character of the value's ``str()`` acts as a separator,
    so signs and decimal points are lost.
    """
    import re

    return rdd1.join(rdd2).mapValues(
        lambda x: re.sub(r'\D', " ", str(x)).split()
    )


def lib_svm(x):
    """Format ``x`` as one LibSVM line: ``label 1:v1 2:v2 ...``.

    ``x[0]`` is the label and must be a string; the remaining items are the
    features. The result ends with a trailing space.
    """
    parts = [str(x[0] + ' ')]
    parts.extend(
        str(i) + ":" + str(value) + ' ' for i, value in enumerate(x[1:], 1)
    )
    return ''.join(parts)


def generate_train_or_test_data(path, j, times_or_deeps):
    """Write the LibSVM training or test file for one kind and slot.

    The label is the value of the requested kind, taken at slot ``j + 1``
    for training data and at slot ``j`` for test data. The features are the
    fan count followed by the slot-``j`` values of the other two kinds.

    Args:
        path: ``px`` writes ``train_data/<kind>_train_data_<j>.txt``; ``py``
            writes ``test_data/<kind>_test_data_<j>.txt``.
        j: Slot index.
        times_or_deeps: ``'times'``, ``'deeps'`` or ``'covers'``.

    Returns:
        0 for an unknown kind or path, otherwise ``None``.
    """
    kind = times_or_deeps
    if kind not in _KINDS:
        return 0
    if path == px:
        data_path = str(px) + 'train_data/' + kind + '_train_data_' + str(j) + '.txt'
        target_rdd = get_wid_x(j + 1, path, kind)
    elif path == py:
        data_path = str(py) + 'test_data/' + kind + '_test_data_' + str(j) + '.txt'
        target_rdd = get_wid_x(j, path, kind)
    else:
        return 0

    # The 'deeps' label is no longer overwritten by the slot-j values, so the
    # training label is the j + 1 value as for the other two kinds.
    records = my_join(target_rdd, get_wid_fnum_rdd(path))
    for other in _KINDS:
        if other != kind:
            records = my_join(records, get_wid_x(j, path, other))

    # Rows are sorted by id for every kind because add_keys() re-attaches
    # ids to the predictions by position in sorted-id order.
    lines = records.sortByKey().values().map(lambda x: lib_svm(x)).collect()
    # The context manager flushes and closes the file before it is read back
    # by generate_test_predict().
    with open(data_path, 'w') as open_data_path:
        for line in lines:
            open_data_path.write(line)
            open_data_path.write('\n')


def generate_test_predict(j, times_or_deeps):
    """Train a random-forest regressor for slot ``j`` and write its predictions.

    Reads ``<kind>_train_data_<j>.txt`` and ``<kind>_test_data_<j>.txt`` and
    writes one truncated-to-int prediction per test row to
    ``predicts/<kind>_time_data_<j + 1>.txt``. Does nothing for an unknown
    kind.
    """
    kind = times_or_deeps
    if kind not in _KINDS:
        return

    from pyspark.mllib.tree import RandomForest
    from pyspark.mllib.util import MLUtils

    tr_path = ('/home/jason/spark/weibo_predict/train/train_data/'
               + kind + '_train_data_' + str(j) + '.txt')
    te_path = ('/home/jason/spark/weibo_predict/test/test_data/'
               + kind + '_test_data_' + str(j) + '.txt')
    train_data = MLUtils.loadLibSVMFile(sc, tr_path)
    test_data = MLUtils.loadLibSVMFile(sc, te_path)
    model = RandomForest.trainRegressor(
        train_data,
        categoricalFeaturesInfo={},
        numTrees=3,
        featureSubsetStrategy="auto",
        impurity='variance',
        maxDepth=4,
        maxBins=32,
        seed=42,
    )
    predictions = model.predict(test_data.map(lambda x: x.features))

    pre_path = ('/home/jason/spark/weibo_predict/predicts/'
                + kind + '_time_data_' + str(j + 1) + '.txt')
    with open(pre_path, 'w') as times_predict:
        for value in predictions.collect():
            times_predict.write(str(int(value)))
            times_predict.write('\n')


def generate_test_data_beyond15(j):
    """Zip the sorted test weibo ids with the lines of ``time_data_<j>.txt``.

    NOTE(review): the file name has no kind prefix, unlike the files written
    by generate_test_predict() -- confirm this helper is still in use.
    """
    path = '/home/jason/spark/weibo_predict/predicts/' + 'time_data_' + str(j) + '.txt'
    rdd2 = sc.textFile(path)
    rdd1 = get_wid_fnum_rdd(py).keys()
    return rdd1.zip(rdd2)
def add_keys(rdd1):
    """Pair every element of ``rdd1`` with a weibo id, by position.

    Same logic as the earlier ``add_keys`` definition in this file. The ids
    are column 0 of the test ``wid_times_0.csv`` file, sorted. Element ``i``
    of ``rdd1`` gets the ``i``-th sorted id, so ``rdd1`` must hold one value
    per id in that same order.

    Returns:
        Pair RDD of ``(id, value)``; the order is whatever the join produces.
    """
    keys = (
        sc.textFile('/home/jason/spark/weibo_predict/test/wid_times/wid_times_0.csv')
        .map(lambda x: (x.split(',')[0], x.split(',')[1]))
        .sortByKey()
        .keys()
    )
    indexed_values = rdd1.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    indexed_keys = keys.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    # The join values are already (id, value) pairs. Re-zipping them
    # evaluated the join twice.
    return indexed_keys.join(indexed_values).values()
# Roll the prediction forward one slot at a time. For every slot and every
# kind: build the training file, build the test file, then predict the next
# slot.
for i in range(15):
    for kind in ('times', 'deeps', 'covers'):
        generate_train_or_test_data(px, i, kind)
        generate_train_or_test_data(py, i, kind)
        generate_test_predict(i, kind)

for i in range(15, 292):
    print(i)  # progress indicator
    for kind in ('times', 'deeps', 'covers'):
        generate_train_or_test_data(px, i, kind)
        generate_train_or_test_data(py, i, kind)
        generate_test_predict(i, kind)
# Run slot 291 on its own for all three kinds.
# NOTE(review): the range(15, 292) loop earlier in this file already covers
# slot 291, so this looks like a manual re-run -- confirm it is still needed.
for kind in ('times', 'deeps', 'covers'):
    generate_train_or_test_data(px, 291, kind)
    generate_train_or_test_data(py, 291, kind)
    generate_test_predict(291, kind)
# Assemble the final result file: one row per test weibo holding its id, the
# predicted repost counts and the predicted depths.
import csv
import re


def add_head(x):
    """Return ``x`` as a string prefixed with ``'testWeibo'``."""
    return 'testWeibo' + str(x)


rdd1 = sc.textFile('/home/jason/spark/weibo_predict/predicts/times_time_data_' + str(1) + '.txt')
rdd1 = add_keys(rdd1)

# NOTE(review): the row starts with the slot-1 counts and is followed by 288
# count columns and 288 depth columns (slots 5..292). Confirm that this
# matches the header written for end_of_end_.csv.
for j in range(5, 293):
    rdd2 = sc.textFile('/home/jason/spark/weibo_predict/predicts/times_time_data_' + str(j) + '.txt')
    rdd2 = add_keys(rdd2)
    rdd1 = my_join(rdd1, rdd2)

for j in range(5, 293):
    rdd3 = sc.textFile('/home/jason/spark/weibo_predict/predicts/deeps_time_data_' + str(j) + '.txt')
    rdd3 = add_keys(rdd3)
    rdd1 = my_join(rdd1, rdd3)

# Flatten every (id, values) pair to a list of digit strings, sorted by id.
# The earlier round trip through add_head() was undone by the regex that
# followed it, so the rows written here are unchanged without it.
rdd1 = rdd1.map(lambda x: re.sub(r'\D', " ", str(x)).split())
rdd1 = rdd1.sortBy(lambda x: int(x[0]))

path = '/home/jason/spark/weibo_predict/'
end_path = str(path) + 'end_of_end.csv'
with open(end_path, 'w') as end_f:
    csv.writer(end_f).writerows(rdd1.collect())
# Write the final file: a header line followed by every row of
# end_of_end.csv prefixed with 'testWeibo'.
a = ','
s1 = a.join('scaleT' + str((i + 1) * 15) for i in range(4, 292))
s2 = a.join('depthT' + str((i + 1) * 15) for i in range(4, 292))
# Join all three parts with the separator. Previously the last scale column
# and the first depth column were fused into one name ("scaleT4380depthT75").
s3 = a.join(['WeiboID (Time Unit: Minutes)', s1, s2])

end_path_2 = '/home/jason/spark/weibo_predict/end_of_end.csv'
end_path_1 = '/home/jason/spark/weibo_predict/end_of_end_.csv'
rdd = sc.textFile(end_path_2)
rdd = rdd.map(lambda x: add_head(x))

with open(end_path_1, 'w') as end_ff:
    end_ff.write(s3)
    end_ff.write('\n')
    for line in rdd.collect():
        end_ff.write(line)
        end_ff.write('\n')
以上是关于shape into blocks--source code in python based on pySpark的主要内容,如果未能解决你的问题,请参考以下文章