Spark SQL: OOM on a complex query

Posted Books&Coffee


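A complex query run on the platform failed with an OOM. The log message suggests two workarounds; here is what happened with each:

-- SET spark.driver.memory=6G (also tried 8G); [still OOM]
set spark.sql.autoBroadcastJoinThreshold=-1; [fixed the problem]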
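
A minimal sketch of how the working setting might be applied and checked, assuming the platform runs each statement in the same Spark SQL session as the INSERT. The EXPLAIN query is a cut-down illustration built from two tables of the job below, not part of the original job. One possible reason the first attempt did not help: spark.driver.memory is read when the driver JVM starts, so a SET issued inside an already running session cannot enlarge the heap; it normally has to be passed at launch, for example with spark-submit's --driver-memory option. The post does not say how the value was applied, so this is an assumption.

```sql
-- Sketch only: session-level settings placed before the INSERT statement.

-- Turn off automatic broadcast joins. Spark then plans shuffle-based joins
-- (typically sort-merge join) instead of building a broadcast table on the driver.
SET spark.sql.autoBroadcastJoinThreshold=-1;

-- Optional check: print the value that is in effect for this session.
SET spark.sql.autoBroadcastJoinThreshold;

-- Optional check (illustrative query, not from the original job):
-- with the threshold at -1 the plan for this equi-join should show
-- SortMergeJoin rather than BroadcastHashJoin.
EXPLAIN
select t1.ID,
       t10.channel_info
from ods_cuihua_dqmly_dy_sp_leaguerBase t1
left join ods_mengguang_xc_crm_member_db1_member_profile t10
on t1.ID = t10.member_no and t10.ds = '20191119'
where t1.ds = '20191119';
```

Disabling broadcast trades some speed on genuinely small tables for predictable driver memory use, which is usually acceptable for a nightly batch job with this many joins.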

 

Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at java.lang.Thread.run(Thread.java:748)

 

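2019-11-20 13:45:38 Scheduler received job instance: T_6466665892523663_20191120134538_2, added to the time-based scheduling queue.
2019-11-20 13:45:38 Planned start time: 2019-11-19 00:10:00, waiting to be scheduled...
2019-11-20 13:45:38 Instance reached its scheduled time, but its parent node has not finished yet
2019-11-20 13:47:05 Parent node of the instance has finished and the scheduled time has been reached; added to the scheduling queue.
2019-11-20 13:47:05 Currently 0 jobs are queued...
2019-11-20 13:47:05 Waiting to be scheduled...
2019-11-20 13:47:05 Start scheduling the job instance
2019-11-20 13:47:05 Attempt 1: fetching Yarn resource info on node 10.94.0.15.
2019-11-20 13:47:05 Yarn:http://10.94.0.16:8088,10.94.0.10:8088, queue: root.hive, remaining resources: "VCores: 17, Memory: 22796M".
2019-11-20 13:47:05 Finished fetching Yarn resources; dispatching to the node for execution.
2019-11-20 13:47:05 Attempt 1: scheduled to node 10.94.0.15 successfully.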

the current free memorySize is 7240m, greater than the threshold 64m, satisfying the memory requirement to execute task
Loaded task plugin: SparkSql plugin, success. Plugin info:
          Engine: SparkSql
          Supported engine versions: []
          Vendor:
          Vendor version:
start to init task.

Obtained resource-plugin: sparkSql2.2.0-cdh5.12.0-1.0.0-jar-with-dependencies.jar
19/11/20 13:47:06 INFO spark.SparkContext: Running Spark version 2.3.0.cloudera2
19/11/20 13:47:06 INFO spark.SparkContext: Submitted application: dwd_mem_2b_applet_filter_task_detail_walker_T_6466665892523663_20191120134538_2_1
19/11/20 13:47:07 INFO server.Server: jetty-9.3.z-SNAPSHOT
19/11/20 13:47:08 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm177
19/11/20 13:47:08 INFO yarn.Client: Requesting a new application from cluster with 5 NodeManagers
19/11/20 13:47:08 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
19/11/20 13:47:08 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
19/11/20 13:47:08 INFO yarn.Client: Setting up container launch context for our AM
19/11/20 13:47:08 INFO yarn.Client: Setting up the launch environment for our AM container
19/11/20 13:47:08 INFO yarn.Client: Preparing resources for our AM container
19/11/20 13:47:09 INFO yarn.Client: Submitting application application_1573451947838_0670 to ResourceManager
19/11/20 13:47:09 INFO impl.YarnClientImpl: Submitted application application_1573451947838_0670
19/11/20 13:47:10 INFO yarn.Client: Application report for application_1573451947838_0670 (state: ACCEPTED)
19/11/20 13:47:10 INFO yarn.Client: 
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: root.hive
	 start time: 1574228829689
	 final status: UNDEFINED
	 tracking URL: http://cdh-hadoop-16:8088/proxy/application_1573451947838_0670/
	 user: deploy
19/11/20 13:47:11 INFO yarn.Client: Application report for application_1573451947838_0670 (state: ACCEPTED)
19/11/20 13:47:12 INFO yarn.Client: Application report for application_1573451947838_0670 (state: ACCEPTED)
19/11/20 13:47:13 INFO yarn.Client: Application report for application_1573451947838_0670 (state: RUNNING)
19/11/20 13:47:13 INFO yarn.Client: 
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: 10.94.0.16
	 ApplicationMaster RPC port: 0
	 queue: root.hive
	 start time: 1574228829689
	 final status: UNDEFINED
	 tracking URL: http://cdh-hadoop-16:8088/proxy/application_1573451947838_0670/
	 user: deploy
19/11/20 13:47:13 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/11/20 13:47:13 INFO storage.BlockManager: external shuffle service port = 7337
19/11/20 13:47:13 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, shuqi-node, 36374, None)
19/11/20 13:47:14 INFO scheduler.EventLoggingListener: Logging events to hdfs://hahdfs/user/spark/spark2ApplicationHistory/application_1573451947838_0670
19/11/20 13:47:14 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener
19/11/20 13:47:14 INFO internal.SharedState: loading hive config file: file:/etc/hadoop/conf.cloudera.yarn/hive-site.xml
19/11/20 13:47:14 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
19/11/20 13:47:14 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
19/11/20 13:47:14 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
19/11/20 13:47:15 INFO hive.metastore: Trying to connect to metastore with URI thrift://cdh-hadoop-16:9083
19/11/20 13:47:15 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/11/20 13:47:15 INFO hive.metastore: Connected to metastore.
19/11/20 13:47:15 WARN command.SetCommand: 'SET hive.exec.dynamic.partition.mode=nonstrict' might not work, since Spark doesn't support changing the Hive config dynamically. Please pass the Hive-specific config by adding the prefix spark.hadoop (e.g. spark.hadoop.hive.exec.dynamic.partition.mode) when starting a Spark application. For details, see the link: https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties.

Start executing:
insert overwrite table dwd_mem_2b_applet_filter_task_detail
select t1.ID as member_id,
       t2.LeaguerID as leaguer_id,
       t9.member_employee_id as member_employee_id, 
       t2.ShopID as member_store_id, 
       t1.Avatar as member_avatar, 
       t1.RealName as member_name, 
       t1.Phone as member_phone, 
       t1.Birthday as member_birthday, 
       t12.outbound_employee_id as outbound_employee_id,
       t12.outbound_member_phone as outbound_member_phone, 
       t12.outbound_time as outbound_time, 
       t4.baby_id as baby_id, 
       t4.baby_avatar as baby_avatar, 
       t4.baby_name as baby_name, 
       t4.baby_gender as baby_gender, 
       t4.baby_age as baby_age, 
       t4.baby_birthday as baby_birthday, 
       t1.register_2_nowadays as register_2_nowadays, 
       t10.register_channel as register_channel, 
       case when t3.first_card_open_date is not null then 1 
            else 0 end as is_open_cards,
       t3.first_card_open_date as first_card_open_date,
       t2.LeaguerCode as card_id,
       t15.consume_frequency_dateCard as consume_frequency_dateCard,
       t16.consume_frequency_no_dateCard as consume_frequency_no_dateCard,
       t5.card_expire_date as card_expire_date,
       t6.total_recharge as total_recharge,
       t7.total_consume_coin as total_consume_coin,
       t8.last_consume_date as last_consume_date,
       t8.last_consume_2_nowadays as last_consume_2_nowadays,
       t5.card_times_left as card_times_left,
       t2.card_coin_left as card_coin_left,
       t5.card_days_left as card_days_left,
       t11.card_coupon_days_left as card_coupon_days_left,
       t6.last_recharge_2_nowadays as last_recharge_2_nowadays,
       t2.card_coin_left/t13.card_coin_left as left_coin_ratio,
       t5.card_times_left/t14.card_times_left as left_times_ratio
from 
(
    select ID, 
           Avatar, 
           RealName, 
           Phone, 
           substr(to_date(Birthday),-5,5) as Birthday,
           datediff(CURRENT_DATE,to_date(CreateTime)) as register_2_nowadays  -- days since registration
    from ods_cuihua_dqmly_dy_sp_leaguerBase
    where ds= '20191119'
) t1 
left join 
(
    select p1.ID as LeaguerID,       -- store member ID (auto-generated by the system)
           p1.LeaguerBaseId,         -- member ID
           p1.ShopID,
           p3.LeaguerCode,           -- member card number
           p2.card_coin_left         -- remaining coins
    from ods_cuihua_dqmly_dy_sp_Leaguer p1
    left join 
    (
          select ID as LeaguerId_p2, 
                 nvl(SUM(CoinBal1) + SUM(GiveCoinBal1)+ SUM(GiveBCoinBal1),0) as card_coin_left -- remaining coins
          from ods_cuihua_dqmly_dy_sp_Leaguer
          where ds= '20191119'
          group by ID
    ) p2
    on p1.ID = p2.LeaguerId_p2
    left join
    (
          select LeaguerId as LeaguerId_p3,
                 LeaguerCode
          from ods_cuihua_dqmly_dy_sp_LeaguerCard
          where ds= '20191119'
    ) p3
    on  p1.ID = p3.LeaguerId_p3
    where p1.ds= '20191119'
) t2 
on t1.ID = t2.LeaguerBaseId 
left join 
(
    select p1.LeaguerBaseId as LeaguerBaseId_t3,
           min(p2.CreateTime) as first_card_open_date  -- time the first card was opened
    from ods_cuihua_dqmly_dy_sp_Leaguer p1
    left join 
    (
          select LeaguerId,
                 LeaguerCode,
                 CreateTime 
          from ods_cuihua_dqmly_dy_sp_LeaguerCard
          where ds= '20191119'
    ) p2
    on p1.ID = p2.LeaguerId
    where p1.ds= '20191119'
    group by p1.LeaguerBaseId
) t3 
on t1.ID = t3.LeaguerBaseId_t3 
left join
(
    select ID as baby_id,
           LeaguerBaseId as LeaguerBaseId_t4,
           Avatar as baby_avatar, 
           RealName as baby_name, 
           Sex as baby_gender, 
           cast(datediff(CURRENT_DATE,to_date(Birthday))/365 as decimal(10,1)) as baby_age,
           substr(to_date(Birthday),-5,5) as baby_birthday
    from ods_cuihua_dqmly_dy_sp_leaguerbaby
    where ds= '20191119'
    and ID is not null
    and LeaguerBaseId is not null
) t4
on t1.ID = t4.LeaguerBaseId_t4
left join 
(
    select  tc.LeaguerID as LeaguerID_t5,
            tc.LeaguerCode as LeaguerCode_t5,
            te.card_expire_time as card_expire_date,                                      -- card expiry date
            nvl(tf.card_times_left,0) as card_times_left,                                 -- remaining uses on count-based cards
            nvl(datediff(to_date(te.card_expire_time),CURRENT_DATE),0) as card_days_left  -- remaining days on timed cards
    from 
      (
          select LeaguerId,
                 LeaguerCode,
                 ID as LeaguerCardId
          from ods_cuihua_dqmly_dy_sp_LeaguerCard
          where ds= '20191119'
      ) tc
      left join 
      (
            select LeaguerID as LeaguerID_te,
                   LeaguerCardId as LeaguerCardId_te,
                   case when ReExpireTime is null then EndTime
                        when EndTime <= ReExpireTime then ReExpireTime
                   end as card_expire_time 
            from ods_cuihua_dqmly_dy_sp_LeaguerTimeCard
            where ds= '20191119'
            and LeaguerCardId is not null
      ) te 
      on tc.LeaguerId = te.LeaguerID_te and tc.LeaguerCardId = te.LeaguerCardId_te
      inner join 
      (
            select LeaguerID, 
                   SUM(EnabledNum) as card_times_left 
            from ods_cuihua_dqmly_dy_sp_LeaguerTimeCard 
            where ds= '20191119'
            and (CURRENT_TIMESTAMP < EndTime or CURRENT_TIMESTAMP < ReExpireTime ) 
            and EnabledNum > 0 
            group by LeaguerID
      ) tf 
      on te.LeaguerID_te = tf.LeaguerID
) t5 
on t2.LeaguerID = t5.LeaguerID_t5 and t2.LeaguerCode = t5.LeaguerCode_t5
left join 
(
    select LeaguerID as LeaguerID_t6, 
           sum(money/100) as total_recharge,   -- total recharge amount
           datediff(CURRENT_DATE,to_date(max(createtime))) as last_recharge_2_nowadays   -- days since the last recharge
    from ods_cuihua_dqmly_dy_sp_setMealSellLog_delta
    where LeaguerID is not null
    group by LeaguerID
) t6 
on t2.LeaguerID = t6.LeaguerID_t6
left join 
(
  select p.LeaguerID as LeaguerID_t7, 
         nvl(sum(p.total_consume_coin),0) as total_consume_coin
  from
  (
    select LeaguerID, 
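           sum(coinnum) as total_consume_coin -- total coins consumed: coins spent, gift redemption, self-service coin withdrawal, coins exchanged for annual passes, expired bonus coins, coin transfers, etc.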
    from ods_cuihua_dqmly_dy_sp_playLog_delta
    group by LeaguerID
    union all 
    select LeaguerID, 
           sum(SurplusCoin1) as total_consume_coin 
    from ods_cuihua_dqmly_dy_sp_LeaguerGiveCoinLog_delta 
    where IsExpire='true' and SurplusCoin1>0  -- (IsExpire: whether expired; SurplusCoin1: number of expired coins)
    group by LeaguerID
    union all 
    select LeaguerID, 
           sum(UsedValue) as total_consume_coin  
    from ods_cuihua_dqmly_dy_sp_giftSellLog_delta
    where UseValueType=5  -- (UseValueType=5 means coins redeemed for gifts; UsedValue: coins consumed)
    group by LeaguerID
    union all 
    select LeaguerID, 
           sum(abs(CoinNumber1)) as total_consume_coin  
    from ods_cuihua_dqmly_dy_sp_setMealSellLog_delta
    where PayType=7 -- (PayType=7 means the package was purchased with coins)
    group by LeaguerID
    union all  
    select FromLeaguer as LeaguerID, 
           sum(Amount) as total_consume_coin  
    from ods_cuihua_dqmly_dy_sp_valueTransferLog_delta -- (FromLeaguer: source user of the transfer; ToLeaguer: target user of the transfer)
    where ValueType = 1
    group by FromLeaguer
    union all
    select LeaguerID, 
           sum(RealCoinNum) as total_consume_coin  
    from ods_cuihua_dqmly_dy_sp_getCoin -- (RealCoinNum: actual number of coins withdrawn)
    where ds= '20191119'
    group by LeaguerID
  ) p
  where p.LeaguerID is not null
  group by p.LeaguerID
) t7 
on t2.LeaguerID = t7.LeaguerID_t7 
left join 
(
    select LeaguerID as LeaguerID_t8, 
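           max(CreateTime) as last_consume_date,   -- last consumption date
           datediff(CURRENT_DATE,to_date(max(CreateTime))) as last_consume_2_nowadays   -- days since the last consumption: the latest of coin spending, timed-card swipes, count-card uses, and coupon redemptions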
    from ods_cuihua_dqmly_dy_sp_playLog_delta
    where LeaguerID is not null
    group by LeaguerID
) t8 
on t2.LeaguerID = t8.LeaguerID_t8
left join
(
    select p1.invite_member_no as LeaguerBaseId_t9,
           p2.employee_id as member_employee_id
    from 
    (
        select invite_code,
               invite_member_no
        from ods_mengguang_xc_crm_member_db1_member_invite_summary 
        where ds= '20191119'
        and invite_code is not null
        and invite_member_no is not null
    ) p1
    inner join 
    (
        select invite_code as invite_code_p2,
               employee_id
        from ods_mengguang_xc_activity_manage_db_employee_invite_code
        where ds= '20191119'
    ) p2
    on p1.invite_code = p2.invite_code_p2
) t9
on t1.ID = t9.LeaguerBaseId_t9
left join 
(
    select channel_info as register_channel,
           member_no as LeaguerBaseId_t10
    from ods_mengguang_xc_crm_member_db1_member_profile
    where ds= '20191119'
    and member_no is not null
) t10
on t1.ID = t10.LeaguerBaseId_t10
left join
(
    select p1.LeaguerBaseId as LeaguerBaseId_t11,
           case when p2.draw_limit_day <= 0 then datediff(to_date(p2.end_date),CURRENT_DATE)
           else datediff(date_add(to_date(p1.date_created), p2.draw_limit_day),CURRENT_DATE) end as card_coupon_days_left
    from 
    (
        select coupon_id,
               member_id as LeaguerBaseId,
               date_created
        from ods_qunshuo_amazingland_campaign_coupon_code_event
        where ds= '20191119'
        and is_delete = 'NO'
        and coupon_status in('WAIT_RECEIVE','WAIT_USE')
    )p1
    inner join 
    (
        select coupon_number,
               end_date,
               draw_limit_day
        from ods_qunshuo_amazingland_campaign_coupon
        where ds= '20191119'
        and is_delete = 'NO'
        and coupon_number <> ''
    ) p2
    on p1.coupon_id = p2.coupon_number
) t11
on t1.ID = t11.LeaguerBaseId_t11
left join
(
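    select tb1.custNum as outbound_member_id,    -- ID of the member called (uuid)
           tb1.staffNum as outbound_employee_id, -- ID of the employee who made the call; not necessarily the member's assigned employee, another employee may have called
           tb1.custPhone as outbound_member_phone,  -- phone number of the member called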
           tb2.outbound_time
    from ods_lixiang_immccdb_callout_info tb1
    inner join
    (
        select custNum as custNum_tb2, -- member ID
               max(createTime) as outbound_time  -- time of the last outbound call
        from ods_lixiang_immccdb_callout_info
        where ds= '20191119'
        and call_status = 0 -- call succeeded [this filter needs to be enabled once Lixiang has inserted the data]
        group by custNum
    ) tb2
    on tb1.custNum = tb2.custNum_tb2
    where tb1.ds= '20191119'
) t12
on t1.ID = t12.outbound_member_id
left join 
(
    select LeaguerID as LeaguerID_t13,
           card_coin_left
    from dwd_card_coin_left_last_recharge
    group by LeaguerID,card_coin_left
) t13
on t2.LeaguerID = t13.LeaguerID_t13
left join 
(
    select LeaguerID as LeaguerID_14,
           card_times_left
    from dwd_card_times_left_last_recharge
    group by LeaguerID,card_times_left
) t14
on t2.LeaguerID = t14.LeaguerID_14
left join
(
    SELECT ta.LeaguerID as LeaguerID_15,
           count(to_date(ta.createTime)) as consume_frequency_dateCard
    FROM ods_cuihua_dqmly_dy_sp_playLog_delta AS ta 
    INNER JOIN ods_cuihua_dqmly_dy_sp_timeCard AS tb
    ON ta.TimeCardId = tb.ID 
    where tb.ds= '20191119'
    and tb.Category IN (2,3) -- ensures the member holds a timed card
    and to_date(ta.createtime) >= date_sub(CURRENT_DATE,180) 
    and to_date(ta.createtime) <= CURRENT_DATE
    group by ta.LeaguerID
)t15 
on t2.LeaguerID = t15.LeaguerID_15 
left join
(
    SELECT ta.LeaguerID as LeaguerID_16,
           count(to_date(ta.createTime)) as consume_frequency_no_dateCard
    FROM ods_cuihua_dqmly_dy_sp_playLog_delta AS ta
    INNER JOIN ods_cuihua_dqmly_dy_sp_timeCard AS tb
    ON ta.TimeCardId = tb.ID
    where tb.ds= '20191119'
    and tb.Category NOT IN (2,3) -- ensures the member does not hold a timed card
    and to_date(ta.createtime) >= date_sub(CURRENT_DATE,180) 
    and to_date(ta.createtime) <= CURRENT_DATE
    group by ta.LeaguerID
)t16 
on t2.LeaguerID = t16.LeaguerID_16;

19/11/20 13:47:17 INFO hive.HiveMetastoreCatalog: Inferring case-sensitive schema for table xcdqm_prd.ods_cuihua_dqmly_dy_sp_leaguerbase (inference mode: INFER_AND_SAVE)
19/11/20 13:47:17 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/11/20 13:47:17 INFO scheduler.DAGScheduler: Missing parents: List()
19/11/20 13:47:17 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039
19/11/20 13:47:17 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks
19/11/20 13:47:18 INFO spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)

Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at java.lang.Thread.run(Thread.java:748)

  
