Hive源码分析:Driver类运行过程

2013.08.22 | Comments

说明:

本文的源码分析基于hive-0.12.0-cdh5.0.1。

概括

从《hive cli的入口类》中可以知道hive中处理hive命令的处理器一共有以下几种:

(1)set       SetProcessor,设置修改参数,设置到SessionState的HiveConf里。 
(2)dfs       DfsProcessor,使用hadoop的FsShell运行hadoop的命令。 
(3)add       AddResourceProcessor,添加到SessionState的resource_map里,运行提交job的时候会写入Hadoop的Distributed Cache。 
(4)delete    DeleteResourceProcessor,从SessionState的resource_map里删除。
(5)reset     RestResourceProcessor,重置终端输出
(6)其他命令   Driver 

Driver类的主要作用是用来编译并执行hive命令,然后返回执行结果。这里主要分析Driver类的运行逻辑,其时序图如下:

hive-driver

从时序图上可以看出有以下步骤:

  • run方法调用内部方法runInternal
  • 在runInternal方法内部先,调用HiveDriverRunHookContext的preDriverRun方法
  • 调用compileInternal方法
  • compileInternal方法内部调用compile方法
  • compile方法内,先调用HiveSemanticAnalyzerHookContext的preAnalyze方法
  • 再进行语法分析,调用BaseSemanticAnalyzer的analyze方法
  • 调用HiveSemanticAnalyzerHookContext的postAnalyze方法
  • 再进行语法校验,调用BaseSemanticAnalyzer的validate方法
  • compileInternal方法运行完成之后,调用checkConcurrency方法
  • 再来运行execute方法,该方法用于运行任务
  • 最后,调用HiveDriverRunHookContext的postDriverRun方法

Driver初始化

在继续分析之前,需要弄清楚Driver类初始化时做了什么事情。

在CliDriver的processCmd(String cmd)方法中可以看到proc是在CommandProcessorFactory类中new出来的并调用了init方法。

} else { // local mode
      CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf) conf);
      ret = processLocalCmd(cmd, proc, ss);
}

CommandProcessorFactory.get方法代码片段:

if (conf == null) {
return new Driver();
}

Driver drv = mapDrivers.get(conf);
if (drv == null) {
drv = new Driver();
mapDrivers.put(conf, drv);
}
drv.init();

init方法和构造方法代码如下:

public void init() {
	Operator.resetId();
}

public Driver() {
    if (SessionState.get() != null) {
      conf = SessionState.get().getConf();
    }
  }

从上可以看出仅仅是初始化了conf属性和重置了Operator的id。

run方法过程

1、调用runInternal方法,根据该方法返回值判断是否出错。

2、runInternal方法内,运行HiveDriverRunHook的前置方法preDriverRun

3、判断是否需要编译,如果需要,则运行compileInternal(command)方法,并根据返回值判断是否该释放Hive锁。hive中可以配置hive.support.concurrency值为true并设置zookeeper的服务器地址和端口,基于zookeeper实现分布式锁以支持hive的多并发访问。这部分内容不是本文重点故不做介绍。

compileInternal(command)方法内部代码说明见下文。

4、判断是否需要对Task加锁。如果需要,则调用checkConcurrency方法。

5、调用execute()方法执行任务。

  • 执行计划开始:plan.setStarted();
  • 先运行ExecuteWithHookContext的前置hook方法,ExecuteWithHookContext类型有三种:前置、运行失败、后置。
  • 然后创建DriverContext用于维护正在运行的task任务,正在运行的task任务会添加到队列runnable中去。
  • 其次,在while循环中遍历队列中的任务,然后启动任务让其执行,并且轮训任务执行结果,如果任务运行完成,则将其从running中删除并将当前任务的子任务加入队列中;如果运行失败,则会启动备份的任务,并运行失败的hook。
while (runnable.peek() != null && running.size() < maxthreads) {
  Task<? extends Serializable> tsk = runnable.remove();
  launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
}
  • 在launchTask方法中,先判断是否支持并发执行,如果支持则调用TaskRunner的start()方法,否则调用tskRun.runSequential()方法顺序执行,只有当是MapReduce任务时,才执行并发执行:
	public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
      Map<TaskResult, TaskRunner> running, String jobname, int jobs, DriverContext cxt) {

    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
    }
    if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
      if (noName) {
        conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")");
      }
      cxt.incCurJobNo(1);
      console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
    }
    tsk.initialize(conf, plan, cxt);
    TaskResult tskRes = new TaskResult();
    TaskRunner tskRun = new TaskRunner(tsk, tskRes);

    // Launch Task
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
      // Launch it in the parallel mode, as a separate thread only for MR tasks
      tskRun.start();
    } else {
      tskRun.runSequential();
    }
    running.put(tskRes, tskRun);
    return;
  }

最后任务的执行情况,就要看具体的Task<? extends Serializable>的实现类的逻辑了。

  • 执行计划完成:plan.setDone();

6、运行HiveDriverRunHook的后置方法postDriverRun

compileInternal方法过程

1、保存当前查询状态

QueryState queryState = new QueryState();

if (plan != null) {
  close();
  plan = null;
}

if (resetTaskIds) {
  TaskFactory.resetId();
}
saveSession(queryState);

QueryState中保存了HiveOperation以及当前查询语句或者命令。

2、创建Context上下文

command = new VariableSubstitution().substitute(conf,command);
ctx = new Context(conf);
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
ctx.setHDFSCleanup(true);

3、创建ParseDriver对象,然后解析命令、生成AST树。语法和词法分析内容,不是本文重点故不做介绍。

ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
tree = ParseUtils.findRootNonNullToken(tree);

简单归纳来说,解析程包括如下:

  • 词法分析,生成AST树,ParseDriver完成。
  • 分析AST树,AST拆分成查询子块,信息记录在QB,这个QB在下面几个阶段都需要用到,SemanticAnalyzer.doPhase1完成。
  • 从metastore中获取表的信息,SemanticAnalyzer.getMetaData完成。
  • 生成逻辑执行计划,SemanticAnalyzer.genPlan完成。
  • 优化逻辑执行计划,Optimizer完成,ParseContext作为上下文信息进行传递。
  • 生成物理执行计划,SemanticAnalyzer.genMapRedTasks完成。
  • 物理计划优化,PhysicalOptimizer完成,PhysicalContext作为上下文信息进行传递。

4、读取环境变量,如果配置了语法分析的hook,参数为:hive.semantic.analyzer.hook,则:先用反射得到AbstractSemanticAnalyzerHook的集合,调用hook.preAnalyze(hookCtx, tree)方法,然后再调用sem.analyze(tree, ctx)方法,该方法才是用来作语法分析的,最后再调用hook.postAnalyze(hookCtx, tree)方法执行一些用户定义的后置操作;

否则,直接调用sem.analyze(tree, ctx)进行语法分析。

  BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
  List<AbstractSemanticAnalyzerHook> saHooks =
      getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
               AbstractSemanticAnalyzerHook.class);

  // Do semantic analysis and plan generation
  if (saHooks != null) {
    HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
    hookCtx.setConf(conf);
    hookCtx.setUserName(userName);
    for (AbstractSemanticAnalyzerHook hook : saHooks) {
      tree = hook.preAnalyze(hookCtx, tree);
    }
    sem.analyze(tree, ctx);
    hookCtx.update(sem);
    for (AbstractSemanticAnalyzerHook hook : saHooks) {
      hook.postAnalyze(hookCtx, sem.getRootTasks());
    }
  } else {
    sem.analyze(tree, ctx);
  }

5、校验执行计划:sem.validate()

6、创建查询计划QueryPlan。

plan = new QueryPlan(command, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN),
           SessionState.get().getCommandType());

7、初始化FetchTask。

if (plan.getFetchTask() != null) {
   plan.getFetchTask().initialize(conf, plan, null);
}

8、得到schema

schema = getSchema(sem, conf);

9、授权校验工作。

hive中支持的hook

上面分析中,提到了hive的hook机制,hive中一共存在以下几种hook。

hive.semantic.analyzer.hook
hive.exec.filter.hook
hive.exec.driver.run.hooks
hive.server2.session.hook
hive.exec.pre.hooks
hive.exec.post.hooks
hive.exec.failure.hooks
hive.client.stats.publishers
hive.metastore.ds.connection.url.hook
hive.metastore.init.hooks

通过hook机制,可以在运行前后做一些用户想做的事情。如:你可以在语法分析的hook中对hive的操作做一些超级管理员级别的权限判断;你可以对hive-server2做一些session级别的控制。

cloudera的github仓库access中关于hive的访问控制就是使用了hive的hook机制。

twitter的mapreduce可视化项目监控项目ambrose也利用了hive的hook机制,有兴趣的话,你可以去看看其是如何使用hive的hook并且你也可以扩增hook做些自己想做的事情。

总结

本文主要介绍了hive运行过程,包括hive语法词法解析以及hook机制,任务的最后运行过程取决于具体的Task<? extends Serializable>的实现类的逻辑。关于hive语法词法解析,这一部分没有做详细的解释。

hive Driver类的执行过程如下(该图是根据hive-0.11版本画出来的):

hive-driver

参考文章

  1. hive 初始化运行流程
  2. Cloudera access
  3. twitter ambrose

原创文章,转载请注明: 转载自JavaChen Blog,作者:JavaChen
本文链接地址:http://blog.javachen.com/2013/08/22/hive-Driver.html
本文基于署名2.5中国大陆许可协议发布,欢迎转载、演绎或用于商业目的,但是必须保留本文署名和文章链接。 如您有任何疑问或者授权方面的协商,请邮件联系我。