python pyspark-RevenuePerProductForMonthAccumulator.py

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python pyspark-RevenuePerProductForMonthAccumulator.py相关的知识,希望对你有一定的参考价值。

import sys
import ConfigParser as cp
try:
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext, Row, functions as func

    props = cp.RawConfigParser()
    props.read("src/main/resources/application.properties")

    conf = SparkConf().setAppName("Total Revenue Per Day").setMaster(props.get(sys.argv[5], "executionMode"))

    sc = SparkContext(conf=conf)
    inputPath = sys.argv[1]
    outputPath = sys.argv[2]
    month = sys.argv[3]

    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

    fs = FileSystem.get(Configuration())

    if(fs.exists(Path(inputPath)) == False):
        print("Input path does not exists")
    else:
        if(fs.exists(Path(outputPath))):
            fs.delete(Path(outputPath), True)

        # Filter for orders which fall in the month passed as argument
        ordersCount = sc.accumulator(0)
        orders = inputPath + "/orders"

        def getOrdersTuples(rec):
            ordersCount.add(1)
            return (int(rec.split(",")[0]), 1)
        ordersFiltered = sc.textFile(orders). \
        filter(lambda order: month in order.split(",")[1]). \
        map(lambda order: getOrdersTuples(order))

        # Join filtered orders and order_items to get order_item details for a given month
        # Get revenue for each product_id
        orderItemsCount = sc.accumulator(0)
        orderItems = inputPath + "/order_items"

        def getProductIdAndRevenue(rec):
            orderItemsCount.add(1)
            return rec[1][0]
        revenueByProductId = sc.textFile(orderItems). \
            map(lambda orderItem:
                (int(orderItem.split(",")[1]), (int(orderItem.split(",")[2]), float(orderItem.split(",")[4])))
            ). \
            join(ordersFiltered). \
            map(lambda rec: getProductIdAndRevenue(rec)). \
            reduceByKey(lambda total, ele: total + ele)

        # We need to read products from local file system
        localPath = sys.argv[4]
        productsFile = open(localPath + "/products/part-00000")
        products = productsFile.read().splitlines()

        # Convert into RDD and extract product_id and product_name
        # Join it with aggregated order_items (product_id, revenue)
        # Get product_name and revenue for each product
        sc.parallelize(products). \
            map(lambda product: (int(product.split(",")[0]), product.split(",")[2])). \
        join(revenueByProductId). \
        map(lambda rec: rec[1][0] + "\t" + str(rec[1][1])). \
        saveAsTextFile(outputPath)
except ImportError as e:
    print ("Can not import Spark Modules", e)
sys.exit(1)

以上是关于python pyspark-RevenuePerProductForMonthAccumulator.py的主要内容,如果未能解决你的问题,请参考以下文章

Python代写,Python作业代写,代写Python,代做Python

Python开发

Python,python,python

Python 介绍

Python学习之认识python

python初识