python pyspark-RevenuePerProductForMonthAccumulator.py
This post walks through pyspark-RevenuePerProductForMonthAccumulator.py, a PySpark job that computes total revenue per product for a given month and uses accumulators to count the records it processes. The column positions in the code appear to match the well-known retail_db sample dataset: orders(order_id, order_date, ...), order_items(order_item_id, order_id, product_id, quantity, subtotal, ...), and products(product_id, category_id, product_name, ...).
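The job expects five positional arguments: the HDFS input directory, the HDFS output directory, the month to filter on, the local directory holding the products file, and the name of the properties section that supplies the execution mode. A hypothetical invocation (all paths and the section name are placeholders, not taken from the script):

spark-submit pyspark-RevenuePerProductForMonthAccumulator.py /public/retail_db /user/training/revenue_per_product 2014-01 /data/retail_db dev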
import sys
import ConfigParser as cp  # Python 2 module; on Python 3 use "import configparser as cp"

try:
    from pyspark import SparkConf, SparkContext

    # Read the Spark master (execution mode) for the environment passed as argv[5]
    props = cp.RawConfigParser()
    props.read("src/main/resources/application.properties")
    conf = SparkConf(). \
        setAppName("Revenue Per Product For Month"). \
        setMaster(props.get(sys.argv[5], "executionMode"))
    sc = SparkContext(conf=conf)
    inputPath = sys.argv[1]
    outputPath = sys.argv[2]
    month = sys.argv[3]

    # Validate the HDFS paths through the JVM gateway
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
    fs = FileSystem.get(Configuration())
    if not fs.exists(Path(inputPath)):
        print("Input path does not exist")
    else:
        if fs.exists(Path(outputPath)):
            fs.delete(Path(outputPath), True)
        # Filter for orders which fall in the month passed as argument.
        # The accumulator is updated inside a transformation, so its value
        # is only dependable once an action has run (and may overcount if
        # a task is retried).
        ordersCount = sc.accumulator(0)
        orders = inputPath + "/orders"

        def getOrdersTuples(rec):
            ordersCount.add(1)
            return (int(rec.split(",")[0]), 1)

        ordersFiltered = sc.textFile(orders). \
            filter(lambda order: month in order.split(",")[1]). \
            map(lambda order: getOrdersTuples(order))
        # Join filtered orders and order_items to get order_item details
        # for the given month, then get revenue for each product_id
        orderItemsCount = sc.accumulator(0)
        orderItems = inputPath + "/order_items"

        def getProductIdAndRevenue(rec):
            # rec is (order_id, ((product_id, subtotal), 1)) after the join;
            # keep only the (product_id, subtotal) pair
            orderItemsCount.add(1)
            return rec[1][0]

        revenueByProductId = sc.textFile(orderItems). \
            map(lambda orderItem:
                (int(orderItem.split(",")[1]),
                 (int(orderItem.split(",")[2]), float(orderItem.split(",")[4])))). \
            join(ordersFiltered). \
            map(lambda rec: getProductIdAndRevenue(rec)). \
            reduceByKey(lambda total, ele: total + ele)
        # Products are read from the local file system, not HDFS
        localPath = sys.argv[4]
        productsFile = open(localPath + "/products/part-00000")
        products = productsFile.read().splitlines()
        productsFile.close()

        # Convert into an RDD, extract (product_id, product_name),
        # join with the aggregated (product_id, revenue) pairs,
        # and save product_name and revenue for each product
        sc.parallelize(products). \
            map(lambda product: (int(product.split(",")[0]), product.split(",")[2])). \
            join(revenueByProductId). \
            map(lambda rec: rec[1][0] + "\t" + str(rec[1][1])). \
            saveAsTextFile(outputPath)
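        # Accumulator values are only populated once saveAsTextFile has
        # triggered the job, so it is safe to report them here
        print("Orders for the month: " + str(ordersCount.value))
        print("Order items joined: " + str(orderItemsCount.value))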
except ImportError as e:
    print("Cannot import Spark modules: %s" % e)
    sys.exit(1)
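The script resolves the Spark master from src/main/resources/application.properties by looking up the executionMode option in the section named by the fifth argument. A minimal sketch of that file, with assumed section names dev and prod:

[dev]
executionMode = local

[prod]
executionMode = yarn-client

Each line of the saved output is a tab-separated product_name and its total revenue for the month, e.g. Perfect Fitness Perfect Rip Deck followed by a tab and a figure such as 9147.66 (illustrative values only).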