Overall Sequence

Scheduler interface
- registered: Invoked when the scheduler successfully registers with a Mesos master.
- reregistered: Invoked when the scheduler re-registers with a newly elected Mesos master.
- disconnected: Invoked when the scheduler becomes "disconnected" from the master (e.g., the master fails and another is taking over).
- resourceOffers: Invoked when resources have been offered to this framework. A single offer will only contain resources from a single slave. Resources associated with an offer will not be re-offered to this framework until either (a) this framework has rejected those resources (see SchedulerDriver::launchTasks) or (b) those resources have been rescinded (see Scheduler::offerRescinded).
Note that resources may be offered to more than one framework at the same time (depending on the allocator being used). In that case, the first framework to launch tasks using those resources will be able to use them, while the other frameworks will have those resources rescinded (or, if a framework has already launched tasks with those resources, those tasks will fail with a TASK_LOST status and a message saying as much).
- offerRescinded: Invoked when an offer is no longer valid (e.g., the slave was lost or another framework used resources in the offer).
- statusUpdate: Invoked when the status of a task has changed (e.g., a slave is lost and so the task is lost, a task finishes and an executor sends a status update saying so, etc.).
- frameworkMessage: Invoked when an executor sends a message.
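- slaveLost: Invoked when a slave has been determined unreachable (e.g., machine failure, network partition).
- executorLost: Invoked when an executor has exited or terminated.
- error: Invoked when there is an unrecoverable error in the scheduler or scheduler driver. The driver is aborted before this callback is invoked.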
Framework's scheduler
A framework must provide its own implementation of the Scheduler interface. For example:
class BalloonScheduler : public Scheduler
{
public:
  BalloonScheduler(...)
  virtual ~BalloonScheduler()

  registered(...)
  reregistered(...)
  disconnected(...)

  virtual void resourceOffers(SchedulerDriver* driver,
                              const std::vector<Offer>& offers)
  {
    for (size_t i = 0; i < offers.size(); i++) {
      const Offer& offer = offers[i];

      // Launch a single task, using the first offer received.
      if (!taskLaunched) {
        TaskInfo task;
        // ... set task and add resources ...

        std::vector<TaskInfo> tasks;
        tasks.push_back(task);
        driver->launchTasks(offer.id(), tasks);

        taskLaunched = true;
      }
    }
  }

  offerRescinded(...)

  statusUpdate(...)
  {
    // Once the task reaches a terminal state, shut the driver down:
    // abort on failure, stop otherwise.
    if (protobuf::isTerminalState(status.state())) {
      if (status.state() == TASK_FAILED) {
        driver->abort();
      } else {
        driver->stop();
      }
    }
  }

  virtual void frameworkMessage(...)
  virtual void slaveLost(...)
  virtual void executorLost(...)
  virtual void error(...)

private:
  const ExecutorInfo executor;
  const size_t balloonLimit;
  bool taskLaunched;
};
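The statusUpdate callback above shuts the driver down once the task reaches a terminal state. A minimal stand-alone sketch of that decision follows. TaskState and DriverAction are hypothetical stand-ins for the Mesos types, and the set of terminal states is an assumption that may differ between Mesos versions.

```cpp
#include <cassert>

// Stand-in for the Mesos TaskState enum (assumption: only a handful of
// classic states are modelled here).
enum class TaskState {
  Staging, Starting, Running, Finished, Failed, Killed, Lost
};

// Hypothetical summary of what the scheduler asks the driver to do.
enum class DriverAction { None, Stop, Abort };

// A state is terminal when the task can never run again.
inline bool isTerminalState(TaskState state) {
  switch (state) {
    case TaskState::Finished:
    case TaskState::Failed:
    case TaskState::Killed:
    case TaskState::Lost:
      return true;
    default:
      return false;
  }
}

// Mirrors the branch in BalloonScheduler::statusUpdate: a failed task aborts
// the driver, any other terminal state stops it, and non-terminal updates
// leave the driver running.
inline DriverAction onStatusUpdate(TaskState state) {
  if (!isTerminalState(state)) {
    return DriverAction::None;
  }
  return state == TaskState::Failed ? DriverAction::Abort : DriverAction::Stop;
}
```

With this sketch, onStatusUpdate(TaskState::Running) leaves the driver alone, while TaskState::Failed maps to an abort and TaskState::Finished to a stop.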
SchedulerDriver interface
- start: Starts the scheduler driver.
- stop: Stops the scheduler driver.
- abort: Aborts the driver so that no more callbacks can be made to the scheduler.
- join: Waits for the driver to be stopped or aborted, possibly blocking the current thread indefinitely.
- run: Starts and immediately joins (i.e., blocks on) the driver.
- requestResources: Requests resources from Mesos.
- launchTasks: Launches the given set of tasks.
launchTasks(const std::vector<OfferID>& offerIds, const std::vector<TaskInfo>& tasks, const Filters& filters = Filters())
- killTask(const TaskID& taskId): Kills the specified task.
- acceptOffers: Accepts the given offers and performs a sequence of operations on those accepted offers.
- declineOffer: Declines an offer in its entirety and applies the specified filters on the resources.
- reviveOffers: Removes all filters previously set by the framework.
- suppressOffers: Informs the Mesos master to stop sending offers to the framework.
- reconcileTasks: Allows the framework to query the status for non-terminal tasks.
MesosSchedulerDriver
MesosSchedulerDriver is the concrete implementation of a SchedulerDriver that connects a Scheduler with a Mesos master.
- Scheduler *scheduler
- FrameworkInfo framework
- std::string master
- internal::SchedulerProcess* process: Used for communicating with the master.
- std::string url: URL for the master (e.g., zk://, file://, etc).
- std::recursive_mutex mutex: Mutex for enforcing serial execution of all non-callbacks.
- process::Latch *latch: Latch for waiting until driver terminates.
- Status status: Current status of the driver.
- const bool implicitAcknowlegements
- const Credential *credential
- std::string schedulerId: Scheduler process ID.
Example Usage
MesosSchedulerDriver* driver;
driver = new MesosSchedulerDriver(
    &scheduler, framework, master);
driver->run();   // Blocks until the driver is stopped or aborted.
driver->stop();
Code Walk
Constructor
MesosSchedulerDriver::MesosSchedulerDriver
=>initialize()
=>process::initialize: Initialize libprocess.
=>latch = new Latch(): Initialize Latch.
=>Set framework user and hostname.
=>Set url.
start
start does two crucial things:
- Creates the MasterDetector, which discovers the master. The detector may be standalone or distributed (ZooKeeper).
- Creates the SchedulerProcess, which communicates with the master through HTTP/Protobuf.
MesosSchedulerDriver::start()
=>detector_ = DetectorPool::get(url)
=>detector = MasterDetector::create(url)
=>new ZooKeeperMasterDetector(url.get()): Used for zk:// URLs.
=>new StandaloneMasterDetector(protobuf::createMasterInfo(pid)): Used when the master is given directly as a PID.
=>result = shared_ptr(detector.get());
=>process = new SchedulerProcess()
=>spawn(process);
stop
MesosSchedulerDriver::stop()
=>dispatch(process, &SchedulerProcess::stop, failover);
join
// Otherwise, wait for stop() or abort() to trigger the latch.
CHECK_NOTNULL(latch)->await();
synchronized (mutex) {
CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
return status;
}
run
Status status = start();
return status != DRIVER_RUNNING ? status : join();
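The join and run snippets above rely on a latch that stop() or abort() triggers. The following is a minimal stand-alone model of that lifecycle, not the Mesos implementation: ToyDriver and Latch are hypothetical stand-ins built on the C++ standard library, and the real driver additionally dispatches to the SchedulerProcess.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Stand-in for the driver status enum used in the snippets above.
enum Status {
  DRIVER_NOT_STARTED, DRIVER_RUNNING, DRIVER_ABORTED, DRIVER_STOPPED
};

// One-shot latch: await() blocks until trigger() has been called.
class Latch {
public:
  void trigger() {
    std::lock_guard<std::mutex> lock(mutex_);
    triggered_ = true;
    cv_.notify_all();
  }

  void await() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return triggered_; });
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool triggered_ = false;
};

class ToyDriver {
public:
  Status start() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (status_ != DRIVER_NOT_STARTED) return status_;
    status_ = DRIVER_RUNNING;
    return status_;
  }

  Status stop() { return finish(DRIVER_STOPPED); }
  Status abort() { return finish(DRIVER_ABORTED); }

  Status join() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      // Nothing to wait for unless the driver is running.
      if (status_ != DRIVER_RUNNING) return status_;
    }
    // Otherwise, wait for stop() or abort() to trigger the latch.
    latch_.await();
    std::lock_guard<std::mutex> lock(mutex_);
    assert(status_ == DRIVER_ABORTED || status_ == DRIVER_STOPPED);
    return status_;
  }

  // run() is start() followed by join(), as in the snippet above.
  Status run() {
    Status status = start();
    return status != DRIVER_RUNNING ? status : join();
  }

private:
  // Moves a running driver to a terminal status and releases joiners.
  Status finish(Status terminal) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (status_ != DRIVER_RUNNING) return status_;
    status_ = terminal;
    latch_.trigger();
    return status_;
  }

  std::mutex mutex_;
  Latch latch_;
  Status status_ = DRIVER_NOT_STARTED;
};
```

In this model a thread blocked in run() or join() wakes up as soon as another thread calls stop() or abort(), and it returns the terminal status that was set.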
reconcileTasks
MesosSchedulerDriver::reconcileTasks
=>dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
requestResources
MesosSchedulerDriver::requestResources
=>dispatch(process, &SchedulerProcess::requestResources, requests);
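Most driver methods above do no work themselves; they dispatch a method call to the SchedulerProcess. A rough sketch of the idea, assuming a simplified single-threaded mailbox: ToyProcess, ToySchedulerProcess, and this dispatch function are hypothetical stand-ins, and the real libprocess dispatch runs queued calls on its own worker threads and can return futures.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a libprocess process: it owns a mailbox of
// pending calls and runs them one at a time.
class ToyProcess {
public:
  virtual ~ToyProcess() = default;

  void enqueue(std::function<void()> call) {
    mailbox_.push_back(std::move(call));
  }

  // Runs queued calls in FIFO order. In this sketch the caller drains the
  // mailbox explicitly instead of a background thread doing it.
  void drain() {
    while (!mailbox_.empty()) {
      std::function<void()> call = std::move(mailbox_.front());
      mailbox_.pop_front();
      call();
    }
  }

private:
  std::deque<std::function<void()>> mailbox_;
};

// Queues (process->*method)(args...) instead of calling it directly, so the
// caller returns immediately and the method runs later, serialized with all
// other calls on the same process.
template <typename T, typename... Params, typename... Args>
void dispatch(T* process, void (T::*method)(Params...), Args&&... args) {
  process->enqueue(std::bind(method, process, std::forward<Args>(args)...));
}

// Toy process with two of the methods the driver dispatches to.
class ToySchedulerProcess : public ToyProcess {
public:
  void stop(bool failover) {
    log.push_back(failover ? "stop(failover)" : "stop");
  }

  void requestResources(const std::vector<std::string>& requests) {
    log.push_back("requestResources x" + std::to_string(requests.size()));
  }

  std::vector<std::string> log;
};
```

Calling dispatch(&process, &ToySchedulerProcess::stop, false) only records the call; nothing is appended to log until process.drain() runs, which is the point of the pattern: driver methods never touch scheduler state from the caller's thread.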
SchedulerProcess
class SchedulerProcess : public ProtobufProcess<SchedulerProcess>
initialize()
install<Event>(&SchedulerProcess::receive);
install<FrameworkRegisteredMessage>(
    &SchedulerProcess::registered,
    &FrameworkRegisteredMessage::framework_id,
    &FrameworkRegisteredMessage::master_info);
install<FrameworkReregisteredMessage>(
    &SchedulerProcess::reregistered,
    &FrameworkReregisteredMessage::framework_id,
    &FrameworkReregisteredMessage::master_info);
install<ResourceOffersMessage>(
    &SchedulerProcess::resourceOffers,
    &ResourceOffersMessage::offers,
    &ResourceOffersMessage::pids);
install<RescindResourceOfferMessage>(
    &SchedulerProcess::rescindOffer,
    &RescindResourceOfferMessage::offer_id);
install<StatusUpdateMessage>(
    &SchedulerProcess::statusUpdate,
    &StatusUpdateMessage::update,
    &StatusUpdateMessage::pid);
install<LostSlaveMessage>(
    &SchedulerProcess::lostSlave,
    &LostSlaveMessage::slave_id);
install<ExitedExecutorMessage>(
    &SchedulerProcess::lostExecutor,
    &ExitedExecutorMessage::executor_id,
    &ExitedExecutorMessage::slave_id,
    &ExitedExecutorMessage::status);
install<ExecutorToFrameworkMessage>(
    &SchedulerProcess::frameworkMessage,
    &ExecutorToFrameworkMessage::slave_id,
    &ExecutorToFrameworkMessage::executor_id,
    &ExecutorToFrameworkMessage::data);
install<FrameworkErrorMessage>(
    &SchedulerProcess::error,
    &FrameworkErrorMessage::message);
// Start detecting masters.
detector->detect()
.onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
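Each install call above binds a message type to a handler method, together with the message fields to pass as arguments. The following is a simplified sketch of such a handler table, limited to single-field messages. The structs and ToyProtobufProcess are hypothetical stand-ins: the real ProtobufProcess works on serialized protobuf messages and supports several fields per handler.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>

// Hand-written stand-ins for two of the protobuf messages (assumption: the
// real messages are generated classes with accessor methods, not structs).
struct FrameworkErrorMessage {
  static const char* name() { return "FrameworkErrorMessage"; }
  std::string message;
};

struct LostSlaveMessage {
  static const char* name() { return "LostSlaveMessage"; }
  std::string slave_id;
};

// Toy version of the CRTP base: T is the concrete process type.
template <typename T>
class ToyProtobufProcess {
public:
  // Registers `method` as the handler for messages of type M. When such a
  // message is delivered, `field` is extracted and passed to the handler.
  template <typename M, typename F>
  void install(void (T::*method)(const F&), F M::*field) {
    T* self = static_cast<T*>(this);
    handlers_[M::name()] = [self, method, field](const void* raw) {
      const M& message = *static_cast<const M*>(raw);
      (self->*method)(message.*field);
    };
  }

  // Looks up the handler by message type name and invokes it. Returns false
  // when no handler was installed for this type.
  template <typename M>
  bool deliver(const M& message) {
    auto it = handlers_.find(M::name());
    if (it == handlers_.end()) return false;
    it->second(&message);
    return true;
  }

private:
  std::map<std::string, std::function<void(const void*)>> handlers_;
};

class ToySchedulerProcess : public ToyProtobufProcess<ToySchedulerProcess> {
public:
  ToySchedulerProcess() {
    install<FrameworkErrorMessage>(
        &ToySchedulerProcess::error,
        &FrameworkErrorMessage::message);
    install<LostSlaveMessage>(
        &ToySchedulerProcess::lostSlave,
        &LostSlaveMessage::slave_id);
  }

  void error(const std::string& message) { lastEvent = "error: " + message; }
  void lostSlave(const std::string& slaveId) {
    lastEvent = "lostSlave: " + slaveId;
  }

  std::string lastEvent;
};
```

Delivering a LostSlaveMessage to this toy process ends up calling lostSlave with the slave_id field already unpacked, which is why the SchedulerProcess handlers can take plain arguments rather than whole messages.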
MasterDetector
Factory
MesosSchedulerDriver::start()
=>detector_ = DetectorPool::get(url)
=>detector = MasterDetector::create(url)
=>new ZooKeeperMasterDetector(url.get())
=>new StandaloneMasterDetector(protobuf::createMasterInfo(pid))
=>result = shared_ptr(detector.get());
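The factory picks a detector implementation from the form of the master string. A small stand-alone sketch of that selection follows. The Toy* classes and createDetector are hypothetical stand-ins, file:// handling and the DetectorPool caching are omitted, and the exact prefix rules are an assumption based on the URL forms mentioned above.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for the MasterDetector interface.
struct ToyMasterDetector {
  virtual ~ToyMasterDetector() = default;
  virtual std::string kind() const = 0;
};

// Stand-in for the ZooKeeper-backed (distributed) detector.
struct ToyZooKeeperDetector : ToyMasterDetector {
  explicit ToyZooKeeperDetector(std::string url) : url(std::move(url)) {}
  std::string kind() const override { return "zookeeper"; }
  std::string url;
};

// Stand-in for the standalone detector, which is told the master directly.
struct ToyStandaloneDetector : ToyMasterDetector {
  explicit ToyStandaloneDetector(std::string pid) : pid(std::move(pid)) {}
  std::string kind() const override { return "standalone"; }
  std::string pid;
};

inline bool startsWith(const std::string& s, const std::string& prefix) {
  return s.compare(0, prefix.size(), prefix) == 0;
}

// Chooses the detector from the master string: zk:// URLs get the ZooKeeper
// detector, anything else is treated as a master PID.
inline std::unique_ptr<ToyMasterDetector> createDetector(
    const std::string& master) {
  if (startsWith(master, "zk://")) {
    return std::unique_ptr<ToyMasterDetector>(new ToyZooKeeperDetector(master));
  }
  return std::unique_ptr<ToyMasterDetector>(new ToyStandaloneDetector(master));
}
```

For example, createDetector("zk://host1:2181/mesos") yields the ZooKeeper variant, while createDetector("master@127.0.0.1:5050") yields the standalone one.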
Constructor
process = new StandaloneMasterDetectorProcess();
spawn(process);
appoint
dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
detect
return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
SchedulerProcess
detected()
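=>authenticate() => doReliableRegistration(): If a credential was provided, authenticate with the master first, then register.
=>doReliableRegistration(): Otherwise, register directly.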
doReliableRegistration
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(framework);
send(master.get().pid(), call);
Events
Messages
Master
Master::initialize
install