您好,欢迎来到华拓科技网。
搜索
您的当前位置:首页Mesos源码分析(16): mesos-docker-executor的运行

Mesos源码分析(16): mesos-docker-executor的运行

来源:华拓科技网

mesos-docker-executor的运行代码在src/docker/executor.cpp中

 

 

如上一篇文章对MesosExecutorDriver的分析,Mesos-slave给Executor发送message运行Task,会调用DockerExecutor的launchTask函数。

  1. virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
  2. {
  3.   dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task);
  4. }

 

最后调用DockerExecutorProcess的launchTask函数。

  1. void launchTask(ExecutorDriver* driver, const TaskInfo& task)
  2. {
  3.   if (run.isSome()) {
  4.     TaskStatus status;
  5.     status.mutable_task_id()->CopyFrom(task.task_id());
  6.     status.set_state(TASK_FAILED);
  7.     status.set_message(
  8.         "Attempted to run multiple tasks using a \"docker\" executor");
  9.  
  10.     driver->sendStatusUpdate(status);
  11.     return;
  12.   }
  13.  
  14.   // Capture the TaskID.
  15.   taskId = task.task_id();
  16.  
  17.   cout << "Starting task " << taskId.get() << endl;
  18.  
  19.   CHECK(task.has_container());
  20.   CHECK(task.has_command());
  21.  
  22.   CHECK(task.container().type() == ContainerInfo::DOCKER);
  23.  
  24.   // We're adding task and executor resources to launch docker since
  25.   // the DockerContainerizer updates the container cgroup limits
  26.   // directly and it expects it to be the sum of both task and
  27.   // executor resources. This does leave to a bit of unaccounted
  28.   // resources for running this executor, but we are assuming
  29.   // this is just a very small amount of overcommit.
  30.   run = docker->run(
  31.       task.container(),
  32.       task.command(),
  33.       containerName,
  34.       sandboxDirectory,
  35.       mappedDirectory,
  36.       task.resources() + task.executor().resources(),
  37.       None(),
  38.       Subprocess::FD(STDOUT_FILENO),
  39.       Subprocess::FD(STDERR_FILENO));
  40.  
  41.   run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));
  42.  
  43.   // Delay sending TASK_RUNNING status update until we receive
  44.   // inspect output.
  45.   inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY)
  46.     .then(defer(self(), [=](const Docker::Container& container) {
  47.       if (!killed) {
  48.         TaskStatus status;
  49.         status.mutable_task_id()->CopyFrom(taskId.get());
  50.         status.set_state(TASK_RUNNING);
  51.         status.set_data(container.output);
  52.         if (container.ipAddress.isSome()) {
  53.           // TODO(karya): Deprecated -- Remove after 0.25.0 has shipped.
  54.           Label* label = status.mutable_labels()->add_labels();
  55.           label->set_key("Docker.NetworkSettings.IPAddress");
  56.           label->set_value(container.ipAddress.get());
  57.  
  58.           NetworkInfo* networkInfo =
  59.             status.mutable_container_status()->add_network_infos();
  60.  
  61.           // TODO(CD): Deprecated -- Remove after 0.27.0.
  62.           networkInfo->set_ip_address(container.ipAddress.get());
  63.  
  64.           NetworkInfo::IPAddress* ipAddress =
  65.             networkInfo->add_ip_addresses();
  66.           ipAddress->set_ip_address(container.ipAddress.get());
  67.         }
  68.         driver->sendStatusUpdate(status);
  69.       }
  70.  
  71.       return Nothing();
  72.     }));
  73.  
  74.   inspect.onReady(
  75.       defer(self(), &Self::launchHealthCheck, containerName, task));
  76. }

 

调用Docker函数启容器。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo6.cn 版权所有 赣ICP备2024042791号-9

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务