sparksql 复杂查询OOM
Posted Books&Coffee
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sparksql 复杂查询OOM相关的知识,希望对你有一定的参考价值。
平台上执行复杂查询,OOM,根据日志提示的结局方法:
-- SET spark.driver.memory=6/8G;【还是OOM】
set spark.sql.autoBroadcastJoinThreshold=-1;【解决问题】
Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73) at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.lang.Thread.run(Thread.java:748)
2019-11-20 13:45:38 调度器收到作业实例: T_6466665892523663_20191120134538_2, 加入到时间调度队列中. 2019-11-20 13:45:38 计划开始时间是: 2019-11-19 00:10:00, 等待调度中... 2019-11-20 13:45:38 实例到达调度时间, 但父节点尚未完成 2019-11-20 13:47:05 实例的父节点已完成, 且达调度时间,加入调度队列. 2019-11-20 13:47:05 当前有0个作业排队中... 2019-11-20 13:47:05 等待调度中... 2019-11-20 13:47:05 开始调度作业实例 2019-11-20 13:47:05 第1次在节点: 10.94.0.15 上获取Yarn资源信息. 2019-11-20 13:47:05 Yarn:http://10.94.0.16:8088,10.94.0.10:8088, 队列:root.hive 的剩余资源是:"VCores: 17, Memory: 22796M". 2019-11-20 13:47:05 获取Yarn资源结束,开始下放到Node上执行. 2019-11-20 13:47:05 第1次调度到节点: 10.94.0.15 成功. the current free memorySize is 7240m, greater than the threshold 64m, satisfying the memory requirement to execute task 获取任务插件: SparkSql插件 成功。插件信息: 引擎:SparkSql 引擎适配版本:[] 厂商: 厂商版本: start to init task. 获取到 resource-plugin: sparkSql2.2.0-cdh5.12.0-1.0.0-jar-with-dependencies.jar 19/11/20 13:47:06 INFO spark.SparkContext: Running Spark version 2.3.0.cloudera2 19/11/20 13:47:06 INFO spark.SparkContext: Submitted application: dwd_mem_2b_applet_filter_task_detail_walker_T_6466665892523663_20191120134538_2_1 19/11/20 13:47:07 INFO server.Server: jetty-9.3.z-SNAPSHOT 19/11/20 13:47:08 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm177 19/11/20 13:47:08 INFO yarn.Client: Requesting a new application from cluster with 5 NodeManagers 19/11/20 13:47:08 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container) 19/11/20 13:47:08 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 19/11/20 13:47:08 INFO yarn.Client: Setting up container launch context for our AM 19/11/20 13:47:08 INFO yarn.Client: Setting up the launch environment for our AM container 19/11/20 13:47:08 INFO yarn.Client: Preparing resources for our AM container 19/11/20 13:47:09 INFO yarn.Client: Submitting application application_1573451947838_0670 to ResourceManager 19/11/20 13:47:09 INFO impl.YarnClientImpl: Submitted application application_1573451947838_0670 19/11/20 13:47:10 INFO yarn.Client: Application report for application_1573451947838_0670 (state: ACCEPTED) 19/11/20 13:47:10 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: root.hive start time: 1574228829689 final status: UNDEFINED tracking URL: http://cdh-hadoop-16:8088/proxy/application_1573451947838_0670/ user: deploy 19/11/20 13:47:11 INFO yarn.Client: Application report for application_1573451947838_0670 (state: ACCEPTED) 19/11/20 13:47:12 INFO yarn.Client: Application report for application_1573451947838_0670 (state: ACCEPTED) 19/11/20 13:47:13 INFO yarn.Client: Application report for application_1573451947838_0670 (state: RUNNING) 19/11/20 13:47:13 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: 10.94.0.16 ApplicationMaster RPC port: 0 queue: root.hive start time: 1574228829689 final status: UNDEFINED tracking URL: http://cdh-hadoop-16:8088/proxy/application_1573451947838_0670/ user: deploy 19/11/20 13:47:13 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 19/11/20 13:47:13 INFO storage.BlockManager: external shuffle service port = 7337 19/11/20 13:47:13 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, shuqi-node, 36374, None) 19/11/20 13:47:14 INFO scheduler.EventLoggingListener: Logging events to hdfs://hahdfs/user/spark/spark2ApplicationHistory/application_1573451947838_0670 19/11/20 13:47:14 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener 19/11/20 13:47:14 INFO internal.SharedState: loading hive config file: file:/etc/hadoop/conf.cloudera.yarn/hive-site.xml 19/11/20 13:47:14 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir (‘/user/hive/warehouse‘). 19/11/20 13:47:14 INFO internal.SharedState: Warehouse path is ‘/user/hive/warehouse‘. 19/11/20 13:47:14 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint 19/11/20 13:47:15 INFO hive.metastore: Trying to connect to metastore with URI thrift://cdh-hadoop-16:9083 19/11/20 13:47:15 INFO hive.metastore: Opened a connection to metastore, current connections: 1 19/11/20 13:47:15 INFO hive.metastore: Connected to metastore. 19/11/20 13:47:15 WARN command.SetCommand: ‘SET hive.exec.dynamic.partition.mode=nonstrict‘ might not work, since Spark doesn‘t support changing the Hive config dynamically. Please pass the Hive-specific config by adding the prefix spark.hadoop (e.g. spark.hadoop.hive.exec.dynamic.partition.mode) when starting a Spark application. For details, see the link: https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties. 开始执行: insert overwrite table dwd_mem_2b_applet_filter_task_detail select t1.ID as member_id, t2.LeaguerID as leaguer_id, t9.member_employee_id as member_employee_id, t2.ShopID as member_store_id, t1.Avatar as member_avatar, t1.RealName as member_name, t1.Phone as member_phone, t1.Birthday as member_birthday, t12.outbound_employee_id as outbound_employee_id, t12.outbound_member_phone as outbound_member_phone, t12.outbound_time as outbound_time, t4.baby_id as baby_id, t4.baby_avatar as baby_avatar, t4.baby_name as baby_name, t4.baby_gender as baby_gender, t4.baby_age as baby_age, t4.baby_birthday as baby_birthday, t1.register_2_nowadays as register_2_nowadays, t10.register_channel as register_channel, case when t3.first_card_open_date is not null then 1 else 0 end as is_open_cards, t3.first_card_open_date as first_card_open_date, t2.LeaguerCode as card_id, t15.consume_frequency_dateCard as consume_frequency_dateCard, t16.consume_frequency_no_dateCard as consume_frequency_no_dateCard, t5.card_expire_date as card_expire_date, t6.total_recharge as total_recharge, t7.total_consume_coin as total_consume_coin, t8.last_consume_date as last_consume_date, t8.last_consume_2_nowadays as last_consume_2_nowadays, t5.card_times_left as card_times_left, t2.card_coin_left as card_coin_left, t5.card_days_left as card_days_left, t11.card_coupon_days_left as card_coupon_days_left, t6.last_recharge_2_nowadays as last_recharge_2_nowadays, t2.card_coin_left/t13.card_coin_left as left_coin_ratio, t5.card_times_left/t14.card_times_left as left_times_ratio from ( select ID, Avatar, RealName, Phone, substr(to_date(Birthday),-5,5) as Birthday, datediff(CURRENT_DATE,to_date(CreateTime)) as register_2_nowadays -- 注册距今天数 from ods_cuihua_dqmly_dy_sp_leaguerBase where ds= ‘20191119‘ ) t1 left join ( select p1.ID as LeaguerID, -- 门店会员编码(系统自动生成 ID) p1.LeaguerBaseId, -- 会员编码 p1.ShopID, p3.LeaguerCode, -- 会员卡号 p2.card_coin_left -- 币剩余数量 from ods_cuihua_dqmly_dy_sp_Leaguer p1 left join ( select ID as LeaguerId_p2, nvl(SUM(CoinBal1) + SUM(GiveCoinBal1)+ SUM(GiveBCoinBal1),0) as card_coin_left -- 币剩余数量 from ods_cuihua_dqmly_dy_sp_Leaguer where ds= ‘20191119‘ group by ID ) p2 on p1.ID = p2.LeaguerId_p2 left join ( select LeaguerId as LeaguerId_p3, LeaguerCode from ods_cuihua_dqmly_dy_sp_LeaguerCard where ds= ‘20191119‘ ) p3 on p1.ID = p3.LeaguerId_p3 where p1.ds= ‘20191119‘ ) t2 on t1.ID = t2.LeaguerBaseId left join ( select p1.LeaguerBaseId as LeaguerBaseId_t3, min(p2.CreateTime) as first_card_open_date -- 首次开卡时间 from ods_cuihua_dqmly_dy_sp_Leaguer p1 left join ( select LeaguerId, LeaguerCode, CreateTime from ods_cuihua_dqmly_dy_sp_LeaguerCard where ds= ‘20191119‘ ) p2 on p1.ID = p2.LeaguerId where p1.ds= ‘20191119‘ group by p1.LeaguerBaseId ) t3 on t1.ID = t3.LeaguerBaseId_t3 left join ( select ID as baby_id, LeaguerBaseId as LeaguerBaseId_t4, Avatar as baby_avatar, RealName as baby_name, Sex as baby_gender, cast(datediff(CURRENT_DATE,to_date(Birthday))/365 as decimal(10,1)) as baby_age, substr(to_date(Birthday),-5,5) as baby_birthday from ods_cuihua_dqmly_dy_sp_leaguerbaby where ds= ‘20191119‘ and ID is not null and LeaguerBaseId is not null ) t4 on t1.ID = t4.LeaguerBaseId_t4 left join ( select tc.LeaguerID as LeaguerID_t5, tc.LeaguerCode as LeaguerCode_t5, te.card_expire_time as card_expire_date, -- 卡到期日期 nvl(tf.card_times_left,0) as card_times_left, -- 次卡剩余次数 nvl(datediff(to_date(te.card_expire_time),CURRENT_DATE),0) as card_days_left -- 计时卡剩余天数 from ( select LeaguerId, LeaguerCode, ID as LeaguerCardId from ods_cuihua_dqmly_dy_sp_LeaguerCard where ds= ‘20191119‘ ) tc left join ( select LeaguerID as LeaguerID_te, LeaguerCardId as LeaguerCardId_te, case when ReExpireTime is null then EndTime when EndTime <= ReExpireTime then ReExpireTime end as card_expire_time from ods_cuihua_dqmly_dy_sp_LeaguerTimeCard where ds= ‘20191119‘ and LeaguerCardId is not null ) te on tc.LeaguerId = te.LeaguerID_te and tc.LeaguerCardId = te.LeaguerCardId_te inner join ( select LeaguerID, SUM(EnabledNum) as card_times_left from ods_cuihua_dqmly_dy_sp_LeaguerTimeCard where ds= ‘20191119‘ and (CURRENT_TIMESTAMP < EndTime or CURRENT_TIMESTAMP < ReExpireTime ) and EnabledNum > 0 group by LeaguerID ) tf on te.LeaguerID_te = tf.LeaguerID ) t5 on t2.LeaguerID = t5.LeaguerID_t5 and t2.LeaguerCode = t5.LeaguerCode_t5 left join ( select LeaguerID as LeaguerID_t6, sum(money/100) as total_recharge, -- 总充值金额 datediff(CURRENT_DATE,to_date(max(createtime))) as last_recharge_2_nowadays -- 最后充值时间距今天数 from ods_cuihua_dqmly_dy_sp_setMealSellLog_delta where LeaguerID is not null group by LeaguerID ) t6 on t2.LeaguerID = t6.LeaguerID_t6 left join ( select p.LeaguerID as LeaguerID_t7, nvl(sum(p.total_consume_coin),0) as total_consume_coin from ( select LeaguerID, sum(coinnum) as total_consume_coin -- 总耗币数:消费币,兑换礼品,自取币,币换年票,赠币过期,币过户等 from ods_cuihua_dqmly_dy_sp_playLog_delta group by LeaguerID union all select LeaguerID, sum(SurplusCoin1) as total_consume_coin from ods_cuihua_dqmly_dy_sp_LeaguerGiveCoinLog_delta where IsExpire=‘true‘ and SurplusCoin1>0 -- (IsExpire:是否过期、SurplusCoin1:过期的币数) group by LeaguerID union all select LeaguerID, sum(UsedValue) as total_consume_coin from ods_cuihua_dqmly_dy_sp_giftSellLog_delta where UseValueType=5 -- (UseValueType=5表示币兑换礼品 UsedValue:消耗币数) group by LeaguerID union all select LeaguerID, sum(abs(CoinNumber1)) as total_consume_coin from ods_cuihua_dqmly_dy_sp_setMealSellLog_delta where PayType=7 -- (PayType=7 表示通过币购买套餐) group by LeaguerID union all select FromLeaguer as LeaguerID, sum(Amount) as total_consume_coin from ods_cuihua_dqmly_dy_sp_valueTransferLog_delta -- (FromLeaguer:过户源用户 ToLeaguer:过户目标用户) where ValueType = 1 group by FromLeaguer union all select LeaguerID, sum(RealCoinNum) as total_consume_coin from ods_cuihua_dqmly_dy_sp_getCoin -- (RealCoinNum:实际提币数量) where ds= ‘20191119‘ group by LeaguerID ) p where p.LeaguerID is not null group by p.LeaguerID ) t7 on t2.LeaguerID = t7.LeaguerID_t7 left join ( select LeaguerID as LeaguerID_t8, max(CreateTime) as last_consume_date, -- 最后消费日期 datediff(CURRENT_DATE,to_date(max(CreateTime))) as last_consume_2_nowadays -- 最后一次消费距今天数:取 耗币、刷计时卡、耗次、核券的最后一次时间 from ods_cuihua_dqmly_dy_sp_playLog_delta where LeaguerID is not null group by LeaguerID ) t8 on t2.LeaguerID = t8.LeaguerID_t8 left join ( select p1.invite_member_no as LeaguerBaseId_t9, p2.employee_id as member_employee_id from ( select invite_code, invite_member_no from ods_mengguang_xc_crm_member_db1_member_invite_summary where ds= ‘20191119‘ and invite_code is not null and invite_member_no is not null ) p1 inner join ( select invite_code as invite_code_p2, employee_id from ods_mengguang_xc_activity_manage_db_employee_invite_code where ds= ‘20191119‘ ) p2 on p1.invite_code = p2.invite_code_p2 ) t9 on t1.ID = t9.LeaguerBaseId_t9 left join ( select channel_info as register_channel, member_no as LeaguerBaseId_t10 from ods_mengguang_xc_crm_member_db1_member_profile where ds= ‘20191119‘ and member_no is not null ) t10 on t1.ID = t10.LeaguerBaseId_t10 left join ( select p1.LeaguerBaseId as LeaguerBaseId_t11, case when p2.draw_limit_day <= 0 then datediff(to_date(p2.end_date),CURRENT_DATE) else datediff(date_add(to_date(p1.date_created), p2.draw_limit_day),CURRENT_DATE) end as card_coupon_days_left from ( select coupon_id, member_id as LeaguerBaseId, date_created from ods_qunshuo_amazingland_campaign_coupon_code_event where ds= ‘20191119‘ and is_delete = ‘NO‘ and coupon_status in(‘WAIT_RECEIVE‘,‘WAIT_USE‘) )p1 inner join ( select coupon_number, end_date, draw_limit_day from ods_qunshuo_amazingland_campaign_coupon where ds= ‘20191119‘ and is_delete = ‘NO‘ and coupon_number <> ‘‘ ) p2 on p1.coupon_id = p2.coupon_number ) t11 on t1.ID = t11.LeaguerBaseId_t11 left join ( select tb1.custNum as outbound_member_id, -- 外呼会员编码,uuid tb1.staffNum as outbound_employee_id, -- 外呼员工id,不一定是会员所属员工id,有可能是别的员工外呼 tb1.custPhone as outbound_member_phone, -- 外呼会员手机号码 tb2.outbound_time from ods_lixiang_immccdb_callout_info tb1 inner join ( select custNum as custNum_tb2, -- 会员编码 max(createTime) as outbound_time -- 最后一次外呼时间 from ods_lixiang_immccdb_callout_info where ds= ‘20191119‘ and call_status = 0 -- 呼叫成功 【理想插入数据之后,需要打开这个限制条件】 group by custNum ) tb2 on tb1.custNum = tb2.custNum_tb2 where tb1.ds= ‘20191119‘ ) t12 on t1.ID = t12.outbound_member_id left join ( select LeaguerID as LeaguerID_t13, card_coin_left from dwd_card_coin_left_last_recharge group by LeaguerID,card_coin_left ) t13 on t2.LeaguerID = t13.LeaguerID_t13 left join ( select LeaguerID as LeaguerID_14, card_times_left from dwd_card_times_left_last_recharge group by LeaguerID,card_times_left ) t14 on t2.LeaguerID = t14.LeaguerID_14 left join ( SELECT ta.LeaguerID as LeaguerID_15, count(to_date(ta.createTime)) as consume_frequency_dateCard FROM ods_cuihua_dqmly_dy_sp_playLog_delta AS ta INNER JOIN ods_cuihua_dqmly_dy_sp_timeCard AS tb ON ta.TimeCardId = tb.ID where tb.ds= ‘20191119‘ and tb.Category IN (2,3) -- 保证是计时卡会员 and to_date(ta.createtime) >= date_sub(CURRENT_DATE,180) and to_date(ta.createtime) <= CURRENT_DATE group by ta.LeaguerID )t15 on t2.LeaguerID = t15.LeaguerID_15 left join ( SELECT ta.LeaguerID as LeaguerID_16, count(to_date(ta.createTime)) as consume_frequency_no_dateCard FROM ods_cuihua_dqmly_dy_sp_playLog_delta AS ta INNER JOIN ods_cuihua_dqmly_dy_sp_timeCard AS tb ON ta.TimeCardId = tb.ID where tb.ds= ‘20191119‘ and tb.Category NOT IN (2,3) -- 保证是非计时卡会员 and to_date(ta.createtime) >= date_sub(CURRENT_DATE,180) and to_date(ta.createtime) <= CURRENT_DATE group by ta.LeaguerID )t16 on t2.LeaguerID = t16.LeaguerID_16; 19/11/20 13:47:17 INFO hive.HiveMetastoreCatalog: Inferring case-sensitive schema for table xcdqm_prd.ods_cuihua_dqmly_dy_sp_leaguerbase (inference mode: INFER_AND_SAVE) 19/11/20 13:47:17 INFO scheduler.DAGScheduler: Parents of final stage: List() 19/11/20 13:47:17 INFO scheduler.DAGScheduler: Missing parents: List() 19/11/20 13:47:17 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039 19/11/20 13:47:17 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks 19/11/20 13:47:18 INFO spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1) Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73) at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.lang.Thread.run(Thread.java:748)
以上是关于sparksql 复杂查询OOM的主要内容,如果未能解决你的问题,请参考以下文章