Mesos Application Development

Overview

scheduler

class BalloonScheduler : public Scheduler
{
public:
  BalloonScheduler(...)
  virtual ~BalloonScheduler()
  registered(...)
  reregistered(...)
  disconnected(...)

  virtual void resourceOffers(SchedulerDriver* driver,
                              const std::vector& offers)
  {
        set task and add resources

        tasks.push_back(task);
        driver->launchTasks(offer.id(), tasks);

        taskLaunched = true;
      }
    }
  }

  offerRescinded(...)

  statusUpdate(...)
  {
    if (protobuf::isTerminalState(status.state())) {
      if (status.state() == TASK_FAILED) {
        driver->abort();
      } else {
        driver->stop();
      }
    }
  }

  virtual void frameworkMessage(...)

  virtual void slaveLost(...)

  virtual void executorLost(...)

  virtual void error(...)

private:
  const ExecutorInfo executor;
  const size_t balloonLimit;
  bool taskLaunched;
};

executor

ExecutorInfo executor;
  executor.mutable_executor_id()->set_value("default");
  executor.mutable_command()->set_value(uri);
  executor.set_name("Balloon Executor");
  executor.set_source("balloon_test");

resource

Resource* mem = executor.add_resources();
  mem->set_name("mem");
  mem->set_type(Value::SCALAR);
  mem->mutable_scalar()->set_value(EXECUTOR_MEMORY_MB);

scheduler

BalloonScheduler scheduler(executor, limit.get());

framework

FrameworkInfo framework;
  framework.set_user(""); // Have Mesos fill in the current user.
  framework.set_name("Balloon Framework (C++)");

driver

MesosSchedulerDriver* driver;
driver = new MesosSchedulerDriver(
        &scheduler, framework, authenticator);
driver->run();
driver->stop();

Complete Source

...
#include 

#include 
#include 
#include 
#include 
#include 

using namespace mesos;
using namespace mesos::internal;

using std::cout;
using std::endl;
using std::string;

// The amount of memory in MB the executor itself takes.
const static size_t EXECUTOR_MEMORY_MB = 64;

class BalloonScheduler : public Scheduler
{
public:
  BalloonScheduler(const ExecutorInfo& _executor,
                   size_t _balloonLimit)
    : executor(_executor),
      balloonLimit(_balloonLimit),
      taskLaunched(false) {}

  virtual ~BalloonScheduler() {}

  virtual void registered(SchedulerDriver*,
                          const FrameworkID&,
                          const MasterInfo&)
  {
    std::cout << "Registered" << std::endl;
  }

  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo)
  {
    std::cout << "Reregistered" << std::endl;
  }

  virtual void disconnected(SchedulerDriver* driver)
  {
    std::cout << "Disconnected" << std::endl;
  }

  virtual void resourceOffers(SchedulerDriver* driver,
                              const std::vector& offers)
  {
    std::cout << "Resource offers received" << std::endl;

    for (size_t i = 0; i < offers.size(); i++) {
      const Offer& offer = offers[i];

      // We just launch one task.
      if (!taskLaunched) {
        double mem = getScalarResource(offer, "mem");
        assert(mem > EXECUTOR_MEMORY_MB);

        std::vector tasks;
        std::cout << "Starting the task" << std::endl;

        TaskInfo task;
        task.set_name("Balloon Task");
        task.mutable_task_id()->set_value("1");
        task.mutable_slave_id()->MergeFrom(offer.slave_id());
        task.mutable_executor()->MergeFrom(executor);
        task.set_data(stringify(balloonLimit));

        // Use up all the memory from the offer.
        Resource* resource;
        resource = task.add_resources();
        resource->set_name("mem");
        resource->set_type(Value::SCALAR);
        resource->mutable_scalar()->set_value(mem - EXECUTOR_MEMORY_MB);

        // And all the CPU.
        double cpus = getScalarResource(offer, "cpus");
        resource = task.add_resources();
        resource->set_name("cpus");
        resource->set_type(Value::SCALAR);
        resource->mutable_scalar()->set_value(cpus);

        tasks.push_back(task);
        driver->launchTasks(offer.id(), tasks);

        taskLaunched = true;
      }
    }
  }

  virtual void offerRescinded(SchedulerDriver* driver,
                              const OfferID& offerId)
  {
    std::cout << "Offer rescinded" << std::endl;
  }

  virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
  {
    std::cout << "Task in state " << status.state() << std::endl;
    std::cout << "Source: " << status.source() << std::endl;
    std::cout << "Reason: " << status.reason() << std::endl;
    if (status.has_message()) {
      std::cout << "Message: " << status.message() << std::endl;
    }

    if (protobuf::isTerminalState(status.state())) {
      // NOTE: We expect TASK_FAILED here. The abort here ensures the shell
      // script invoking this test, considers the test result as 'PASS'.
      if (status.state() == TASK_FAILED) {
        driver->abort();
      } else {
        driver->stop();
      }
    }
  }

  virtual void frameworkMessage(SchedulerDriver* driver,
                                const ExecutorID& executorId,
                                const SlaveID& slaveId,
                                const string& data)
  {
    std::cout << "Framework message: " << data << std::endl;
  }

  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid)
  {
    std::cout << "Slave lost" << std::endl;
  }

  virtual void executorLost(SchedulerDriver* driver,
                            const ExecutorID& executorID,
                            const SlaveID& slaveID,
                            int status)
  {
    std::cout << "Executor lost" << std::endl;
  }

  virtual void error(SchedulerDriver* driver, const string& message)
  {
    std::cout << "Error message: " << message << std::endl;
  }

private:
  const ExecutorInfo executor;
  const size_t balloonLimit;
  bool taskLaunched;
};

int main(int argc, char** argv)
{
  ...
  // Verify the balloon limit.
  Try limit = numify(argv[2]);
  ...

  // Find this executable's directory to locate executor.
  string uri;
  Option value = os::getenv("MESOS_BUILD_DIR");
  if (value.isSome()) {
    uri = path::join(value.get(), "src", "balloon-executor");
  } else {
    uri = path::join(
        os::realpath(Path(argv[0]).dirname()).get(),
        "balloon-executor");
  }

  ExecutorInfo executor;
  executor.mutable_executor_id()->set_value("default");
  executor.mutable_command()->set_value(uri);
  executor.set_name("Balloon Executor");
  executor.set_source("balloon_test");

  Resource* mem = executor.add_resources();
  mem->set_name("mem");
  mem->set_type(Value::SCALAR);
  mem->mutable_scalar()->set_value(EXECUTOR_MEMORY_MB);

  BalloonScheduler scheduler(executor, limit.get());

  FrameworkInfo framework;
  framework.set_user(""); // Have Mesos fill in the current user.
  framework.set_name("Balloon Framework (C++)");

  value = os::getenv("MESOS_CHECKPOINT");
  if (value.isSome()) {
    framework.set_checkpoint(
        numify(value.get()).get());
  }

  MesosSchedulerDriver* driver;
  value = os::getenv("MESOS_AUTHENTICATE");
  if (value.isSome()) {
    cout << "Enabling authentication for the framework" << endl;

    value = os::getenv("DEFAULT_PRINCIPAL");
    if (value.isNone()) {
      EXIT(1) << "Expecting authentication principal in the environment";
    }

    Credential credential;
    credential.set_principal(value.get());

    framework.set_principal(value.get());

    value = os::getenv("DEFAULT_SECRET");
    if (value.isNone()) {
      EXIT(1) << "Expecting authentication secret in the environment";
    }

    credential.set_secret(value.get());

    driver = new MesosSchedulerDriver(
        &scheduler, framework, argv[1], credential);
  } else {
    framework.set_principal("balloon-framework-cpp");

    driver = new MesosSchedulerDriver(
        &scheduler, framework, argv[1]);
  }

  int status = driver->run() == DRIVER_STOPPED ? 0 : 1;

  // Ensure that the driver process terminates.
  driver->stop();

  delete driver;
  return status;
}