python structured_streaming


This post walks through a PySpark Structured Streaming job that unions several minute-level ad-spend streams (RTB, Toutiao, CPA, CPS, and impression/click feeds), joins them against Hive dimension tables, and restarts its streaming query whenever those dimension tables receive new partitions.

#coding:utf-8
"""
Launch command:

PYSPARK_PYTHON=python3 spark-submit \
    --driver-memory 16G \
    --executor-memory 20G \
    --executor-cores 10 \
    --num-executors 100 \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    spark_rtb_toutiao_main_script.py
"""

import datetime
import os
import re
import time
import sys
from functools import partial
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from threading import Thread

def get_yesterday_str():
    """Return yesterday's date as a string.

    e.g. '2018-07-06'

    """
    today = datetime.date.today()
    oneday = datetime.timedelta(days=1)
    yesterday = today - oneday
    return yesterday.strftime("%Y-%m-%d")

def get_last_partition_date(table_name, dp=None):
    """获取Hive表(dp分区)最新分区的时间"""
    partition_sql = """hive -e "show partitions """ \
                    + table_name + """;" 2>/dev/null|grep {}""".format(dp if dp is not None else '=') \
                    + """|tail -1|awk -F'[= /]' '{print $2}'"""
    output = os.popen(partition_sql).read().strip().split("\n")
    last_partition_date = output[0]

    # Fallback: use yesterday's date when no partition is found.
    if len(last_partition_date) == 0:
        return get_yesterday_str()

    return last_partition_date
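
# Example of the shell pipeline built above (table name and dp are illustrative):
#   hive -e "show partitions dim.dim_product_daily_item_sku;" 2>/dev/null \
#       | grep active | tail -1 | awk -F'[= /]' '{print $2}'
# For a partition line like dt=2018-07-06/dp=active, awk field $2 is '2018-07-06'.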

def sql_get_dim_product_active_table():
    """构造获取dim.dim_product_daily_item_sku表最新active分区数据的sql"""
    last_partition_date = get_last_partition_date("dim.dim_product_daily_item_sku", "active")

    return """
        SELECT
            max(division_id_new) as division_id_new,
            max(division_name_new) as division_name_new,
            max(pur_first_dept_cd) as ad_first_dept_id,
            max(pur_first_dept_name) as ad_first_dept_name,
            max(pur_second_dept_cd) as ad_second_dept_id,
            max(pur_second_dept_name) as ad_second_dept_name,
            max(pur_third_dept_cd) as ad_third_dept_id,
            max(pur_third_dept_name) as ad_third_dept_name,
            max(item_first_cate_cd) as item_first_cate_cd,
            max(item_first_cate_name) as item_first_cate_name,
            max(brand_code) as ad_sku_brand_code,
            max(barndname_full) as ad_sku_brandname_full,
            max(type) as type_raw,
            item_sku_id AS sku_id
        FROM
            dim.dim_product_daily_item_sku
        WHERE
            dt = '{}' and dp = 'active'
        group BY
            item_sku_id
        CLUSTER BY
            sku_id
    """.format(last_partition_date)


def sql_get_dim_media_table():
    """构造获取dim.dim_product_daily_item_sku表最新active分区数据的sql"""
    last_partition_media_date = get_last_partition_date("ad.fdm_dsp_media_chain")

    return """
        SELECT
                id as placement_id,
                cast(max(drawing_type) as string) AS drawing_type,
                cast(max(native_type) as string) AS native_type,
                max(target_group_type) as target_group_type
            FROM
                ad.fdm_dsp_media_chain 
            WHERE
                dt='{last_partition_media_date}'
            GROUP BY
                id
    """.format(last_partition_media_date=last_partition_media_date)

class ReloadFlag():
    """Mutable flag shared between the main loop and the partition-watcher threads."""

    def __init__(self):
        self.reload = False

def check_new_partition(table_name, dp=None, last_partition_date=None, query=None, reload_flag=None):
    """判断Hive表是否有新分区

    每10分钟检测一次, 当检测到新分区时停止查询, 触发reload

    """
    print("后台线程启动: 每10分钟监测表{}是否生成最新分区!".format(table_name))
    while not reload_flag.reload:
        partition_date = get_last_partition_date(table_name, dp)

        if partition_date != last_partition_date:
            print("监测到表{}生成最新分区:{}".format(table_name, partition_date))
            break
        else:
            print("监测表{}最新分区变化中!".format(table_name))

        time.sleep(60 * 10)

    reload_flag.reload = True
    query.stop()

def periodic_backup_checkpoint(
        backup_path,
        checkpoint_path,
        output_path,
        query_name,
        backup_interval=60 * 60):
    """周期性(默认1小时)备份checkpoint相关信息

    需要备份的文件(夹)有:
    1. checkpoint文件夹(可以指定多个checkpoint文件夹)
    2. 输出文件夹的_spark_metadata文件(可以指定多个输出文件夹)

    需要两者组合使用checkpoint才能生效

    """
    print("checkpoint备份线程启动")
    origin_backup_path = backup_path

    while True:
        # Minute-resolution timestamp, e.g. 201807061220
        now = datetime.datetime.now().strftime("%Y%m%d%H%M")
        backup_path = origin_backup_path + '/' + now
        os.system("hadoop fs -mkdir -p {}".format(backup_path))

        try:
            if isinstance(checkpoint_path, str):
                os.system("hadoop fs -cp {} {}/".format(checkpoint_path, backup_path))
            elif isinstance(checkpoint_path, list):
                for path in checkpoint_path:
                    os.system("hadoop fs -cp {} {}/".format(path, backup_path))

            if isinstance(output_path, str):
                os.system("hadoop fs -mkdir -p {}/{}".format(backup_path, query_name))
                os.system("hadoop fs -cp {}/_spark_metadata {}/{}/".format(
                    output_path, backup_path, query_name))
            elif isinstance(output_path, list):
                for path, name in zip(output_path, query_name):
                    os.system("hadoop fs -mkdir -p {}/{}".format(backup_path, name))
                    os.system("hadoop fs -cp {}/_spark_metadata {}/{}/".format(
                        path, backup_path, name))
        except Exception:
            # Skip any error raised during backup (e.g. a directory that does not exist yet).
            pass

        time.sleep(backup_interval)
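
# Note: this backup helper is defined but never started in main() below. A
# caller would typically run it in a daemon thread, along these lines
# (argument values illustrative):
#
#   Thread(target=periodic_backup_checkpoint,
#          args=(backup_path, checkpoint_path, output_path, query_name),
#          daemon=True).start()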

def sort_batch_file_path_by_batchid(path):
    """_spark_metadata子路径排序规则"""
    if path.find('.') != -1:
        return int(path[path.rfind('/') + 1:path.find('.')])
    return int(path[path.rfind('/') + 1:])

def fix_checkpoint(output_path):
    """修正restart或程序异常退出可能造成的输出重复问题

    如果上次程序异常退出时已经输出了部分结果, 那么本次重启整个批次会出现重复,
    为了避免这个情况, 需要判断上次退出时是否已经进行了部分结果输出, 如果是, 则删除;
    如果不是, 则正常执行.

    """
    if os.system("hadoop fs -test -e {}/_spark_metadata".format(output_path)) == 0:
        batch_file_path_list = os.popen(
            "hadoop fs -ls {}/_spark_metadata| ".format(output_path) \
            + "awk 'NR > 1 {print $NF}'").read().strip().split("\n")
        batch_file_path_list.sort(key=sort_batch_file_path_by_batchid)
        last_batch_metadata = batch_file_path_list[-1]

        raw_line = os.popen(
            "hadoop fs -cat {}|tail -n 3|awk 'NR > 1'".format(last_batch_metadata)) \
            .read().strip().strip("\n")

        pattern = re.compile(r'=(.*?)/')
        last_batch_process_time = pattern.findall(raw_line)[0]

        # Get the process_time of the newest output directory under output_path.

        raw_line = os.popen("hadoop fs -ls {}|tail -n 1".format(output_path)) \
            .read().strip().strip("\n")

        pattern = re.compile(r'=(.*?)$')
        if pattern.findall(raw_line) == []:
            return
        last_output_process_time = pattern.findall(raw_line)[0]

        print("last_batch_process_time:", last_batch_process_time)
        print("last_output_process_time:", last_output_process_time)

        if last_batch_process_time != last_output_process_time:
            rm_path = raw_line.split(' ')[-1]
            print("上批次程序异常退出, 正在清理输出文件...")
            os.system("hadoop fs -rm -r {}".format(rm_path))
            print("清理完成!")

RTB_SCHEMA = StructType() \
    .add("date", 'string') \
    .add("sku_id", 'long') \
    .add("ad_spread_type", 'integer') \
    .add("advertiser_type", 'integer') \
    .add("retrieval_type", 'integer') \
    .add("business_type", 'integer') \
    .add("pos_id", 'integer') \
    .add("ad_traffic_group", 'integer') \
    .add("ad_traffic_type", 'integer') \
    .add("campaign_type", 'integer') \
    .add("mobile_type", 'integer') \
    .add("ad_billing_type", 'integer') \
    .add("ad_type", 'integer') \
    .add("drawing_type", 'long') \
    .add("automated_bidding_type", 'long') \
    .add("tcpa_phase", 'integer') \
    .add("impressions", 'long') \
    .add("clicks", 'long') \
    .add("total_cost", 'long') \
    .add("real_cost", 'long') \
    .add("fake_cost", 'long') \
    .add("commission_cost", 'long') \
    .add("dt", 'string')

TOUTIAO_SCHEMA = StructType() \
    .add("loc", "string") \
    .add("item_sku_id", "integer") \
    .add("ad_cost_type", "string") \
    .add("advertiser_type", "string") \
    .add("retrieval_type", "string") \
    .add("business_type", "string") \
    .add("placement_id", "string") \
    .add("ad_traffic_type", "string") \
    .add("ad_traffic_group", "string") \
    .add("campaign_type", "string") \
    .add("ad_type", "string") \
    .add("terminal_type", "string") \
    .add("cost_type", "string") \
    .add("drawing_type", "string") \
    .add("native_type", "string") \
    .add("automated_bidding_type", "string") \
    .add("tcpa_phase", "string") \
    .add("click_time", "string") \
    .add("data_min", "integer") \
    .add("impressions", "double") \
    .add("clicks", "double") \
    .add("total_price", "double") \
    .add("real_price", "double") \
    .add("commission_price", "double") \
    .add("fake_price", "double") \
    .add("fake_unperf_price", "double") \
    .add("dt", "string")

CPA_SCHEMA = StructType() \
    .add("time","string") \
    .add("sku_id","string") \
    .add("division_id_new","string") \
    .add("division_name_new","string") \
    .add("ad_first_dept_id","string") \
    .add("ad_first_dept_name","string") \
    .add("ad_second_dept_id","string") \
    .add("ad_second_dept_name","string") \
    .add("ad_third_dept_id","string") \
    .add("ad_third_dept_name","string") \
    .add("model_type","string") \
    .add("resource_type","string") \
    .add("ad_sales_type","string") \
    .add("adv_type","string") \
    .add("retrieval_type","string") \
    .add("ad_business_type","string") \
    .add("placement_id","string") \
    .add("ad_traffic_type","string") \
    .add("ad_traffic_group","string") \
    .add("campaign_type","string") \
    .add("item_first_cate_cd","string") \
    .add("item_first_cate_name","string") \
    .add("ad_sku_brand_code","string") \
    .add("ad_sku_brandname_full","string") \
    .add("terminal_type_cd","string") \
    .add("terminal_type","string") \
    .add("ad_consumption_type","string") \
    .add("ad_type_cd","string") \
    .add("ad_type_name","string") \
    .add("drawing_type","string") \
    .add("native_type","string") \
    .add("media_type","string") \
    .add("automated_bidding_type","string") \
    .add("tcpa_phase","string") \
    .add("impressions","double") \
    .add("clicks","double") \
    .add("consumption","double") \
    .add("cash_consumption","double") \
    .add("commission_consumption","double") \
    .add("fake_consumption","double") \
    .add("dt","date")

CPS_SCHEMA = StructType() \
    .add("time","string") \
    .add("sku_id","string") \
    .add("division_id_new","string") \
    .add("division_name_new","string") \
    .add("ad_first_dept_id","string") \
    .add("ad_first_dept_name","string") \
    .add("ad_second_dept_id","string") \
    .add("ad_second_dept_name","string") \
    .add("ad_third_dept_id","string") \
    .add("ad_third_dept_name","string") \
    .add("model_type","string") \
    .add("resource_type","string") \
    .add("ad_sales_type","string") \
    .add("adv_type","string") \
    .add("retrieval_type","string") \
    .add("ad_business_type","string") \
    .add("placement_id","string") \
    .add("ad_traffic_type","string") \
    .add("ad_traffic_group","string") \
    .add("campaign_type","string") \
    .add("item_first_cate_cd","string") \
    .add("item_first_cate_name","string") \
    .add("ad_sku_brand_code","string") \
    .add("ad_sku_brandname_full","string") \
    .add("terminal_type_cd","string") \
    .add("terminal_type","string") \
    .add("ad_consumption_type","string") \
    .add("ad_type_cd","string") \
    .add("ad_type_name","string") \
    .add("drawing_type","string") \
    .add("native_type","string") \
    .add("media_type","string") \
    .add("automated_bidding_type","string") \
    .add("tcpa_phase","string") \
    .add("pop_or_b2c","string") \
    .add("impressions","double") \
    .add("clicks","double") \
    .add("consumption","double") \
    .add("cash_consumption","double") \
    .add("commission_consumption","double") \
    .add("fake_consumption","double") \
    .add("gross_income","double") \
    .add("cps_fee","double") \
    .add("dt","integer")


CPD_IMP_CLK_SCHEMA = StructType() \
    .add("date", 'string') \
    .add("sku_id", 'long') \
    .add("ad_spread_type", 'integer') \
    .add("advertiser_type", 'integer') \
    .add("retrieval_type", 'integer') \
    .add("business_type", 'integer') \
    .add("pos_id", 'integer') \
    .add("ad_traffic_group", 'integer') \
    .add("ad_traffic_type", 'integer') \
    .add("campaign_type", 'integer') \
    .add("mobile_type", 'integer') \
    .add("ad_billing_type", 'integer') \
    .add("ad_type", 'integer') \
    .add("drawing_type", 'long') \
    .add("automated_bidding_type", 'long') \
    .add("tcpa_phase", 'integer') \
    .add("material_id", 'long') \
    .add("impressions", 'long') \
    .add("clicks", 'long') \
    .add("dt", 'long')


CPS_IMP_CLK_SCHEMA = StructType() \
    .add("date", 'string') \
    .add("sku_id", 'string') \
    .add("ad_spread_type", 'integer') \
    .add("advertiser_type", 'integer') \
    .add("retrieval_type", 'integer') \
    .add("business_type", 'integer') \
    .add("pos_id", 'integer') \
    .add("ad_traffic_group", 'integer') \
    .add("ad_traffic_type", 'integer') \
    .add("campaign_type", 'integer') \
    .add("mobile_type", 'integer') \
    .add("ad_billing_type", 'integer') \
    .add("ad_type", 'integer') \
    .add("drawing_type", 'integer') \
    .add("automated_bidding_type", 'integer') \
    .add("tcpa_phase", 'integer') \
    .add("material_id", 'integer') \
    .add("clicks", 'long') \
    .add("dt", 'long')

rtb_common = """cast(sku_id as string),
            case
                 when ad_spread_type = 1 then
                  'internal'
                 when ad_spread_type = 2 then
                  'external'
                 when ad_spread_type = 3 then
                  'repage'
                 else
                  'Unknown'
               end as resource_type,
            case
                when ((ad_traffic_group = 2 or ad_traffic_group = 12)
                        and (ad_billing_type = 4 or ad_billing_type = 2)
                        and ad_spread_type = 2)
                    or ad_traffic_group = 162 then '1' -- contract
                else '2' -- performance
            end as ad_sales_type, -- contract vs. performance
            cast(advertiser_type as string) as adv_type,
            cast(retrieval_type as string),
            cast(business_type as string) as ad_business_type, 
            cast(pos_id as string) AS placement_id, -- ad slot id
            cast(ad_traffic_group as string),
            cast(ad_traffic_type as string),
            cast(campaign_type as string),
            case
                when mobile_type = 0 then
                 'PC'
                when mobile_type = 1 then
                 'M'
                when mobile_type = 2 then
                 'APP'
                when mobile_type = 3 then
                 'WeChat'
                when mobile_type = 4 then
                 'MobileQQ'
                when mobile_type = 5 then
                 'Kepler'
                when mobile_type = 6 then
                 'ExtAPP'
                else
                 'Unknown'
            end as terminal_type, -- device type
            cast(mobile_type as string) as terminal_type_cd,
            case
                when ad_billing_type = 0 then
                    'CPC'
                when ad_billing_type = 1 then
                    'CPM'
                when ad_billing_type = 2 then
                    'CPD'
                when ad_billing_type = 3 then
                    'CPS'
                when ad_billing_type = 4 then
                    'CPMFIX'
                else
                    'Unknown'
            end AS ad_consumption_type, -- billing type
            case
                when ad_type = 1 then
                 'image'
                when ad_type = 3 then
                 'imageText'
                when ad_type = 4 then
                 'shop'
                when ad_type = 5 then
                 'query'
                when ad_type = 6 then
                 'video'
                when ad_type = 8 then
                 'double_11'
                when ad_type = 9 then
                 'list'
                when ad_type = 10 then
                 'article'
                when ad_type = 11 then
                 'productVideo'
                when ad_type = 12 then
                 'recommend' 
                when ad_type = 100 then
                 'icon'
                else
                 'Unknown'
            end as ad_type_name,
            cast(ad_type as string) as ad_type_cd,
            cast(automated_bidding_type as string),
            cast(tcpa_phase as string),"""
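
# rtb_common is a SQL column-list fragment spliced into several streaming
# queries below via str.format(); it must keep its trailing comma so each
# query can append its own measure columns after it.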

def main():
    """spark主控程序"""
    if len(sys.argv) != 2:
        import setting
    elif sys.argv[1] == 'test':
        import setting_test as setting
    elif sys.argv[1] == 'history':
        import setting_history as setting
    else:
        import setting

    fix_checkpoint(setting.QUERY_MINUTE_LEVEL_RTB_TOUTIAO_SINK_PATH)

    spark = SparkSession \
        .builder \
        .appName("RTB and Toutiao Minute Level Data Pipeline") \
        .config("spark.sql.autoBroadcastJoinThreshold", "262144000") \
        .config("spark.sql.files.maxPartitionBytes", "262144000") \
        .config("spark.sql.streaming.schemaInferenc‌​e", "true") \
        .config("spark.sql.shuffle.partitions", "200") \
        .config(
        "spark.sql.streaming.checkpointLocation", setting.MINUTE_LEVEL_BOARD_CHECKPOINT_PATH) \
        .enableHiveSupport() \
        .getOrCreate()
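
    # 262144000 bytes = 250 MB; with spark.sql.autoBroadcastJoinThreshold raised
    # this far, the cached product/media dimension tables joined below can be
    # broadcast to the stream side rather than shuffled.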

    spark.sparkContext.setLogLevel("WARN")

    rtb_stream_df = spark \
        .readStream \
        .format('parquet') \
        .option("latestFirst", "true") \
        .option('path', setting.RTB_DATA_BASE_PATH) \
        .option('maxFilesPerTrigger',500) \
        .schema(RTB_SCHEMA) \
        .load()

    rtb_stream_df.createOrReplaceTempView("rtb_stream_df")

    rtb_formatted_stream_df = spark.sql("""
        select
            {rtb_common}
            cast(total_cost as double) AS consumption, -- total spend
            cast(real_cost as double) AS cash_consumption, -- cash charge
            cast(commission_cost as double) AS commission_consumption, -- POP commission rebate
            cast(fake_cost as double) AS fake_consumption, -- virtual-currency spend
            cast(impressions as double),
            cast(clicks as double),
            -- REGEXP_REPLACE(date, '[- :]', '') AS time,
            cast(from_unixtime(unix_timestamp(date,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time,
            'RTB' as dp
        from
            rtb_stream_df
    """.format(rtb_common=rtb_common))

    toutiao_stream_df = spark \
        .readStream \
        .format("orc") \
        .option("path", setting.TOUTIAO_DATA_BASE_PATH) \
        .option('maxFilesPerTrigger',500) \
        .schema(TOUTIAO_SCHEMA) \
        .load()

    toutiao_stream_df.createOrReplaceTempView("toutiao_stream_df")

    toutiao_formatted_stream_df = spark.sql("""
        SELECT
            cast(item_sku_id as string) AS sku_id,
            case
                 when loc = '1' then
                  'internal'
                 when loc = '2' then
                  'external'
                 when loc = '3' then
                  'repage'
                 else
                  'Unknown'
               end as resource_type,
            CASE
                WHEN ad_cost_type = '合约' THEN '1' -- contract (source data value)
                WHEN ad_cost_type = '效果' THEN '2' -- performance (source data value)
                ELSE '2'
            END AS ad_sales_type,
            advertiser_type as adv_type,
            retrieval_type,
            business_type as ad_business_type,
            placement_id,
            ad_traffic_group,
            ad_traffic_type,
            campaign_type,
            terminal_type,
            case
                when terminal_type = 'PC' then '0'              
                when terminal_type = 'M' then '1'
                when terminal_type = 'APP' then '2'
                when terminal_type = 'WeChat' then '3'
                when terminal_type = 'MobileQQ' then '4'
                when terminal_type = 'Kepler' then '5'
                when terminal_type = 'ExtAPP' then '6'
                else '-999999'
            end as terminal_type_cd,
            cost_type as ad_consumption_type,
            ad_type as ad_type_name,
            case
                when ad_type = 'image' then '1'                 
                when ad_type = 'imageText' then '3'                
                when ad_type = 'shop' then '4'                 
                when ad_type = 'query' then '5'                 
                when ad_type = 'video' then '6'               
                when ad_type = 'double_11' then '8'                 
                when ad_type = 'list' then '9'               
                when ad_type = 'article' then '10'               
                when ad_type = 'productVideo' then '11'                 
                when ad_type = 'recommend' then '12'               
                when ad_type = 'icon' then '100'                 
                else
                 '-999999'
            end as ad_type_cd,
            automated_bidding_type,
            tcpa_phase,
            total_price as consumption,
            real_price as cash_consumption,
            commission_price as commission_consumption,
            fake_price as fake_consumption,
            impressions,
            clicks,
            cast(from_unixtime((unix_timestamp(dt,'yyyyMMddHHmm')-600+data_min*60),'yyyyMMddHHmm') as bigint) as time,
            'TouTiao' as dp
        FROM
            toutiao_stream_df
    """)

    unioned_tt_stream_df = rtb_formatted_stream_df.union(toutiao_formatted_stream_df)

    cpa_stream_df = spark \
        .readStream \
        .format("orc") \
        .option("path", setting.CPA_DATA_BASE_PATH) \
        .option('maxFilesPerTrigger',500) \
        .schema(CPA_SCHEMA) \
        .load()

    cpa_stream_df.createOrReplaceTempView("cpa_stream_df")

    cpa_formatted_stream_df = spark.sql("""
        select 
            sku_id, --NULL、3125212
            resource_type,
            ad_sales_type,
            adv_type,                      
            retrieval_type,
            ad_business_type,
            placement_id,
            ad_traffic_group,
            ad_traffic_type,
            campaign_type,
            terminal_type,
            terminal_type_cd,
            ad_consumption_type,
            ad_type_name, --contextMarketing
            ad_type_cd, --NULL
            automated_bidding_type,
            tcpa_phase,
            consumption, --0
            cash_consumption, --0
            commission_consumption, --0
            fake_consumption, --0
            impressions, --0
            clicks, --0
            time_new as time,
            'CPA' as dp
        from 
            (SELECT          
                cast(from_unixtime(unix_timestamp(time,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time_new, --2018-07-24 03:00:**、2018-07-24 03:01:**
                *
            FROM
                cpa_stream_df
            ) a
    """)

    unioned_cpa_stream_df = unioned_tt_stream_df.union(cpa_formatted_stream_df)

    cps_stream_df = spark \
        .readStream \
        .format("orc") \
        .option("path", setting.CPS_DATA_BASE_PATH) \
        .option('maxFilesPerTrigger',500) \
        .schema(CPS_SCHEMA) \
        .load()

    cps_stream_df.createOrReplaceTempView("cps_stream_df")

    cps_formatted_stream_df = spark.sql("""
        SELECT
            sku_id,
            resource_type, -- values are Chinese text in the source
            ad_sales_type,
            adv_type,
            retrieval_type,
            ad_business_type,
            placement_id,
            ad_traffic_group,
            ad_traffic_type,
            campaign_type,
            terminal_type,
            terminal_type_cd,
            ad_consumption_type,
            ad_type_name, --NULL
            ad_type_cd, --NULL
            automated_bidding_type, --NULL
            tcpa_phase, --NULL
            consumption, 
            cash_consumption,
            commission_consumption, --0
            fake_consumption, --0
            impressions, --0
            clicks, --0
            time_new as time,  --2018-07-24 03:00:**、2018-07-24 03:01:**
            'CPS' as dp
        from
            (select 
                cast(from_unixtime(unix_timestamp(time,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time_new,  --2018-07-24 03:00:**、2018-07-24 03:01:**
                *
            FROM
                cps_stream_df
            ) a
    """)

    unioned_cps_stream_df = unioned_cpa_stream_df.union(cps_formatted_stream_df)


    cpd_imp_clk_stream_df = spark \
        .readStream \
        .format('parquet') \
        .option("latestFirst", "true") \
        .option('path', setting.CPD_IMP_CLK_DATA_BASE_PATH) \
        .option('maxFilesPerTrigger',500) \
        .schema(CPD_IMP_CLK_SCHEMA) \
        .load()

    cpd_imp_clk_stream_df.createOrReplaceTempView("cpd_imp_clk_stream_df")

    cpd_imp_clk_formatted_stream_df = spark.sql("""
        select
            {rtb_common}
            cast(0.0 as double) AS consumption, -- total spend
            cast(0.0 as double) AS cash_consumption, -- cash charge
            cast(0.0 as double) AS commission_consumption, -- POP commission rebate
            cast(0.0 as double) AS fake_consumption, -- virtual-currency spend
            cast(impressions as double),
            cast(clicks as double),
            cast(from_unixtime(unix_timestamp(date,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time,
            'CPD_imp_clk' as dp
        from
            cpd_imp_clk_stream_df
    """.format(rtb_common=rtb_common))


    unioned_cpd_imp_clk_stream_df = unioned_cps_stream_df.union(cpd_imp_clk_formatted_stream_df)

    cps_imp_clk_stream_df = spark \
        .readStream \
        .format('parquet') \
        .option("latestFirst", "true") \
        .option('path', setting.CPS_IMP_CLK_DATA_BASE_PATH) \
        .option('maxFilesPerTrigger',500) \
        .schema(CPS_IMP_CLK_SCHEMA) \
        .load()

    cps_imp_clk_stream_df.createOrReplaceTempView("cps_imp_clk_stream_df")

    cps_imp_clk_formatted_stream_df = spark.sql("""
        select
            {rtb_common}
            cast(0.0 as double) AS consumption, -- total spend
            cast(0.0 as double) AS cash_consumption, -- cash charge
            cast(0.0 as double) AS commission_consumption, -- POP commission rebate
            cast(0.0 as double) AS fake_consumption, -- virtual-currency spend
            cast(0.0 as double) AS impressions,
            cast(clicks as double),
            cast(from_unixtime(unix_timestamp(date,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time,
            'CPS_imp_clk' as dp
        from
            cps_imp_clk_stream_df
    """.format(rtb_common=rtb_common))


    unioned_stream_df = unioned_cpd_imp_clk_stream_df.union(cps_imp_clk_formatted_stream_df)


    # Record the latest partition dates seen at startup; used to decide when the dimension tables need a reload.
    last_product_partition_date = get_last_partition_date("dim.dim_product_daily_item_sku", "active")
    last_media_partition_date = get_last_partition_date("ad.fdm_dsp_media_chain")

    dim_product_active_check_new_partition = partial(
        check_new_partition,
        "dim.dim_product_daily_item_sku",
        "active")
    dim_media_check_new_partition = partial(check_new_partition, "ad.fdm_dsp_media_chain", None)
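
    # functools.partial pins the table name and dp; the remaining positional
    # args of check_new_partition (last_partition_date, query, reload_flag)
    # are supplied each time a watcher thread is started below.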

    while True:
        reload_flag = ReloadFlag()
        product_df = spark.sql(sql_get_dim_product_active_table()).cache()
        media_df = spark.sql(sql_get_dim_media_table()).cache()

        joined_product_stream_df = unioned_stream_df.join(product_df, "sku_id", "left_outer")
        joined_media_stream_df = joined_product_stream_df.join(
            media_df,
            "placement_id",
            "left_outer")

        joined_media_stream_df.createOrReplaceTempView("joined_media_stream_df")
        output_stream_df = spark.sql("""
            select
                resource_type,
                ad_sales_type,
                adv_type,
                retrieval_type,
                ad_business_type_new as ad_business_type,
                placement_id,
                ad_traffic_group,
                ad_traffic_type,
                campaign_type,
                terminal_type,
                terminal_type_cd,
                ad_consumption_type,
                ad_type_name,
                ad_type_cd,
                automated_bidding_type_new as automated_bidding_type,
                tcpa_phase_new as tcpa_phase,
                consumption,
                cash_consumption,
                commission_consumption,
                fake_consumption,
                impressions,
                clicks,
                time,
                division_id_new,
                division_name_new,
                ad_first_dept_id,
                ad_first_dept_name,
                ad_second_dept_id,
                ad_second_dept_name,
                ad_third_dept_id,
                ad_third_dept_name,
                item_first_cate_cd,
                item_first_cate_name,
                ad_sku_brand_code,
                ad_sku_brandname_full,
                drawing_type,
                native_type,
                media_type,
                CASE WHEN (type_raw  IS NULL or type_raw  IN ('NULL','null','','unknown')) THEN
                    CASE WHEN adv_type = 2 THEN 'POP'
                        WHEN adv_type IN (12,13) THEN 'EXTERNAL'
                        ELSE 'B2C' END
                    ELSE type_raw END
                as model_type, -- business model
                account_type,
                from_unixtime(unix_timestamp(),'yyyyMMddHHmm00') AS process_time, -- uniform processing-time tag
                cast(from_unixtime(unix_timestamp(cast(time as string),'yyyyMMddHHmm'),'yyyy-MM-dd') as date) as dt,
                dp
            from
                (select 
                    case when dp = 'CPD_imp_clk' then '16'
                        when dp = 'CPS_imp_clk' then '32'
                        else ad_business_type 
                        end as ad_business_type_new,
                    coalesce(automated_bidding_type,0) as automated_bidding_type_new,
                    coalesce(tcpa_phase,0) as tcpa_phase_new,
                    case when drawing_type = '1' and native_type = '1' or retrieval_type = '2' then '搜索' -- search
                        when drawing_type = '1' and native_type = '2' then '推荐' -- recommendation
                        when target_group_type = '1' then '首焦' -- homepage focus
                        when dp = 'GDT' then '腾讯' -- Tencent
                        when ad_traffic_type in ('133','162') or (dp = 'RTB' and resource_type in ('external','repage') and (ad_traffic_type in ('13','59','99') or placement_id in ('813','833')) and ad_business_type not in ('64','8192','32768')) then '头条' -- Toutiao
                        when dp = 'RTB' and resource_type in ('external','repage') and ad_traffic_type in ('1','4','5','7','37','51','58','81','90','122','138','139') and ad_business_type not in ('64','8192','32768') then '百度' -- Baidu
                        when dp = 'RTB' and resource_type in ('external','repage') and ad_traffic_type in ('52') and ad_business_type not in ('64','8192','32768') then '讯飞' -- iFlytek
                        when dp in ('CPS','CPA','CPS_imp_clk') then '联盟' -- affiliate network
                        else '未知' -- unknown
                    end as media_type,
                    case when adv_type in ('2','3','7','9','12','13') then 'accountCash'
                         else 'accountMchVirtual'
                    end as account_type,
                    *
                from
                    joined_media_stream_df
                ) a               
        """)

        query_minute_level_rtb_toutiao = output_stream_df \
            .writeStream \
            .trigger(processingTime='60 seconds') \
            .queryName(setting.QUERY_NAME_MINUTE_LEVEL_RTB_TOUTIAO) \
            .partitionBy('process_time','dt','dp') \
            .outputMode("append") \
            .format("parquet") \
            .option("path", setting.QUERY_MINUTE_LEVEL_RTB_TOUTIAO_SINK_PATH) \
            .start()
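
        # Each trigger, the file sink writes parquet under
        # <sink path>/process_time=.../dt=.../dp=...; this is the layout that
        # fix_checkpoint() inspects after a restart.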

        check_need_for_reload_product_thread = Thread(
            target=dim_product_active_check_new_partition,
            args=(last_product_partition_date, query_minute_level_rtb_toutiao, reload_flag))

        check_need_for_reload_product_thread.daemon = True
        check_need_for_reload_product_thread.start()

        check_need_for_reload_media_thread = Thread(
            target=dim_media_check_new_partition,
            args=(last_media_partition_date, query_minute_level_rtb_toutiao, reload_flag))

        check_need_for_reload_media_thread.daemon = True
        check_need_for_reload_media_thread.start()

        # Exit as soon as any query terminates, so the job never keeps running after a query failure.
        spark.streams.awaitAnyTermination()

        query_minute_level_rtb_toutiao.stop()
        # The terminated state must be reset, otherwise awaitAnyTermination() will not block again.
        spark.streams.resetTerminated()

        # Drop the stale static DataFrame caches (non-blocking unpersist).
        product_df.unpersist()
        media_df.unpersist()
        # Refresh the latest partition dates.
        last_product_partition_date = get_last_partition_date("dim.dim_product_daily_item_sku", "active")

        print("reload 表dim.dim_product_daily_item_sku active分区 dt={}".format(last_product_partition_date))

        last_media_partition_date = get_last_partition_date("ad.fdm_dsp_media_chain")

        print("reload 表ad.fdm_dsp_media_chain分区 dt={}".format(last_media_partition_date))

if __name__ == "__main__":
    main()
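
The script imports a local setting module (with setting_test and setting_history variants) that supplies all path and query-name constants. A minimal sketch of what such a module could look like is below; the constant names come from the script above, while every value is an illustrative placeholder.

#coding:utf-8
# setting.py -- hypothetical configuration module; all paths are placeholders.

# Streaming source directories
RTB_DATA_BASE_PATH = "hdfs:///data/ad/rtb/minute"
TOUTIAO_DATA_BASE_PATH = "hdfs:///data/ad/toutiao/minute"
CPA_DATA_BASE_PATH = "hdfs:///data/ad/cpa/minute"
CPS_DATA_BASE_PATH = "hdfs:///data/ad/cps/minute"
CPD_IMP_CLK_DATA_BASE_PATH = "hdfs:///data/ad/cpd_imp_clk/minute"
CPS_IMP_CLK_DATA_BASE_PATH = "hdfs:///data/ad/cps_imp_clk/minute"

# Checkpoint location and output sink
MINUTE_LEVEL_BOARD_CHECKPOINT_PATH = "hdfs:///checkpoints/minute_level_board"
QUERY_MINUTE_LEVEL_RTB_TOUTIAO_SINK_PATH = "hdfs:///output/minute_level_rtb_toutiao"

# Streaming query name
QUERY_NAME_MINUTE_LEVEL_RTB_TOUTIAO = "minute_level_rtb_toutiao"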
