Modifying Kyuubi to Output Task Progress Information

Posted by 疯狂哈丘


1. How Hive Implements QueryLog

Kyuubi and Spark ThriftServer are both built on the HiveServer framework, so this section only covers how HiveServer implements QueryLog.

The client side

In the JDBC client that ships with HiveServer, QueryLog retrieval is implemented in the getQueryLog() method of the HiveStatement class. Under the hood this method calls the Thrift FetchResults operation. Judging by the name alone, FetchResults looks like a method for fetching query results, but that is not the whole story: Hive uses a fetchType parameter to distinguish fetching results from fetching the QueryLog. With fetchType=1 HiveServer returns the QueryLog; with fetchType=0 it returns the query results.
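
To make that concrete, here is a minimal sketch of the polling loop from the client's point of view. The JDBC URL, credentials, and SQL are placeholders; getQueryLog() and the cast to HiveStatement are the hive-jdbc API described above.

// Sketch: polling the QueryLog of a running statement over JDBC.
import java.sql.DriverManager
import org.apache.hive.jdbc.HiveStatement
import scala.collection.JavaConverters._

object QueryLogPoller {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement().asInstanceOf[HiveStatement]

    // Run the statement on another thread so this one is free to poll the log.
    val runner = new Thread(new Runnable {
      override def run(): Unit = stmt.execute("SELECT count(*) FROM some_table")
    })
    runner.start()

    // Each getQueryLog() call is a Thrift FetchResults request with fetchType=1.
    while (runner.isAlive) {
      stmt.getQueryLog().asScala.foreach(println)
      Thread.sleep(500)
    }
    runner.join()
    stmt.getQueryLog().asScala.foreach(println) // drain any remaining lines
    conn.close()
  }
}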

The server-side implementation

Let's look at HiveServer's implementation of fetchResults():

//HiveSessionImpl.java
@Override
  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
      long maxRows, FetchType fetchType) throws HiveSQLException {
    acquire(true, false);
    try {
      // fetchType == 0: return the query results
      if (fetchType == FetchType.QUERY_OUTPUT) {
        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
      }
      // otherwise return the operation log
      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf);
    } finally {
      release(true, false);
    }
  }

When the client passes fetchType=1, HiveServer pulls the log of the corresponding Operation and returns it.

Reading the log

All of HiveServer's logic for reading and writing an Operation's log is encapsulated in the OperationLog class. OperationLog holds a logFile field that points at a file on local disk, located at $hive.server2.logging.operation.log.location/$sessionId/$operationId, i.e. one log file per Operation.

To read an Operation's log, HiveServer simply reads the contents of that file.
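
A tiny sketch of the path layout (the root value is only an example and the names are illustrative, not Hive's actual variables):

// Sketch of the per-operation log path; names here are illustrative.
val operationLogRoot = "/tmp/hive/operation_logs" // hive.server2.logging.operation.log.location
def operationLogFile(sessionId: String, operationId: String): String =
  s"$operationLogRoot/$sessionId/$operationId"    // one file per Operation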

Writing the log

Writing to the OperationLog is handled by the LogDivertAppender class, a log appender Hive implements itself by extending log4j's AbstractOutputStreamAppender.

When the OperationManager is initialized, Hive constructs a LogDivertAppender instance and adds it to the log4j context, so every log line subsequently emitted through log4j passes through LogDivertAppender (which decides whether, and to where, each event is written):

//OperationManager.java
private void initOperationLogCapture(String loggingMode) {
    // Register another Appender (with the same layout) that talks to us.
    Appender ap = LogDivertAppender.createInstance(this, OperationLog.getLoggingLevel(loggingMode));
    LoggerContext context = (LoggerContext) LogManager.getContext(false);
    Configuration configuration = context.getConfiguration();
    LoggerConfig loggerConfig = configuration.getLoggerConfig(LoggerFactory.getLogger(getClass()).getName());
    loggerConfig.addAppender(ap, null, null);
    context.updateLoggers();
    ap.start();
  }

Now let's see how LogDivertAppender handles log events.

The core of any Appender is its append() method, which LogDivertAppender overrides:

//LogDivertAppender.java
@Override
  public void append(LogEvent event) {
    super.append(event);

    String logOutput = getOutput();
    manager.reset();
    // Fetch the OperationLog from a ThreadLocal variable
    OperationLog log = operationManager.getOperationLogByThread();
    if (log == null) {
      LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
      return;
    }
    // Write the event through the OperationLog
    log.writeOperationLog(logOutput);
  }

As this method shows, when LogDivertAppender receives a LogEvent it tries to fetch the OperationLog bound to the current thread; if one is found, the event is written through that instance.

Next, let's see how an OperationLog instance gets bound to a thread in the first place:

//SQLOperation$BackgroundWork.java
@Override
    public void run() {
      PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws HiveSQLException {
          Hive.set(parentHive);
          // TODO: can this result in cross-thread reuse of session state?
          SessionState.setCurrentSessionState(parentSessionState);
          PerfLogger.setPerfLogger(parentPerfLogger);
          // Bind the OperationLog instance to the current thread
          registerCurrentOperationLog();
          registerLoggingContext();
          try {
            if (asyncPrepare) {
              prepare(queryState);
            }
            runQuery();
          } catch (HiveSQLException e) {
            setOperationException(e);
            LOG.error("Error running hive query: ", e);
          } finally {
            unregisterLoggingContext();
            unregisterOperationLog();
          }
          return null;
        }
      };

      try {
        currentUGI.doAs(doAsAction);
      } catch (Exception e) {
        setOperationException(new HiveSQLException(e));
        LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
      } finally {
        // ...
      }
    }

The above is part of SQLOperation, the code path Hive uses to execute SQL. Before executing a statement, Hive binds the OperationLog instance to the current thread; from then on, any log line that thread emits through log4j (log.info() and the like) lands in the file behind that OperationLog.
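
The pattern behind all of this is a plain ThreadLocal handoff between the worker thread and the appender. A stripped-down sketch, with illustrative names rather than Hive's actual classes:

// Illustrative sketch of the ThreadLocal handoff; not Hive's actual classes.
import java.io.PrintWriter

object CurrentOperationLog {
  private val current = new ThreadLocal[PrintWriter]

  // Called by the worker thread before/after running the query.
  def register(writer: PrintWriter): Unit = current.set(writer)
  def unregister(): Unit = current.remove()

  // What a diverting appender effectively does for each log event:
  // write to the current thread's log file, or drop the event.
  def divert(line: String): Unit = Option(current.get).foreach(_.println(line))
}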

2. Adding Progress Log Output to Kyuubi

In HiveServer, the RemoteDriver it launches communicates with HiveServer continuously, and the RemoteDriver implements its own SparkListener internally, so it can pass job progress back to HiveServer, where the Operation's thread receives it and writes it to the log. That is why HiveServer shows progress information normally while a query executes.

Kyuubi Server, by contrast, executes SQL through a SparkContext instance it constructs itself. It does register a KyuubiServerListener, but that listener does not emit progress logs, so we have to modify Kyuubi ourselves to support progress output.

The quickest way to learn a job's progress is to register a SparkListener, so we can modify Kyuubi's existing KyuubiServerListener directly.
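
For reference, this is the general shape of hooking a listener onto a SparkContext (the listener body is a toy example, not Kyuubi's code):

// Sketch: wiring a custom SparkListener into a SparkContext.
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

def registerProgressListener(sc: SparkContext): Unit = {
  sc.addSparkListener(new SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit =
      println(s"job ${jobStart.jobId} started with ${jobStart.stageIds.size} stage(s)")
  })
}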

From the previous section we know that to ship QueryLog lines to the client, the executing thread must have the right OperationLog instance bound to it. But SparkListener lifecycle callbacks all run on the listener bus's event-loop thread, which never has an OperationLog bound.

So we need to answer one question:

How do we bind the right OperationLog instance inside the thread that runs SparkListener's lifecycle callbacks?

Every SQL statement executed on Kyuubi is assigned a statementId that corresponds to one Operation instance. The first step is to build a mapping from statementId to OperationLog:

// A holder for the statementId -> OperationLog mapping
object OperationLogManager {
  val handleToOperationLog = new ConcurrentHashMap[String, OperationLog]
}

//ExecuteStatementOperation.scala
  private def registerCurrentOperationLog(): Unit = {
    if (isOperationLogEnabled) {
      if (operationLog == null) {
        warn("Failed to get current OperationLog object of Operation: " +
          getHandle.getHandleIdentifier)
        isOperationLogEnabled = false
      } else {
        session.getSessionMgr.getOperationMgr.setOperationLog(operationLog)
        // Record statementId -> operationLog so the OperationLog can later be
        // looked up by statementId
        OperationLogManager.handleToOperationLog.put(statementId, operationLog)
      }
    }
  }

Next, before the SQL is executed, register the statementId with the KyuubiServerListener:

//ExecuteStatementInClientMode.scala
override protected def execute(): Unit = {
    try {
      val userName = session.getUserName
      info(s"Running $userName's query '$statement' with $statementId")
      setState(RUNNING)
      MetricsSystem.get.foreach(_.RUNNING_QUERIES.inc)
      val classLoader = SparkSQLUtils.getUserJarClassLoader(sparkSession)
      Thread.currentThread().setContextClassLoader(classLoader)
      // Kyuubi calls onStatementStart before executing the SQL, so this is the
      // hook we modify
      KyuubiServerMonitor.getListener(userName).foreach {
        _.onStatementStart(
          statementId,
          session.getSessionHandle.getSessionId.toString,
          statement,
          statementId,
          userName)
      }
      ...
    } catch {
      case e: KyuubiSQLException if !isClosedOrCanceled =>
        val err = KyuubiSparkUtil.exceptionString(e)
        onStatementError(statementId, e.getMessage, err)
        throw e
      case e: Throwable if !isClosedOrCanceled =>
        val err = KyuubiSparkUtil.exceptionString(e)
        onStatementError(statementId, e.getMessage, err)
        throw new KyuubiSQLException(err, e.getClass.getSimpleName, e)
    } finally {
      MetricsSystem.get.foreach { m =>
        m.RUNNING_QUERIES.dec()
        m.TOTAL_QUERIES.inc()
      }
    }
  }

//KyuubiServerListener.scala
  def onStatementStart(
      id: String,
      sessionId: String,
      statement: String,
      groupId: String,
      userName: String = "UNKNOWN"): Unit = synchronized {
    val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
    info.state = ExecutionState.STARTED
    executionList.put(id, info)
    trimExecutionIfNecessary()
    sessionList(sessionId).totalExecution += 1
    executionList(id).groupId = groupId
    // New: map the jobId to the statementId
    JobId2StatementIdMap.put(nowJobId, id)
    totalRunning += 1
    // nowJobId is a counter we added
    nowJobId += 1
  }

Kyuubi calls onStatementStart before executing each SQL statement, so that is the hook we modify. Inside onStatementStart we can bind the jobId to the statementId (for now we assume one SQL statement maps to one job). Then, in each lifecycle callback, we resolve the statementId from the jobId, resolve the OperationLog from the statementId, bind that instance to the current thread, and the QueryLog lines reach the client.

The modified KyuubiServerListener looks like this:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package yaooqinn.kyuubi.ui

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{KyuubiSparkUtil, SparkConf, Success}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.sql.internal.SQLConf

import yaooqinn.kyuubi.operation.{OperationLog, OperationLogManager}

class KyuubiServerListener(conf: SparkConf) extends SparkListener {

  private[this] var onlineSessionNum: Int = 0
  private[this] val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
  private[this] val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
  // New bookkeeping for progress output: jobId -> statementId, stageId -> jobId,
  // and per-job task counters.
  private[this] val JobId2StatementIdMap = new mutable.LinkedHashMap[Long, String]
  private[this] val stageIdJobIdMap = new mutable.LinkedHashMap[Int, Long]
  private[this] val job2TaskNumberMap = new mutable.LinkedHashMap[Long, Int]
  private[this] val job2TaskCompleteCountMap = new mutable.LinkedHashMap[Long, Int]
  private[this] val job2TaskFailCountMap = new mutable.LinkedHashMap[Long, Int]
  private[this] val retainedStatements =
    conf.getInt(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT.key, 200)
  private[this] val retainedSessions = conf.getInt(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT.key, 200)
  private[this] var totalRunning = 0
  private[this] var nowJobId = 0

  def getOnlineSessionNum: Int = synchronized {
    onlineSessionNum
  }

  def getTotalRunning: Int = synchronized {
    totalRunning
  }

  def getSessionList: Seq[SessionInfo] = synchronized {
    sessionList.values.toSeq
  }

  def getSession(sessionId: String): Option[SessionInfo] = synchronized {
    sessionList.get(sessionId)
  }

  def getExecutionList: Seq[ExecutionInfo] = synchronized {
    executionList.values.toSeq
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    stageIdJobIdMap.remove(stageCompleted.stageInfo.stageId)
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Resolve stageId -> jobId -> statementId -> OperationLog, bind the log to
    // this (event-loop) thread, write one progress line, then unbind.
    val jobId = stageIdJobIdMap(taskEnd.stageId)
    val statementId = JobId2StatementIdMap(jobId)
    val log = OperationLogManager.handleToOperationLog.get(statementId)

    OperationLog.setCurrentOperationLog(log)
    var completeCount = job2TaskCompleteCountMap.getOrElse(jobId, 0)
    var failCount = job2TaskFailCountMap.getOrElse(jobId, 0)

    if (taskEnd.reason.toString.equals("Success")) {
      completeCount = completeCount + 1
      job2TaskCompleteCountMap.put(jobId, completeCount)
    } else {
      failCount = failCount + 1
      job2TaskFailCountMap.put(jobId, failCount)
    }

    val totalCount = job2TaskNumberMap(jobId)
    log.info(s"Job TaskMetrics: +$completeCount(-$failCount)/$totalCount")
    OperationLog.removeCurrentOperationLog()
  }

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    try {
      jobStart.stageIds.foreach(stageIdJobIdMap.put(_, jobStart.jobId))
      val totalTaskCount = jobStart.stageInfos.map(_.numTasks).sum
      job2TaskNumberMap.put(jobStart.jobId, totalTaskCount)

      val statementId = JobId2StatementIdMap(jobStart.jobId)
      val log = OperationLogManager.handleToOperationLog.get(statementId)
      OperationLog.setCurrentOperationLog(log)
      log.info("jobId=" + jobStart.jobId + " total stage number:" + jobStart.stageIds.size)
      OperationLog.removeCurrentOperationLog()
    } catch {
      case e: Throwable => e.printStackTrace()
    }

    for {
      props <- Option(jobStart.properties)
      groupIdKey <- props.stringPropertyNames().asScala
        .filter(_.startsWith(KyuubiSparkUtil.getJobGroupIDKey))
      groupId <- Option(props.getProperty(groupIdKey))
      (_, info) <- executionList if info.groupId == groupId
    } {
      info.jobId += jobStart.jobId.toString
      info.groupId = groupId
    }
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    job2TaskNumberMap.remove(jobEnd.jobId)
    job2TaskCompleteCountMap.remove(jobEnd.jobId)
    job2TaskFailCountMap.remove(jobEnd.jobId)
    JobId2StatementIdMap.remove(jobEnd.jobId)
  }

  def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
    synchronized {
      val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
      sessionList.put(sessionId, info)
      onlineSessionNum += 1
      trimSessionIfNecessary()
    }
  }

  def onSessionClosed(sessionId: String): Unit = synchronized {
    sessionList(sessionId).finishTimestamp = System.currentTimeMillis
    onlineSessionNum -= 1
    trimSessionIfNecessary()
  }

  def onStatementStart(
      id: String,
      sessionId: String,
      statement: String,
      groupId: String,
      userName: String = "UNKNOWN"): Unit = synchronized {
    val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
    info.state = ExecutionState.STARTED
    executionList.put(id, info)
    trimExecutionIfNecessary()
    sessionList(sessionId).totalExecution += 1
    executionList(id).groupId = groupId

    JobId2StatementIdMap.put(nowJobId, id)
    totalRunning += 1
    nowJobId += 1
  }

  def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
    executionList(id).executePlan = executionPlan
    executionList(id).state = ExecutionState.COMPILED
  }

  def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
    synchronized {
      executionList(id).finishTimestamp = System.currentTimeMillis
      executionList(id).detail = errorMessage
      executionList(id).state = ExecutionState.FAILED
      totalRunning -= 1
      trimExecutionIfNecessary()
    }
  }
  // (standard finish bookkeeping, mirroring onStatementError above)
  def onStatementFinish(id: String): Unit = synchronized {
    executionList(id).finishTimestamp = System.currentTimeMillis
    executionList(id).state = ExecutionState.FINISHED
    totalRunning -= 1
    trimExecutionIfNecessary()
  }

  // trimSessionIfNecessary()/trimExecutionIfNecessary() evict the oldest entries
  // once retainedSessions/retainedStatements is exceeded.
}
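
With these changes in place, a JDBC client that polls getQueryLog() (beeline does this automatically) should see per-job progress lines interleaved with the normal operation log while the query runs, along the lines of jobId=0 total stage number:2 followed by Job TaskMetrics: +12(-0)/200 as tasks finish (the counts here are illustrative).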
