hadoop-2

Posted it_worker365

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop-2相关的知识,希望对你有一定的参考价值。

AsyncDispatcher,直接看代码

  @Override
  protected void serviceStart() throws Exception {
    //start all the components
    super.serviceStart();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName(dispatcherThreadName);
    eventHandlingThread.start();
  }

 

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          drained = eventQueue.isEmpty();
          // blockNewEvents is only set when dispatcher is draining to stop,
          // adding this check is to avoid the overhead of acquiring the lock
          // and calling notify every time in the normal run of the loop.
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
        stopped = true;
        Thread shutDownThread = new Thread(createShutDownThread());
        shutDownThread.setName("AsyncDispatcher ShutDown handler");
        shutDownThread.start();
      }
    }
  }

-------------------------------------------------------------------------------------------------------------------------------------

一个handler的实现

public class JobHistoryEventHandler extends AbstractService implements EventHandler<JobHistoryEvent> {
      protected BlockingQueue<JobHistoryEvent> eventQueue = new LinkedBlockingQueue<JobHistoryEvent>();
      protected boolean handleTimelineEvent = false;
      protected AsyncDispatcher atsEventDispatcher = null;
...... @Override
public void handle(JobHistoryEvent event) { try { eventQueue.put(event); // Process it for ATS (if enabled) if (handleTimelineEvent) { atsEventDispatcher.getEventHandler().handle(event); } } } }

 


以上是关于hadoop-2的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop-2.4.1学习之InputFormat及源代码分析

Hadoop-2.4.1学习之InputFormat及源代码分析

Hadoop 2.7.3 容器启动异常,由于 AM 容器退出代码而失败:127

如何运行自带wordcount-Hadoop2

编写hadoop程序并打成jar包上传到hadoop集群运行

微信小程序代码片段