Tensorflow Graph Computation Engine Overview

Tensorflow users can construct their graphs in a variety of languages, but every language's API ultimately enters the Tensorflow runtime through the C API. In other words, for the runtime code, the C API is its upper boundary. For example, a network described in Python enters the runtime through a few Python-C interfaces like the following.

#9  0x00007f9de118daa4 in PyEval_EvalFrameEx ()
#10 0x00007f9de118f0bd in PyEval_EvalCodeEx ()

Tensorflow as a whole can be viewed as a "compiler for a graph language". Like the optimization and translation work of any compiler, the runtime's handling of a Graph can be divided into several phases: graph construction -> graph optimization -> graph execution, with graph optimization carried out alongside graph construction.
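For instance, a minimal TF1-style sketch (the tiny graph is only an example): the "source program" handed to this graph compiler is a GraphDef, and running the session is what crosses the C API boundary shown in the stack above.

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

a = tf.constant(1.0)
b = tf.constant(2.0)
c = a + b

# The "source program" fed to the graph compiler: a GraphDef proto.
print(tf.get_default_graph().as_graph_def())

with tf.Session() as sess:
    # This call crosses the C API boundary (TF_SessionRun) into the runtime.
    print(sess.run(c))  # 3.0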

Full-Graph Construction and Optimization

When the Session is first constructed, the dataflow graph defined in application code is converted into GraphDef format and passed through the C API into DirectSession.Extend(). A reference call stack:

PyEval_EvalCodeEx()
  PyEval_EvalFrameEx()
    _wrap_ExtendSession()
      tensorflow::ExtendSession()
        tensorflow::ExtendSessionGraphHelper()
          tensorflow::SessionRef::Extend()
            tensorflow::DirectSession::ExtendLocked()
              tensorflow::DirectSession::MaybeInitializeExecutionState(out_already_initialized/already_initialized)
                if out_already_initialized:
                  return
                flib_def_.reset(new FunctionLibraryDefinition())
                tensorflow::GraphExecutionState::MakeForBaseGraph()
                    std::unique_ptr<GraphExecutionState> ret(new GraphExecutionState(graph_def, options));
                    if (!ret->session_options_->config.graph_options().place_pruned_graph()):
                      tensorflow::GraphExecutionState::InitBaseGraph()
                        OptimizationPassRegistry::Global()->RunGrouping(PRE_PLACEMENT)
                        Placer placer()
                        placer.Run()
                        OptimizationPassRegistry::Global()->RunGrouping(POST_PLACEMENT)
                out_already_initialized = false
              if already_initialized:
                flib_def_->AddLibrary(graph.library())
                std::unique_ptr<GraphExecutionState> state
                execution_state_->Extend(graph, &state))
                execution_state_.swap(state)
    _wrap_TF_SessionRun_wrapper()
      tensorflow::TF_SessionRun_wrapper()
        tensorflow::TF_SessionRun_wrapper_helper()
          TF_SessionRun()
            TF_Run_Helper()
              tensorflow::SessionRef::Run()
                tensorflow::DirectSession::Run()  
                  DirectSession::GetOrCreateExecutors(executors_and_keys)
                    CreateExecutors()
                      std::unique_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys); 
                      std::unordered_map<string, std::unique_ptr<Graph>> graphs;
                      CreateGraphs(&graphs)
                        if (options_.config.graph_options().place_pruned_graph()):
                          MakeForPrunedGraph()
                            ret->InitBaseGraph()
                              if (session_options_ && session_options_->config.graph_options().place_pruned_graph()):
                                PruneGraph()
                                  if (options.use_function_convention):
                                    feed_rewrites.emplace_back(new subgraph::ArgFeedRewrite())
                                    fetch_rewrites.emplace_back(new subgraph::RetvalFetchRewrite())
                                    ValidateFeedAndFetchDevices()
                                  else:
                                    feed_rewrites.emplace_back(new subgraph::RecvFeedRewrite())
                                    fetch_rewrites.emplace_back(new subgraph::SendFetchRewrite())
                                  subgraph::RewriteGraphForExecution(graph, feed_rewrites, fetch_rewrites)
                              OptimizationPassRegistry::Global()->RunGrouping(PRE_PLACEMENT)
                              Placer placer()
                              placer.Run()
                              OptimizationPassRegistry::Global()->RunGrouping(POST_PLACEMENT)
                            graph_ = new_graph.release();
                            ret->BuildGraph()
                              OptimizeGraph()
                                grappler::RunMetaOptimizer()
                                  MetaOptimizer::Optimize()
                              if (session_options_ == nullptr || !session_options_->config.graph_options().place_pruned_graph()):
                                PruneGraph()
                              std::unique_ptr<ClientGraph> dense_copy(new ClientGraph)
                        else:
                          execution_state->BuildGraph();

-0- Initial graph construction, tensorflow/core/common_runtime/graph_execution_state.cc
-3-26- The two entry points of graph construction. At -3-, if pruning the full graph is not allowed, the OptimizationPassRegistry passes run while the Session is being constructed, and the Grappler full-graph optimization runs at Session::Run. If pruning is allowed, then full-graph pruning, the OptimizationPassRegistry passes, and the Grappler optimization all move into Session::Run, where the Graph is built and optimized together with the first construction of the Executors. (See the configuration sketch after these notes.)
-11- Initial graph construction, tensorflow/core/common_runtime/graph_execution_state.cc
-15- InitBaseGraph decides, based on the configuration, whether to prune; its core job is Placement. No pruning happens here.
-16- Run the PRE_PLACEMENT optimization passes
-18- Walk the Graph and assign a DEVICE to each Node. The principle: honor the caller's placement requests as much as possible while satisfying the hard constraints. XLA JIT uses DEVICE_GPU_XLA_JIT and DEVICE_CPU_XLA_JIT.
-19- Run the POST_PLACEMENT optimization passes
-24- Extend an already constructed graph; the comments inside Extend() walk through the whole extension flow
-29- C API entry point
-33- Split each Graph into subgraphs (Graph, Partition) and construct an Executor for each Partition.
-40- InitBaseGraph decides, based on the configuration, whether to prune; its core job is Placement. Pruning does happen here.
-53- XLA JIT plugs into the graph computation engine by registering XLA_CPU_DEVICE and XLA_GPU_DEVICE, so it is not hard to see why XLA JIT implements nine OptimizationPassRegistry passes; see https://www.tensorflow.org/guide/graph_optimization
-56-62- BuildGraph constructs a ClientGraph from the Graph, i.e. an independently executable subgraph. By the time BuildGraph runs, the owning GraphExecutionState is guaranteed to have executed InitBaseGraph. Whether pruning happens here depends on the configuration; the core work is the Grappler optimization. Arriving here via MakeForBaseGraph (-62-) prunes; arriving via MakeForPrunedGraph does not. So the full graph is pruned on both construction paths, and only the timing differs: MakeForBaseGraph goes InitBaseGraph without Prune -> Placement in InitBaseGraph -> Grappler in BuildGraph -> Prune in BuildGraph, while MakeForPrunedGraph goes Prune in InitBaseGraph -> Placement in InitBaseGraph -> Grappler in BuildGraph -> no Prune in BuildGraph.
-57- Grappler: full-graph optimization.
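As a minimal sketch of the knobs discussed above (standard TF1 APIs; the tiny graph is only an example): place_pruned_graph is an ordinary GraphOptions field and selects the MakeForPrunedGraph path, while allow_soft_placement and tf.device() feed the placement requests that the Placer tries to honor.

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

# place_pruned_graph=True selects the MakeForPrunedGraph path: pruning moves
# into InitBaseGraph at the first Session.run(), instead of into BuildGraph.
config = tf.ConfigProto(
    allow_soft_placement=True,  # let the Placer fall back if a request can't be honored
    graph_options=tf.GraphOptions(place_pruned_graph=True))

with tf.device("/device:CPU:0"):  # a placement request for the Placer
    a = tf.constant(1.0)
    b = tf.constant(2.0)
    c = a + b

with tf.Session(config=config) as sess:
    print(sess.run(c))  # 3.0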

Graph Partitioning and Optimization

If pruning and partitioning of the graph are allowed, they happen in the first Session.run(), together with the construction of the Executors.

PyEval_EvalFrameEx()
  _wrap_TF_SessionRun_wrapper()
    tensorflow::TF_SessionRun_wrapper()
      tensorflow::TF_SessionRun_wrapper_helper()
        TF_SessionRun()
          TF_Run_Helper()
            tensorflow::SessionRef::Run()
              tensorflow::DirectSession::Run()  
                DirectSession::GetOrCreateExecutors(executors_and_keys)
                  CreateExecutors()
                    std::unique_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys); 
                    std::unordered_map<string, std::unique_ptr<Graph>> graphs;
                    CreateGraphs(&graphs)
                      if (options_.config.graph_options().place_pruned_graph()):
                        MakeForPrunedGraph()
                      else:
                        execution_state->BuildGraph();
                          OptimizationPassRegistry::Global()->RunGrouping(POST_REWRITE_FOR_EXEC)
                      Partition(popts, &client_graph->graph, &partitions)
                      OptimizationPassRegistry::Global()->RunGrouping(POST_PARTITIONING)
                    ek->items.reserve(graphs.size());
                    ProcessFunctionLibraryRuntime()
                    GraphOptimizer optimizer()
                    for iter in graphs:
                      optimizer.Optimize(&partition_graph)
                        for (rounds = 0; rounds < kMaxRounds; ++rounds):
                          RemoveListArrayConverter()
                          RemoveDeadNodes()
                          RemoveIdentityNodes()
                          ConstantFold()
                          OptimizeCSE()
                          FixupSourceAndSinkEdges()
                          ExpandInlineFunctions()
                        std::unique_ptr<Graph> copy(new Graph(g->flib_def()));
                        CopyGraph(*g, copy.get());
                        graph->swap(copy);  
                      LocalExecutorParams params;
                      params.device = ...  // also function_library, create_kernel, delete_kernel
                      item->graph = partition_graph.get();
                      NewExecutor(&item->executor)
                        ExecutorFactory::GetFactory(executor_type, &factory)
                          auto iter = executor_factories()->find(executor_type);
                          *out_factory = iter->second;
                        factory->NewExecutor(params,..., out_executor)  // if "DEFAULT"/"", NewLocalExecutor()
                          NewLocalExecutor()
                            ExecutorImpl* impl = new ExecutorImpl(params, std::move(graph));
                            *executor = impl;
                  executors_.emplace(&ek)
                FunctionCallFrame call_frame()  
                DirectSession::RunInternal()
                  const size_t num_executors = executors_and_keys->items.size();
                  ExecutorBarrier* barrier = new ExecutorBarrier()
                  Executor::Args args;
                  thread::ThreadPool* pool = ...
                  Executor::Args::Runner default_runner = [this, pool](Executor::Args::Closure c) {pool->Schedule(std::move(c));};  // the runner implementation
                  for item in executors_and_keys->items:
                    args.runner = default_runner
                    args.runner = [this, device_thread_pool](Executor::Args::Closure c) {device_thread_pool->Schedule(std::move(c));};  // overrides default_runner when the device has its own thread pool
                    item.executor->RunAsync(args, barrier->Get()); 
                  WaitForNotification()

-5- C API entry point
-10- Split each Graph into subgraphs (Graph, Partition), and construct an Executor for each Partition.
-18- Run the POST_REWRITE_FOR_EXEC optimization passes
-19- Subgraph partitioning (illustrated by the sketch after these notes)
-20- Run the POST_PARTITIONING optimization passes
-24- Optimize each subgraph one by one; the expected optimizations such as ConstantFold and CSE run here
-40- Construct the Executor for this Partition; there are exactly as many Executors as subgraphs. Since a graph may be modified during execution, all Executors are cached.
-48- Save all the executors_and_keys
-56- Iterate over the Executors
-57- Pick the thread pool, either the shared one or the Executor's own device thread pool; it is used later when running node computations.
-59- Run the graph owned by this Executor
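To see partitioning in action, a small sketch under the assumption that two CPU devices are enough to trigger it (device_count is a standard ConfigProto field): placing nodes on different devices makes Partition() cut the graph into one subgraph per device, each with its own Executor, with Send/Recv nodes inserted across the cut.

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

# Two CPU devices, so the example partitions even on a machine without a GPU.
config = tf.ConfigProto(device_count={"CPU": 2})

with tf.device("/device:CPU:0"):
    a = tf.constant([1.0, 2.0])
    b = a * 2.0
with tf.device("/device:CPU:1"):
    c = b + 1.0  # the device boundary here is where Send/Recv pairs are inserted

with tf.Session(config=config) as sess:
    print(sess.run(c))  # [3. 5.], run by two Executors, one per partition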

Graph Execution

The core idea of the graph execution engine: keep finding nodes whose in-degree is 0 and execute them, until the whole graph has been traversed. Tensorflow does not crudely throw a thread pool at the whole problem; it parallelizes rather cautiously: only when a node is expensive does it spin the computation off to another thread. The toy sketch below illustrates the idea.
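The following sketch is plain Python, not TF code; the node names and the expensive flag are made up for illustration. It mirrors the policy: ready nodes with in-degree 0 are executed, cheap ones inline on the current thread and expensive ones on a thread pool, and finishing a node releases its successors, as ScheduleReady()/Process()/PropagateOutputs() do in the trace below.

from concurrent.futures import ThreadPoolExecutor
import threading

class Node:
    def __init__(self, name, expensive, deps):
        self.name, self.expensive, self.deps = name, expensive, deps

def run_graph(nodes):
    lock = threading.Lock()
    in_degree = {n.name: len(n.deps) for n in nodes}
    out_edges = {n.name: [] for n in nodes}
    for n in nodes:
        for d in n.deps:
            out_edges[d].append(n)
    pool = ThreadPoolExecutor(max_workers=4)
    done = threading.Event()
    pending = [len(nodes)]

    def process(node):
        print("compute", node.name)          # stand-in for OpKernel::Compute()
        ready = []
        with lock:                           # PropagateOutputs: release successors
            for succ in out_edges[node.name]:
                in_degree[succ.name] -= 1
                if in_degree[succ.name] == 0:
                    ready.append(succ)
            pending[0] -= 1
            if pending[0] == 0:
                done.set()
        schedule(ready)

    def schedule(ready):                     # ScheduleReady
        inline = [n for n in ready if not n.expensive]
        for n in ready:
            if n.expensive:
                pool.submit(process, n)      # expensive: hand off to another thread
        for n in inline:
            process(n)                       # cheap: run inline on this thread

    schedule([n for n in nodes if not n.deps])  # the initial in-degree-0 set
    done.wait()
    pool.shutdown()

nodes = [Node("a", False, []), Node("b", True, ["a"]),
         Node("c", False, ["a"]), Node("d", False, ["b", "c"])]
run_graph(nodes)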

ExecutorImpl::RunAsync(args, barrier->Get()); //executor.cc
  (new ExecutorState(args, this))->RunAsync(std::move(done)); 
    const Graph* graph = impl_->graph_.get();
    TaggedNodeSeq ready;
    for  Node* n in impl_->root_nodes_:
      DCHECK_EQ(n->in_edges().size(), 0)
      ready.push_back(TaggedNode{n, root_frame_, 0, false});
    num_outstanding_ops_ = ready.size();
    root_frame_->iterations[0]->outstanding_ops = ready.size();
    done_cb_ = std::move(done);
    // Schedule to run all the ready ops in thread pool.
    ScheduleReady(ready, nullptr);  
      if tagged_node.is_dead || !item.kernel->IsExpensive():
        inline_ready->push_back(tagged_node);
      else
        runner_(std::bind(&ExecutorState::Process, ...))  // tensorflow::ExecutorState::Process()
          EntryVector outputs
          while (!inline_ready.empty()):
            const NodeItem& item = *gview.node(id);
            s = PrepareInputs(item)
            stats = nullptr
            outputs.clear()
            if (stats_collector_ && !tagged_node.is_dead):
              stats = stats_collector_->CreateNodeExecStats(node)  // presumably a NodeExecStatsWrapper
            if (item.kernel_is_async):
              tensorflow::Device::ComputeAsync()
                tensorflow::HorovodBroadcastOp::ComputeAsync()
            else:
              OpKernelContext ctx(&params, item.num_outputs);
              nodestats::SetOpStart(stats);
                stats->RecordComputeStarted();
              tensorflow::Device::Compute()  
                tensorflow::XlaCompileOp::Compute() 
              // After item->kernel computation is done, processes its outputs.  
              ProcessOutputs(ctx, stats) 
              for i in item.num_outputs:
                const TensorValue val = ctx->release_output(i);
                    TensorValue value = outputs_[index];
                    outputs_[index] = TensorValue();
                    return value
                Entry* out = &((*outputs)[i]);
                out->val.Init(std::move(*val.tensor));
              nodestats::SetMemory(stats, &ctx); 
                stats->SetMemory(ctx);
                  auto* ms = stats_->mutable_memory_stats();
                  ms->set_persistent_memory_size(ctx->persistent_memory_allocated());
            if !launched_asynchronously:
              MaybeMarkCompleted()
              // After processing the outputs, propagates the outputs to their dsts.
              // Contents of *outputs are left in an indeterminate state after
              // returning from this method.
              PropagateOutputs(tagged_node, &item, &outputs, &ready); 
              completed = NodeDone();
                ScheduleReady(ready, inline_ready);
          if completed:
            ScheduleFinish()

-1- Graph execution entry point, executor.cc
-7- Collect the ready nodes, i.e. the nodes with in-degree 0
-12- The op scheduler; the whole execution engine is driven by repeated calls to ScheduleReady()
-16- The core logic of graph execution; tensorflow::ExecutorState::Process() handles the ready nodes
-24- Create the node's execution stats (see the RunMetadata example after these notes)
-25- Run the node's computation, first the AsyncOpKernel case, then the OpKernel case; this is where an op's Compute() method is invoked
-35- A node's computed outputs are stored in the incoming OpKernelContext; they are extracted here
-44- TODO: part of NodeExecStatsWrapper?
-47- For synchronously launched nodes
-52- Propagate this node's outputs to its successor nodes
-18- Iterate over every incoming ready node
-13- For each pending tagged_node: if it is dead || not expensive || the current thread's inline_ready is empty, run the node's computation on the current thread
-28- Otherwise hand it to the thread pool, to be executed by another thread
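Tied to the stats_collector_ seen in the trace, and referenced from note -24-: asking Session.run() for a full trace makes the executor allocate per-node execution stats, which come back in RunMetadata (standard TF1 API; the matmul graph is just an example).

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

x = tf.random.uniform([256, 256])
y = tf.matmul(x, x)

run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()

with tf.Session() as sess:
    sess.run(y, options=run_options, run_metadata=run_metadata)

# Per-node execution stats collected by the executor's stats collector.
for dev_stats in run_metadata.step_stats.dev_stats:
    for node_stats in dev_stats.node_stats:
        print(dev_stats.device, node_stats.node_name, node_stats.op_end_rel_micros)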
