python structured_streaming
#coding:utf-8
"""
Launch command:
PYSPARK_PYTHON=python3 spark-submit \
--driver-memory 16G \
--executor-memory 20G \
--executor-cores 10 \
--num-executors 100 \
--conf spark.yarn.maxAppAttempts=4 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=8 \
spark_rtb_toutiao_main_script.py
"""
import datetime
import os
import re
import time
import sys
from functools import partial
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from threading import Thread
def get_yesterday_str():
    """Return yesterday's date as a string, e.g. '2018-07-06'."""
    today = datetime.date.today()
    oneday = datetime.timedelta(days=1)
    yesterday = today - oneday
    return yesterday.strftime("%Y-%m-%d")
def get_last_partition_date(table_name, dp=None):
    """Return the date of the latest partition of a Hive table (optionally filtered by dp)."""
    partition_sql = """hive -e "show partitions """ \
        + table_name + """;" 2>/dev/null|grep {}""".format(dp if dp is not None else '=') \
        + """|tail -1|awk -F'[= /]' '{print $2}'"""
    output = os.popen(partition_sql).read().strip().split("\n")
    last_partition_date = output[0]
    # Fallback: use yesterday's date when no partition is found
    if len(last_partition_date) == 0:
        return get_yesterday_str()
    return last_partition_date
def sql_get_dim_product_active_table():
"""构造获取dim.dim_product_daily_item_sku表最新active分区数据的sql"""
last_partition_date = get_last_partition_date("dim.dim_product_daily_item_sku", "active")
return """
SELECT
max(division_id_new) as division_id_new,
max(division_name_new) as division_name_new,
max(pur_first_dept_cd) as ad_first_dept_id,
max(pur_first_dept_name) as ad_first_dept_name,
max(pur_second_dept_cd) as ad_second_dept_id,
max(pur_second_dept_name) as ad_second_dept_name,
max(pur_third_dept_cd) as ad_third_dept_id,
max(pur_third_dept_name) as ad_third_dept_name,
max(item_first_cate_cd) as item_first_cate_cd,
max(item_first_cate_name) as item_first_cate_name,
max(brand_code) as ad_sku_brand_code,
max(barndname_full) as ad_sku_brandname_full,
max(type) as type_raw,
item_sku_id AS sku_id
FROM
dim.dim_product_daily_item_sku
WHERE
dt = '{}' and dp = 'active'
group BY
item_sku_id
CLUSTER BY
sku_id
""".format(last_partition_date)
def sql_get_dim_media_table():
"""构造获取dim.dim_product_daily_item_sku表最新active分区数据的sql"""
last_partition_media_date = get_last_partition_date("ad.fdm_dsp_media_chain")
return """
SELECT
id as placement_id,
cast(max(drawing_type) as string) AS drawing_type,
cast(max(native_type) as string) AS native_type,
max(target_group_type) as target_group_type
FROM
ad.fdm_dsp_media_chain
WHERE
dt='{last_partition_media_date}'
GROUP BY
id
""".format(last_partition_media_date=last_partition_media_date)
class ReloadFlag():
    """Mutable flag shared between the main loop and the partition-check threads."""
    def __init__(self):
        self.reload = False
def check_new_partition(table_name, dp=None, last_partition_date=None, query=None, reload_flag=None):
    """Watch a Hive table for a new partition.

    Polls every 10 minutes; when a new partition appears, stops the streaming query and
    sets the shared reload flag so the main loop reloads the dimension tables.
    """
    print("Background thread started: checking table {} for a new partition every 10 minutes.".format(table_name))
    while not reload_flag.reload:
        partition_date = get_last_partition_date(table_name, dp)
        if partition_date != last_partition_date:
            print("Detected new partition {} for table {}.".format(partition_date, table_name))
            break
        else:
            print("No new partition for table {} yet.".format(table_name))
            time.sleep(60 * 10)
    reload_flag.reload = True
    query.stop()
def periodic_backup_checkpoint(
        backup_path,
        checkpoint_path,
        output_path,
        query_name,
        backup_interval=60 * 60):
    """Periodically (hourly by default) back up checkpoint-related data.

    Two things are backed up, and both are needed to restore a checkpoint:
    1. the checkpoint directory (one or a list of checkpoint directories);
    2. the _spark_metadata directory of the output path (one or a list of output paths).
    """
    print("Checkpoint backup thread started.")
    origin_backup_path = backup_path
    while True:
        # Minute-level timestamp, e.g. 201807061220
        now = datetime.datetime.now().strftime("%Y%m%d%H%M")
        backup_path = origin_backup_path + '/' + now
        os.system("hadoop fs -mkdir -p {}".format(backup_path))
        try:
            if isinstance(checkpoint_path, str):
                os.system("hadoop fs -cp {} {}/".format(checkpoint_path, backup_path))
            elif isinstance(checkpoint_path, list):
                for path in checkpoint_path:
                    os.system("hadoop fs -cp {} {}/".format(path, backup_path))
            if isinstance(output_path, str):
                os.system("hadoop fs -mkdir -p {}/{}".format(backup_path, query_name))
                os.system("hadoop fs -cp {}/_spark_metadata {}/{}/".format(
                    output_path, backup_path, query_name))
            elif isinstance(output_path, list):
                for path, name in zip(output_path, query_name):
                    os.system("hadoop fs -mkdir -p {}/{}".format(backup_path, name))
                    os.system("hadoop fs -cp {}/_spark_metadata {}/{}/".format(
                        path, backup_path, name))
        except Exception:
            # Skip this round on any error during backup (e.g. a directory does not exist)
            pass
        time.sleep(backup_interval)
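# Usage sketch (the backup path below is hypothetical; the setting.* names are the ones used
# later in main()): the backup helper is meant to run in a daemon thread next to the queries:
#
#   backup_thread = Thread(
#       target=periodic_backup_checkpoint,
#       args=("hdfs:///tmp/ckpt_backup",                            # backup_path (hypothetical)
#             setting.MINUTE_LEVEL_BOARD_CHECKPOINT_PATH,           # checkpoint_path
#             setting.QUERY_MINUTE_LEVEL_RTB_TOUTIAO_SINK_PATH,     # output_path
#             setting.QUERY_NAME_MINUTE_LEVEL_RTB_TOUTIAO))         # query_name
#   backup_thread.daemon = True
#   backup_thread.start()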
def sort_batch_file_path_by_batchid(path):
"""_spark_metadata子路径排序规则"""
if path.find('.') != -1:
return int(path[path.rfind('/') + 1:path.find('.')])
return int(path[path.rfind('/') + 1:])
def fix_checkpoint(output_path):
    """Remove duplicated output that a restart or abnormal exit may have left behind.

    If the previous run crashed after writing part of a batch, replaying that batch on
    restart would duplicate its output. To avoid this, compare the last batch recorded in
    _spark_metadata with the most recent output directory; if they differ, the partial
    output was never committed, so delete it before restarting. Otherwise do nothing.
    """
    if os.system("hadoop fs -test -e {}/_spark_metadata".format(output_path)) == 0:
        batch_file_path_list = os.popen(
            "hadoop fs -ls {}/_spark_metadata| ".format(output_path) \
            + "awk 'NR > 1 {print $NF}'").read().strip().split("\n")
        batch_file_path_list.sort(key=sort_batch_file_path_by_batchid)
        last_batch_metadata = batch_file_path_list[-1]
        raw_line = os.popen(
            "hadoop fs -cat {}|tail -n 3|awk 'NR > 1'".format(last_batch_metadata)) \
            .read().strip().strip("\n")
        pattern = re.compile(r'=(.*?)/')
        last_batch_process_time = pattern.findall(raw_line)[0]
        # Read the process_time of the most recent directory in the output folder
        raw_line = os.popen("hadoop fs -ls {}|tail -n 1".format(output_path)) \
            .read().strip().strip("\n")
        pattern = re.compile(r'=(.*?)$')
        if pattern.findall(raw_line) == []:
            return
        last_output_process_time = pattern.findall(raw_line)[0]
        print("last_batch_process_time:", last_batch_process_time)
        print("last_output_process_time:", last_output_process_time)
        if last_batch_process_time != last_output_process_time:
            rm_path = raw_line.split(' ')[-1]
            print("Previous run exited abnormally; cleaning up uncommitted output...")
            os.system("hadoop fs -rm -r {}".format(rm_path))
            print("Cleanup done!")
RTB_SCHEMA = StructType() \
.add("date", 'string') \
.add("sku_id", 'long') \
.add("ad_spread_type", 'integer') \
.add("advertiser_type", 'integer') \
.add("retrieval_type", 'integer') \
.add("business_type", 'integer') \
.add("pos_id", 'integer') \
.add("ad_traffic_group", 'integer') \
.add("ad_traffic_type", 'integer') \
.add("campaign_type", 'integer') \
.add("mobile_type", 'integer') \
.add("ad_billing_type", 'integer') \
.add("ad_type", 'integer') \
.add("drawing_type", 'long') \
.add("automated_bidding_type", 'long') \
.add("tcpa_phase", 'integer') \
.add("impressions", 'long') \
.add("clicks", 'long') \
.add("total_cost", 'long') \
.add("real_cost", 'long') \
.add("fake_cost", 'long') \
.add("commission_cost", 'long') \
.add("dt", 'string')
TOUTIAO_SCHEMA = StructType() \
.add("loc", "string") \
.add("item_sku_id", "integer") \
.add("ad_cost_type", "string") \
.add("advertiser_type", "string") \
.add("retrieval_type", "string") \
.add("business_type", "string") \
.add("placement_id", "string") \
.add("ad_traffic_type", "string") \
.add("ad_traffic_group", "string") \
.add("campaign_type", "string") \
.add("ad_type", "string") \
.add("terminal_type", "string") \
.add("cost_type", "string") \
.add("drawing_type", "string") \
.add("native_type", "string") \
.add("automated_bidding_type", "string") \
.add("tcpa_phase", "string") \
.add("click_time", "string") \
.add("data_min", "integer") \
.add("impressions", "double") \
.add("clicks", "double") \
.add("total_price", "double") \
.add("real_price", "double") \
.add("commission_price", "double") \
.add("fake_price", "double") \
.add("fake_unperf_price", "double") \
.add("dt", "string")
CPA_SCHEMA = StructType() \
.add("time","string") \
.add("sku_id","string") \
.add("division_id_new","string") \
.add("division_name_new","string") \
.add("ad_first_dept_id","string") \
.add("ad_first_dept_name","string") \
.add("ad_second_dept_id","string") \
.add("ad_second_dept_name","string") \
.add("ad_third_dept_id","string") \
.add("ad_third_dept_name","string") \
.add("model_type","string") \
.add("resource_type","string") \
.add("ad_sales_type","string") \
.add("adv_type","string") \
.add("retrieval_type","string") \
.add("ad_business_type","string") \
.add("placement_id","string") \
.add("ad_traffic_type","string") \
.add("ad_traffic_group","string") \
.add("campaign_type","string") \
.add("item_first_cate_cd","string") \
.add("item_first_cate_name","string") \
.add("ad_sku_brand_code","string") \
.add("ad_sku_brandname_full","string") \
.add("terminal_type_cd","string") \
.add("terminal_type","string") \
.add("ad_consumption_type","string") \
.add("ad_type_cd","string") \
.add("ad_type_name","string") \
.add("drawing_type","string") \
.add("native_type","string") \
.add("media_type","string") \
.add("automated_bidding_type","string") \
.add("tcpa_phase","string") \
.add("impressions","double") \
.add("clicks","double") \
.add("consumption","double") \
.add("cash_consumption","double") \
.add("commission_consumption","double") \
.add("fake_consumption","double") \
.add("dt","date")
CPS_SCHEMA = StructType() \
.add("time","string") \
.add("sku_id","string") \
.add("division_id_new","string") \
.add("division_name_new","string") \
.add("ad_first_dept_id","string") \
.add("ad_first_dept_name","string") \
.add("ad_second_dept_id","string") \
.add("ad_second_dept_name","string") \
.add("ad_third_dept_id","string") \
.add("ad_third_dept_name","string") \
.add("model_type","string") \
.add("resource_type","string") \
.add("ad_sales_type","string") \
.add("adv_type","string") \
.add("retrieval_type","string") \
.add("ad_business_type","string") \
.add("placement_id","string") \
.add("ad_traffic_type","string") \
.add("ad_traffic_group","string") \
.add("campaign_type","string") \
.add("item_first_cate_cd","string") \
.add("item_first_cate_name","string") \
.add("ad_sku_brand_code","string") \
.add("ad_sku_brandname_full","string") \
.add("terminal_type_cd","string") \
.add("terminal_type","string") \
.add("ad_consumption_type","string") \
.add("ad_type_cd","string") \
.add("ad_type_name","string") \
.add("drawing_type","string") \
.add("native_type","string") \
.add("media_type","string") \
.add("automated_bidding_type","string") \
.add("tcpa_phase","string") \
.add("pop_or_b2c","string") \
.add("impressions","double") \
.add("clicks","double") \
.add("consumption","double") \
.add("cash_consumption","double") \
.add("commission_consumption","double") \
.add("fake_consumption","double") \
.add("gross_income","double") \
.add("cps_fee","double") \
.add("dt","integer")
CPD_IMP_CLK_SCHEMA = StructType() \
.add("date", 'string') \
.add("sku_id", 'long') \
.add("ad_spread_type", 'integer') \
.add("advertiser_type", 'integer') \
.add("retrieval_type", 'integer') \
.add("business_type", 'integer') \
.add("pos_id", 'integer') \
.add("ad_traffic_group", 'integer') \
.add("ad_traffic_type", 'integer') \
.add("campaign_type", 'integer') \
.add("mobile_type", 'integer') \
.add("ad_billing_type", 'integer') \
.add("ad_type", 'integer') \
.add("drawing_type", 'long') \
.add("automated_bidding_type", 'long') \
.add("tcpa_phase", 'integer') \
.add("material_id", 'long') \
.add("impressions", 'long') \
.add("clicks", 'long') \
.add("dt", 'long')
CPS_IMP_CLK_SCHEMA = StructType() \
.add("date", 'string') \
.add("sku_id", 'string') \
.add("ad_spread_type", 'integer') \
.add("advertiser_type", 'integer') \
.add("retrieval_type", 'integer') \
.add("business_type", 'integer') \
.add("pos_id", 'integer') \
.add("ad_traffic_group", 'integer') \
.add("ad_traffic_type", 'integer') \
.add("campaign_type", 'integer') \
.add("mobile_type", 'integer') \
.add("ad_billing_type", 'integer') \
.add("ad_type", 'integer') \
.add("drawing_type", 'integer') \
.add("automated_bidding_type", 'integer') \
.add("tcpa_phase", 'integer') \
.add("material_id", 'integer') \
.add("clicks", 'long') \
.add("dt", 'long')
rtb_common = """cast(sku_id as string),
case
when ad_spread_type = 1 then
'internal'
when ad_spread_type = 2 then
'external'
when ad_spread_type = 3 then
'repage'
else
'Unknown'
end as resource_type,
case
when ((ad_traffic_group = 2 or ad_traffic_group = 12)
and (ad_billing_type = 4 or ad_billing_type = 2)
and ad_spread_type = 2)
or ad_traffic_group = 162 then '1' -- contract
else '2' -- performance
end as ad_sales_type, -- contract ('1') / performance ('2')
cast(advertiser_type as string) as adv_type,
cast(retrieval_type as string),
cast(business_type as string) as ad_business_type,
cast(pos_id as string) AS placement_id, -- ad placement (slot) id
cast(ad_traffic_group as string),
cast(ad_traffic_type as string),
cast(campaign_type as string),
case
when mobile_type = 0 then
'PC'
when mobile_type = 1 then
'M'
when mobile_type = 2 then
'APP'
when mobile_type = 3 then
'WeChat'
when mobile_type = 4 then
'MobileQQ'
when mobile_type = 5 then
'Kepler'
when mobile_type = 6 then
'ExtAPP'
else
'Unknown'
end as terminal_type, -- device/terminal type
cast(mobile_type as string) as terminal_type_cd,
case
when ad_billing_type = 0 then
'CPC'
when ad_billing_type = 1 then
'CPM'
when ad_billing_type = 2 then
'CPD'
when ad_billing_type = 3 then
'CPS'
when ad_billing_type = 4 then
'CPMFIX'
else
'Unknown'
end AS ad_consumption_type, -- billing type
case
when ad_type = 1 then
'image'
when ad_type = 3 then
'imageText'
when ad_type = 4 then
'shop'
when ad_type = 5 then
'query'
when ad_type = 6 then
'video'
when ad_type = 8 then
'double_11'
when ad_type = 9 then
'list'
when ad_type = 10 then
'article'
when ad_type = 11 then
'productVideo'
when ad_type = 12 then
'recommend'
when ad_type = 100 then
'icon'
else
'Unknown'
end as ad_type_name,
cast(ad_type as string) as ad_type_cd,
cast(automated_bidding_type as string),
cast(tcpa_phase as string),"""
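# rtb_common is a shared SELECT-list fragment: it is spliced into the RTB, CPD_imp_clk and
# CPS_imp_clk streaming queries below via str.format, so those streams all project the same
# leading columns (sku_id, resource_type, ..., tcpa_phase) in the same order before the union.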
def main():
"""spark主控程序"""
if len(sys.argv) != 2:
import setting
elif sys.argv[1] == 'test':
import setting_test as setting
elif sys.argv[1] == 'history':
import setting_history as setting
else:
import setting
fix_checkpoint(setting.QUERY_MINUTE_LEVEL_RTB_TOUTIAO_SINK_PATH)
spark = SparkSession \
.builder \
.appName("RTB and Toutiao Minute Level Data Pipeline") \
.config("spark.sql.autoBroadcastJoinThreshold", "262144000") \
.config("spark.sql.files.maxPartitionBytes", "262144000") \
.config("spark.sql.streaming.schemaInference", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.config(
"spark.sql.streaming.checkpointLocation", setting.MINUTE_LEVEL_BOARD_CHECKPOINT_PATH) \
.enableHiveSupport() \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
rtb_stream_df = spark \
.readStream \
.format('parquet') \
.option("latestFirst", "true") \
.option('path', setting.RTB_DATA_BASE_PATH) \
.option('maxFilesPerTrigger',500) \
.schema(RTB_SCHEMA) \
.load()
rtb_stream_df.createOrReplaceTempView("rtb_stream_df")
rtb_formatted_stream_df = spark.sql("""
select
{rtb_common}
cast(total_cost as double) AS consumption, -- total spend
cast(real_cost as double) AS cash_consumption, -- cash (real money) spend
cast(commission_cost as double) AS commission_consumption, -- POP commission
cast(fake_cost as double) AS fake_consumption, -- virtual-currency spend
cast(impressions as double),
cast(clicks as double),
-- REGEXP_REPLACE(date, '[- :]', '') AS time,
cast(from_unixtime(unix_timestamp(date,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time,
'RTB' as dp
from
rtb_stream_df
""".format(rtb_common=rtb_common))
toutiao_stream_df = spark \
.readStream \
.format("orc") \
.option("path", setting.TOUTIAO_DATA_BASE_PATH) \
.option('maxFilesPerTrigger',500) \
.schema(TOUTIAO_SCHEMA) \
.load()
toutiao_stream_df.createOrReplaceTempView("toutiao_stream_df")
toutiao_formatted_stream_df = spark.sql("""
SELECT
cast(item_sku_id as string) AS sku_id,
case
when loc = '1' then
'internal'
when loc = '2' then
'external'
when loc = '3' then
'repage'
else
'Unknown'
end as resource_type,
CASE
WHEN ad_cost_type = '合约' THEN '1'
WHEN ad_cost_type = '效果' THEN '2'
ELSE '2'
END AS ad_sales_type,
advertiser_type as adv_type,
retrieval_type,
business_type as ad_business_type,
placement_id,
ad_traffic_group,
ad_traffic_type,
campaign_type,
terminal_type,
case
when terminal_type = 'PC' then '0'
when terminal_type = 'M' then '1'
when terminal_type = 'APP' then '2'
when terminal_type = 'WeChat' then '3'
when terminal_type = 'MobileQQ' then '4'
when terminal_type = 'Kepler' then '5'
when terminal_type = 'ExtAPP' then '6'
else '-999999'
end as terminal_type_cd,
cost_type as ad_consumption_type,
ad_type as ad_type_name,
case
when ad_type = 'image' then '1'
when ad_type = 'imageText' then '3'
when ad_type = 'shop' then '4'
when ad_type = 'query' then '5'
when ad_type = 'video' then '6'
when ad_type = 'double_11' then '8'
when ad_type = 'list' then '9'
when ad_type = 'article' then '10'
when ad_type = 'productVideo' then '11'
when ad_type = 'recommend' then '12'
when ad_type = 'icon' then '100'
else
'-999999'
end as ad_type_cd,
automated_bidding_type,
tcpa_phase,
total_price as consumption,
real_price as cash_consumption,
commission_price as commission_consumption,
fake_price as fake_consumption,
impressions,
clicks,
cast(from_unixtime((unix_timestamp(dt,'yyyyMMddHHmm')-600+data_min*60),'yyyyMMddHHmm') as bigint) as time,
'TouTiao' as dp
FROM
toutiao_stream_df
""")
unioned_tt_stream_df = rtb_formatted_stream_df.union(toutiao_formatted_stream_df)
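    # Note: DataFrame.union aligns columns by position, not by name, so every formatted
    # query unioned here (and below) must select the same number of columns in the same order.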
cpa_stream_df = spark \
.readStream \
.format("orc") \
.option("path", setting.CPA_DATA_BASE_PATH) \
.option('maxFilesPerTrigger',500) \
.schema(CPA_SCHEMA) \
.load()
cpa_stream_df.createOrReplaceTempView("cpa_stream_df")
cpa_formatted_stream_df = spark.sql("""
select
sku_id, --NULL、3125212
resource_type,
ad_sales_type,
adv_type,
retrieval_type,
ad_business_type,
placement_id,
ad_traffic_group,
ad_traffic_type,
campaign_type,
terminal_type,
terminal_type_cd,
ad_consumption_type,
ad_type_name, --contextMarketing
ad_type_cd, --NULL
automated_bidding_type,
tcpa_phase,
consumption, --0
cash_consumption, --0
commission_consumption, --0
fake_consumption, --0
impressions, --0
clicks, --0
time_new as time,
'CPA' as dp
from
(SELECT
cast(from_unixtime(unix_timestamp(time,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time_new, --2018-07-24 03:00:**、2018-07-24 03:01:**
*
FROM
cpa_stream_df
) a
""")
unioned_cpa_stream_df = unioned_tt_stream_df.union(cpa_formatted_stream_df)
cps_stream_df = spark \
.readStream \
.format("orc") \
.option("path", setting.CPS_DATA_BASE_PATH) \
.option('maxFilesPerTrigger',500) \
.schema(CPS_SCHEMA) \
.load()
cps_stream_df.createOrReplaceTempView("cps_stream_df")
cps_formatted_stream_df = spark.sql("""
SELECT
sku_id,
resource_type, -- values are Chinese strings in the source data
ad_sales_type,
adv_type,
retrieval_type,
ad_business_type,
placement_id,
ad_traffic_group,
ad_traffic_type,
campaign_type,
terminal_type,
terminal_type_cd,
ad_consumption_type,
ad_type_name, --NULL
ad_type_cd, --NULL
automated_bidding_type, --NULL
tcpa_phase, --NULL
consumption,
cash_consumption,
commission_consumption, --0
fake_consumption, --0
impressions, --0
clicks, --0
time_new as time, --2018-07-24 03:00:**、2018-07-24 03:01:**
'CPS' as dp
from
(select
cast(from_unixtime(unix_timestamp(time,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time_new, --2018-07-24 03:00:**、2018-07-24 03:01:**
*
FROM
cps_stream_df
) a
""")
unioned_cps_stream_df = unioned_cpa_stream_df.union(cps_formatted_stream_df)
cpd_imp_clk_stream_df = spark \
.readStream \
.format('parquet') \
.option("latestFirst", "true") \
.option('path', setting.CPD_IMP_CLK_DATA_BASE_PATH) \
.option('maxFilesPerTrigger',500) \
.schema(CPD_IMP_CLK_SCHEMA) \
.load()
cpd_imp_clk_stream_df.createOrReplaceTempView("cpd_imp_clk_stream_df")
cpd_imp_clk_formatted_stream_df = spark.sql("""
select
{rtb_common}
cast(0.0 as double) AS consumption, -- total spend
cast(0.0 as double) AS cash_consumption, -- cash (real money) spend
cast(0.0 as double) AS commission_consumption, -- POP commission
cast(0.0 as double) AS fake_consumption, -- virtual-currency spend
cast(impressions as double),
cast(clicks as double),
cast(from_unixtime(unix_timestamp(date,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time,
'CPD_imp_clk' as dp
from
cpd_imp_clk_stream_df
""".format(rtb_common=rtb_common))
unioned_cpd_imp_clk_stream_df = unioned_cps_stream_df.union(cpd_imp_clk_formatted_stream_df)
cps_imp_clk_stream_df = spark \
.readStream \
.format('parquet') \
.option("latestFirst", "true") \
.option('path', setting.CPS_IMP_CLK_DATA_BASE_PATH) \
.option('maxFilesPerTrigger',500) \
.schema(CPS_IMP_CLK_SCHEMA) \
.load()
cps_imp_clk_stream_df.createOrReplaceTempView("cps_imp_clk_stream_df")
cps_imp_clk_formatted_stream_df = spark.sql("""
select
{rtb_common}
cast(0.0 as double) AS consumption, -- total spend
cast(0.0 as double) AS cash_consumption, -- cash (real money) spend
cast(0.0 as double) AS commission_consumption, -- POP commission
cast(0.0 as double) AS fake_consumption, -- virtual-currency spend
cast(0.0 as double) AS impressions,
cast(clicks as double),
cast(from_unixtime(unix_timestamp(date,'yyyy-MM-dd HH:mm:ss'),'yyyyMMddHHmm') as bigint) as time,
'CPS_imp_clk' as dp
from
cps_imp_clk_stream_df
""".format(rtb_common=rtb_common))
unioned_stream_df = unioned_cpd_imp_clk_stream_df.union(cps_imp_clk_formatted_stream_df)
    # Remember the latest partition dates of the dimension tables; used to decide when to reload them
last_product_partition_date = get_last_partition_date("dim.dim_product_daily_item_sku", "active")
last_media_partition_date = get_last_partition_date("ad.fdm_dsp_media_chain")
dim_product_active_check_new_partition = partial(
check_new_partition,
"dim.dim_product_daily_item_sku",
"active")
dim_media_check_new_partition = partial(check_new_partition, "ad.fdm_dsp_media_chain", None)
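    # Reload loop: each iteration caches a fresh snapshot of the product and media dimension
    # tables, joins them onto the unioned stream, and starts the minute-level streaming query.
    # The two background threads started below stop the query as soon as a new Hive partition
    # appears, which makes awaitAnyTermination() return so the loop rebuilds the dimensions.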
while True:
reload_flag = ReloadFlag()
product_df = spark.sql(sql_get_dim_product_active_table()).cache()
media_df = spark.sql(sql_get_dim_media_table()).cache()
joined_product_stream_df = unioned_stream_df.join(product_df, "sku_id", "left_outer")
joined_media_stream_df = joined_product_stream_df.join(
media_df,
"placement_id",
"left_outer")
joined_media_stream_df.createOrReplaceTempView("joined_media_stream_df")
output_stream_df = spark.sql("""
select
resource_type,
ad_sales_type,
adv_type,
retrieval_type,
ad_business_type_new as ad_business_type,
placement_id,
ad_traffic_group,
ad_traffic_type,
campaign_type,
terminal_type,
terminal_type_cd,
ad_consumption_type,
ad_type_name,
ad_type_cd,
automated_bidding_type_new as automated_bidding_type,
tcpa_phase_new as tcpa_phase,
consumption,
cash_consumption,
commission_consumption,
fake_consumption,
impressions,
clicks,
time,
division_id_new,
division_name_new,
ad_first_dept_id,
ad_first_dept_name,
ad_second_dept_id,
ad_second_dept_name,
ad_third_dept_id,
ad_third_dept_name,
item_first_cate_cd,
item_first_cate_name,
ad_sku_brand_code,
ad_sku_brandname_full,
drawing_type,
native_type,
media_type,
CASE WHEN (type_raw IS NULL or type_raw IN ('NULL','null','','unknown')) THEN
CASE WHEN adv_type = 2 THEN 'POP'
WHEN adv_type IN (12,13) THEN 'EXTERNAL'
ELSE 'B2C' END
ELSE type_raw END
as model_type, -- business model (POP / B2C / EXTERNAL)
account_type,
from_unixtime(unix_timestamp(),'yyyyMMddHHmm00') AS process_time, -- uniform processing-time stamp
cast(from_unixtime(unix_timestamp(cast(time as string),'yyyyMMddHHmm'),'yyyy-MM-dd') as date) as dt,
dp
from
(select
case when dp = 'CPD_imp_clk' then '16'
when dp = 'CPS_imp_clk' then '32'
else ad_business_type
end as ad_business_type_new,
coalesce(automated_bidding_type,0) as automated_bidding_type_new,
coalesce(tcpa_phase,0) as tcpa_phase_new,
case when drawing_type = '1' and native_type = '1' or retrieval_type = '2' then '搜索' -- search
     when drawing_type = '1' and native_type = '2' then '推荐' -- recommendation
     when target_group_type = '1' then '首焦' -- first focus (homepage banner)
     when dp = 'GDT' then '腾讯' -- Tencent
     when ad_traffic_type in ('133','162') or (dp = 'RTB' and resource_type in ('external','repage') and (ad_traffic_type in ('13','59','99') or placement_id in ('813','833')) and ad_business_type not in ('64','8192','32768')) then '头条' -- Toutiao
     when dp = 'RTB' and resource_type in ('external','repage') and ad_traffic_type in ('1','4','5','7','37','51','58','81','90','122','138','139') and ad_business_type not in ('64','8192','32768') then '百度' -- Baidu
     when dp = 'RTB' and resource_type in ('external','repage') and ad_traffic_type in ('52') and ad_business_type not in ('64','8192','32768') then '讯飞' -- iFlytek
     when dp in ('CPS','CPA','CPS_imp_clk') then '联盟' -- affiliate
     else '未知' -- unknown
end as media_type,
case when adv_type in ('2','3','7','9','12','13') then 'accountCash'
else 'accountMchVirtual'
end as account_type,
*
from
joined_media_stream_df
) a
""")
query_minute_level_rtb_toutiao = output_stream_df \
.writeStream \
.trigger(processingTime='60 seconds') \
.queryName(setting.QUERY_NAME_MINUTE_LEVEL_RTB_TOUTIAO) \
.partitionBy('process_time','dt','dp') \
.outputMode("append") \
.format("parquet") \
.option("path", setting.QUERY_MINUTE_LEVEL_RTB_TOUTIAO_SINK_PATH) \
.start()
check_need_for_reload_product_thread = Thread(
target=dim_product_active_check_new_partition,
args=(last_product_partition_date, query_minute_level_rtb_toutiao, reload_flag))
check_need_for_reload_product_thread.daemon = True
check_need_for_reload_product_thread.start()
check_need_for_reload_media_thread = Thread(
target=dim_media_check_new_partition,
args=(last_media_partition_date, query_minute_level_rtb_toutiao, reload_flag))
check_need_for_reload_media_thread.daemon = True
check_need_for_reload_media_thread.start()
        # Block until any query terminates, so a failed query does not leave the program running silently
spark.streams.awaitAnyTermination()
query_minute_level_rtb_toutiao.stop()
        # The terminated state must be reset, otherwise awaitAnyTermination() will not block again
        spark.streams.resetTerminated()
        # Unpersist the stale static DataFrame caches (non-blocking)
product_df.unpersist()
media_df.unpersist()
        # Refresh the latest partition dates
        last_product_partition_date = get_last_partition_date("dim.dim_product_daily_item_sku", "active")
        print("Reloading table dim.dim_product_daily_item_sku, active partition dt={}".format(last_product_partition_date))
        last_media_partition_date = get_last_partition_date("ad.fdm_dsp_media_chain")
        print("Reloading table ad.fdm_dsp_media_chain, partition dt={}".format(last_media_partition_date))
if __name__ == "__main__":
main()