Overall Sequence

Scheduler interface

  • registered: Invoked when the scheduler successfully registers with a Mesos master.
  • reregistered: Invoked when the scheduler re-registers with a newly elected Mesos master.
  • disconnected: Invoked when the scheduler becomes "disconnected" from the master (e.g., the master fails and another is taking over).
  • resourceOffers: Invoked when resources have been offered to this framework. A single offer will only contain resources from a single slave. Resources associated with an offer will not be re-offered to this framework until either (a) this framework has rejected those resources (see SchedulerDriver::launchTasks) or (b) those resources have been rescinded (see Scheduler::offerRescinded).

    Note that resources may be concurrently offered to more than one framework at a time (depending on the allocator being used). In that case, the first framework to launch tasks using those resources will be able to use them while the other frameworks will have those resources rescinded (or if a framework has already launched tasks with those resources then those tasks will fail with a TASK_LOST status and a message saying as much).

  • offerRescinded: Invoked when an offer is no longer valid (e.g., the slave was lost or another framework used resources in the offer).
  • statusUpdate: Invoked when the status of a task has changed (e.g., a slave is lost and so the task is lost, a task finishes and an executor sends a status update saying so, etc.).
  • frameworkMessage: Invoked when an executor sends a message.
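  • slaveLost: Invoked when a slave has been determined unreachable (e.g., machine failure, network partition).
  • executorLost: Invoked when an executor has exited or terminated.
  • error: Invoked when there is an unrecoverable error in the scheduler or scheduler driver. The driver is aborted before this callback is invoked.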

Framework's scheduler

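Each framework must provide its own concrete Scheduler implementation. The BalloonScheduler example below is abbreviated; "(...)" marks omitted parameters and bodies.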

class BalloonScheduler : public Scheduler
{
public:
  BalloonScheduler(...)
  virtual ~BalloonScheduler()
  registered(...)
  reregistered(...)
  disconnected(...)

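  virtual void resourceOffers(SchedulerDriver* driver,
                              const std::vector<Offer>& offers)
  {
    for (const Offer& offer : offers) {
      // Only one task is ever launched.
      if (!taskLaunched) {
        std::vector<TaskInfo> tasks;

        // ... build the TaskInfo `task` and add resources from the offer ...

        tasks.push_back(task);
        driver->launchTasks(offer.id(), tasks);

        taskLaunched = true;
      }
    }
  }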

  offerRescinded(...)

  statusUpdate(...)
  {
    if (protobuf::isTerminalState(status.state())) {
      if (status.state() == TASK_FAILED) {
        driver->abort();
      } else {
        driver->stop();
      }
    }
  }

  virtual void frameworkMessage(...)

  virtual void slaveLost(...)

  virtual void executorLost(...)

  virtual void error(...)

private:
  const ExecutorInfo executor;
  const size_t balloonLimit;
  bool taskLaunched;
};
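
The statusUpdate branch above can be tried out in isolation. The following is a minimal sketch with stand-in types, not the Mesos protobuf API: ToyTaskState, DriverAction, isTerminal and onStatus are hypothetical names, and the set of states treated as terminal (finished, failed, killed, lost, error) is an assumption about what protobuf::isTerminalState covers.

```cpp
// Stand-in for the Mesos TaskState enum (hypothetical, for illustration only).
enum class ToyTaskState
{
  TASK_STAGING,
  TASK_STARTING,
  TASK_RUNNING,
  TASK_FINISHED,
  TASK_FAILED,
  TASK_KILLED,
  TASK_LOST,
  TASK_ERROR
};

// What the scheduler asks of the driver after a status update.
enum class DriverAction { NONE, STOP, ABORT };

// Assumption: a state is terminal when the task will never run again.
inline bool isTerminal(ToyTaskState state)
{
  switch (state) {
    case ToyTaskState::TASK_FINISHED:
    case ToyTaskState::TASK_FAILED:
    case ToyTaskState::TASK_KILLED:
    case ToyTaskState::TASK_LOST:
    case ToyTaskState::TASK_ERROR:
      return true;
    default:
      return false;
  }
}

// Mirrors the branch in statusUpdate: abort on TASK_FAILED,
// stop on any other terminal state, do nothing otherwise.
inline DriverAction onStatus(ToyTaskState state)
{
  if (!isTerminal(state)) {
    return DriverAction::NONE;
  }
  return state == ToyTaskState::TASK_FAILED ? DriverAction::ABORT
                                            : DriverAction::STOP;
}
```

Returning an action instead of calling the driver directly keeps the decision testable without a running master.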

SchedulerDriver interface

  • start: Starts the scheduler driver.
  • stop: Stops the scheduler driver.
  • abort: Aborts the driver so that no more callbacks can be made to the scheduler.
  • join: Waits for the driver to be stopped or aborted, possibly blocking the current thread indefinitely.
  • run: Starts and immediately joins (i.e., blocks on) the driver.
  • requestResources: Requests resources from Mesos.
  • launchTasks: Launches the given set of tasks.
      launchTasks(
          const std::vector<OfferID>& offerIds,
          const std::vector<TaskInfo>& tasks,
          const Filters& filters = Filters())

  • killTask(const TaskID& taskId): Kills the specified task.
  • acceptOffers: Accepts the given offers and performs a sequence of operations on those accepted offers.
  • declineOffer: Declines an offer in its entirety and applies the specified filters on the resources.
  • reviveOffers: Removes all filters previously set by the framework.
  • suppressOffers: Informs the Mesos master to stop sending offers to the framework.
  • reconcileTasks: Allows the framework to query the status for non-terminal tasks.

MesosSchedulerDriver

MesosSchedulerDriver is the concrete implementation of a SchedulerDriver that connects a Scheduler with a Mesos master.

  • Scheduler *scheduler
  • FrameworkInfo framework
  • std::string master
  • internal::SchedulerProcess* process: Used for communicating with the master.
  • std::string url: URL for the master (e.g., zk://, file://, etc).
  • std::recursive_mutex mutex: Mutex for enforcing serial execution of all non-callbacks.
  • process::Latch *latch: Latch for waiting until driver terminates.
  • Status status: Current status of the driver.
  • const bool implicitAcknowlegements
  • const Credential *credential
  • std::string schedulerId: Scheduler process ID.

Example Usage

MesosSchedulerDriver* driver;
driver = new MesosSchedulerDriver(
        &scheduler, framework, master);
driver->run();   // Blocks until the driver is stopped or aborted.
driver->stop();  // Makes sure the driver process terminates.

Code Walk

Constructor

MesosSchedulerDriver::MesosSchedulerDriver
=>initialize()
  =>process::initialize: Initialize libprocess.
  =>latch = new Latch(): Initialize Latch.
  =>Set framework user and hostname.
  =>Set url.

start

start does two crucial things:

  1. Creates the MasterDetector, which discovers the master. The detector may be standalone or distributed (ZooKeeper).
  2. Creates the SchedulerProcess, which communicates with the master through HTTP/Protobuf.

MesosSchedulerDriver::start()
=>detector_ = DetectorPool::get(url)
  =>detector = MasterDetector::create(url)
    =>new ZooKeeperMasterDetector(url.get())
    =>new StandaloneMasterDetector(protobuf::createMasterInfo(pid))
  =>result = shared_ptr<MasterDetector>(detector.get());
=>process = new SchedulerProcess()
=>spawn(process);

stop

MesosSchedulerDriver::stop()
=>dispatch(process, &SchedulerProcess::stop, failover);

join

// Otherwise, wait for stop() or abort() to trigger the latch.
CHECK_NOTNULL(latch)->await();

synchronized (mutex) {
  CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);

  return status;
}

run

Status status = start();
return status != DRIVER_RUNNING ? status : join();
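
How start, join, run and stop interact through the latch can be modelled without Mesos. The sketch below is a toy under stated assumptions: ToyLatch, ToyDriver and ToyStatus are hypothetical stand-ins for process::Latch, MesosSchedulerDriver and Status, and the real driver additionally talks to a SchedulerProcess, which is left out here.

```cpp
#include <condition_variable>
#include <mutex>

enum ToyStatus { TOY_NOT_STARTED, TOY_RUNNING, TOY_STOPPED, TOY_ABORTED };

// One-shot latch: await() blocks until trigger() has been called.
class ToyLatch
{
public:
  void trigger()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    triggered_ = true;
    cv_.notify_all();
  }

  void await()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return triggered_; });
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool triggered_ = false;
};

class ToyDriver
{
public:
  ToyStatus start()
  {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    if (status_ != TOY_NOT_STARTED) {
      return status_;
    }
    status_ = TOY_RUNNING;
    return status_;
  }

  ToyStatus stop() { return finish(TOY_STOPPED); }
  ToyStatus abort() { return finish(TOY_ABORTED); }

  ToyStatus join()
  {
    {
      std::lock_guard<std::recursive_mutex> lock(mutex_);
      if (status_ != TOY_RUNNING) {
        return status_;  // Nothing to wait for.
      }
    }

    latch_.await();  // Released by stop() or abort().

    std::lock_guard<std::recursive_mutex> lock(mutex_);
    return status_;
  }

  // Same shape as the run snippet above: start, then block in join.
  ToyStatus run()
  {
    ToyStatus status = start();
    return status != TOY_RUNNING ? status : join();
  }

private:
  ToyStatus finish(ToyStatus terminal)
  {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    if (status_ != TOY_RUNNING) {
      return status_;
    }
    status_ = terminal;
    latch_.trigger();
    return status_;
  }

  std::recursive_mutex mutex_;
  ToyLatch latch_;
  ToyStatus status_ = TOY_NOT_STARTED;
};
```

In this model a thread blocked in run() or join() only returns once some other thread (or a callback) calls stop() or abort(), which is why a scheduler typically stops the driver from inside statusUpdate.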

reconcileTasks

MesosSchedulerDriver::reconcileTasks
=>dispatch(process, &SchedulerProcess::reconcileTasks, statuses);

requestResources

MesosSchedulerDriver::requestResources
=>dispatch(process, &SchedulerProcess::requestResources, requests);
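
stop, reconcileTasks and requestResources all follow the same pattern: the driver does not call the SchedulerProcess method itself but dispatches it. A rough, single-threaded model of that idea is sketched below. ToyProcess, toyDispatch and ToySchedulerProcess are hypothetical names, and the assumption is that dispatch queues the call for the target process to run later; libprocess itself runs processes on worker threads, which this toy leaves out.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Base class holding a queue of pending calls (the "mailbox").
class ToyProcess
{
public:
  virtual ~ToyProcess() {}

  void enqueue(std::function<void()> call) { mailbox_.push(std::move(call)); }

  // Runs queued calls one at a time, in arrival order; returns how many ran.
  std::size_t drain()
  {
    std::size_t count = 0;
    while (!mailbox_.empty()) {
      std::function<void()> call = std::move(mailbox_.front());
      mailbox_.pop();
      call();
      ++count;
    }
    return count;
  }

private:
  std::queue<std::function<void()>> mailbox_;
};

// Queues `(process->*method)(args...)` instead of invoking it directly.
template <typename T, typename... Params, typename... Args>
void toyDispatch(T* process, void (T::*method)(Params...), Args... args)
{
  process->enqueue([=]() { (process->*method)(args...); });
}

// Toy counterpart of SchedulerProcess that records what it was asked to do.
class ToySchedulerProcess : public ToyProcess
{
public:
  void stop(bool failover)
  {
    log.push_back(failover ? "stop(failover)" : "stop");
  }

  void requestResources(int count)
  {
    log.push_back("requestResources " + std::to_string(count));
  }

  std::vector<std::string> log;
};
```

Because every call goes through the mailbox, the process handles one call at a time and needs no locking of its own state; the caller returns immediately.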

SchedulerProcess

class SchedulerProcess : public ProtobufProcess<SchedulerProcess>

initialize()

install<Event>(&SchedulerProcess::receive);

install<FrameworkRegisteredMessage>(
    &SchedulerProcess::registered,
    &FrameworkRegisteredMessage::framework_id,
    &FrameworkRegisteredMessage::master_info);

install<FrameworkReregisteredMessage>(
    &SchedulerProcess::reregistered,
    &FrameworkReregisteredMessage::framework_id,
    &FrameworkReregisteredMessage::master_info);

install<ResourceOffersMessage>(
    &SchedulerProcess::resourceOffers,
    &ResourceOffersMessage::offers,
    &ResourceOffersMessage::pids);

install<RescindResourceOfferMessage>(
    &SchedulerProcess::rescindOffer,
    &RescindResourceOfferMessage::offer_id);

install<StatusUpdateMessage>(
    &SchedulerProcess::statusUpdate,
    &StatusUpdateMessage::update,
    &StatusUpdateMessage::pid);

install<LostSlaveMessage>(
    &SchedulerProcess::lostSlave,
    &LostSlaveMessage::slave_id);

install<ExitedExecutorMessage>(
    &SchedulerProcess::lostExecutor,
    &ExitedExecutorMessage::executor_id,
    &ExitedExecutorMessage::slave_id,
    &ExitedExecutorMessage::status);

install<ExecutorToFrameworkMessage>(
    &SchedulerProcess::frameworkMessage,
    &ExecutorToFrameworkMessage::slave_id,
    &ExecutorToFrameworkMessage::executor_id,
    &ExecutorToFrameworkMessage::data);

install<FrameworkErrorMessage>(
    &SchedulerProcess::error,
    &FrameworkErrorMessage::message);

// Start detecting masters.
detector->detect()
  .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
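
The install calls above register one handler per incoming message type. A stripped-down model of that registry might look as follows. It is a sketch, not ProtobufProcess: ToyMessage and ToyProtobufProcess are hypothetical, messages are keyed by a plain name, and the payload is an opaque string rather than protobuf fields unpacked into handler arguments.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>

// Stand-in for a wire message: a type name plus an opaque payload.
struct ToyMessage
{
  std::string name;
  std::string body;
};

class ToyProtobufProcess
{
public:
  typedef std::function<void(const std::string&)> Handler;

  // Registers the handler to run for messages named `name`.
  void install(const std::string& name, Handler handler)
  {
    handlers_[name] = std::move(handler);
  }

  // Routes a message to its handler; returns false if none is installed.
  bool receive(const ToyMessage& message)
  {
    std::map<std::string, Handler>::iterator it = handlers_.find(message.name);
    if (it == handlers_.end()) {
      return false;
    }
    it->second(message.body);
    return true;
  }

private:
  std::map<std::string, Handler> handlers_;
};
```

With this shape, adding support for a new message is one more install call in initialize, and unknown messages are simply dropped by receive.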

MasterDetector

Factory

MesosSchedulerDriver::start()
=>detector_ = DetectorPool::get(url)
  =>detector = MasterDetector::create(url)
    =>new ZooKeeperMasterDetector(url.get())
    =>new StandaloneMasterDetector(protobuf::createMasterInfo(pid))
  =>result = shared_ptr<MasterDetector>(detector.get());
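
The factory picks a detector implementation from the master URL. A minimal sketch of that choice is shown below, assuming the decision is made on the URL prefix: ToyDetectorKind, startsWith and chooseDetector are hypothetical names, and treating "file://" as "read the real master address from a file" is an assumption rather than something shown in the walk above.

```cpp
#include <string>

enum class ToyDetectorKind { STANDALONE, ZOOKEEPER, FROM_FILE };

// True when `s` begins with `prefix`.
inline bool startsWith(const std::string& s, const std::string& prefix)
{
  return s.compare(0, prefix.size(), prefix) == 0;
}

// Chooses a detector kind from the master string passed to the driver.
inline ToyDetectorKind chooseDetector(const std::string& master)
{
  if (startsWith(master, "zk://")) {
    return ToyDetectorKind::ZOOKEEPER;  // Leader is elected via ZooKeeper.
  }
  if (startsWith(master, "file://")) {
    return ToyDetectorKind::FROM_FILE;  // Assumed: address is read from a file.
  }
  return ToyDetectorKind::STANDALONE;   // Plain host:port of a single master.
}
```

For example, chooseDetector("zk://host1:2181/mesos") selects the ZooKeeper-based detector, while chooseDetector("10.0.0.1:5050") falls through to the standalone one.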

Constructor

process = new StandaloneMasterDetectorProcess();
spawn(process);

appoint

dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);

detect

return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
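
The relationship between appoint and detect can be illustrated with a callback-based toy. This is a sketch under assumptions, not the Mesos implementation: ToyStandaloneDetector is hypothetical, the leader is a plain string (empty meaning "no leader"), and callbacks stand in for the future that detect returns. The assumed contract is that detect(previous) completes as soon as the current leader differs from previous.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

class ToyStandaloneDetector
{
public:
  typedef std::function<void(const std::string&)> Callback;

  // Sets a new leader and notifies every caller still waiting in detect().
  void appoint(const std::string& leader)
  {
    if (leader == leader_) {
      return;  // No change, nobody to notify.
    }
    leader_ = leader;

    // Move the waiters out first so callbacks may call detect() again.
    std::vector<Callback> waiting;
    waiting.swap(pending_);
    for (const Callback& callback : waiting) {
      callback(leader_);
    }
  }

  // Invokes `callback` once the leader differs from `previous`.
  void detect(const std::string& previous, Callback callback)
  {
    if (leader_ != previous) {
      callback(leader_);  // Already changed: report immediately.
    } else {
      pending_.push_back(std::move(callback));
    }
  }

private:
  std::string leader_;  // Empty string means "no leader known".
  std::vector<Callback> pending_;
};
```

A scheduler would pass the last leader it saw as previous, so it is only woken up when the master actually changes.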

SchedulerProcess

detected()

If a credential was supplied:
authenticate() => doReliableRegistration()
otherwise:
doReliableRegistration()

doReliableRegistration

Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(framework);
send(master.get().pid(), call);

Events

Messages

Master

Master::initialize

install<scheduler::Call>(&Master::receive);

Master::receive

scheduler::Call::SUBSCRIBE