libprocess

libprocess was created originally at Berkley by Benjamin Hindman who is also the original creator of Mesos and implements “an actor style message-passing programming model (in C++).”, or "a library to support building systems out of composable concurrent components".

What this means, in practice, is that you can use it to implement a set of non-blocking processes which communicate with each other by sending “messages” (of an arbitrary nature – in Mesos, this means, for the most part Protocol Buffers) in a lock-free and race-free multi-threading model.

features

  • functional language idioms, for example, support of some/option like scala.
  • Enhancements: Try<T>
  • asynchronous event-loop (via libev)
  • parallel
  • integrated HTTP support (transport protocol)
  • testing infrastructure

In summary, it's a C++ library for building distributed systems.

Main Concepts

  • process and PID
  • local messaging via dispatch, delay, and defer
  • functional composition via promises/futures
  • remote messaging via send, route, and install

process and PID

process

using namespace    process;

class QueueProcess: public Process {}

int main(int argc, char **argv) {
  QueueProcess process;
  spawn(process);
  terminate(process);
  wait(process);
  return 0;
}

PID A PID provides a level of indirection for naming a process without having an actual reference to it (necessary for remote processes). It's really an HTTP URL:

http://ip:port/name    

http://ip:port/name/endpoint    

https://ip:port/name/endpoint
class QueueProcess : public Process {
public:          
  QueueProcess() : ProcessBase(“queue”) {}    //set name
};    
int main(int argc, char **argv) {
  QueueProcess process;
  PID    pid;

  pid = spawn(process); //or pid = process.self();
  teminate(pid)
  wait(pid)
  return 0;
}

Local Messaging

dispatch dispatch is asynchronous function call. Implemented in include/process/delay.hpp

  • each process has a “queue” of incoming “messages”
  • processes provide execution contexts (only one thread executing within a process at a time)
    • no per process synchronization!
    • no explicit “receive loop”
    • blocking within a process is strictly forbidden!
    • creating/spawning a process is very cheap (no stack allocation, thread creation, etc)
  • No "remote dispatch"; only for local processes.
using namespace    process;
class QueueProcess: public Process {
public:
  void enqueue(int i) {this->i = i;}
  int dequeue() {return this->i;}
private:
  int i;
};

int main(int argc, char **argv) {
  QueueProcess process;

  dispatch(process, &QueueProcess::enqueue, 42);
  dispatch(process, &QueueProcess::enqueue, 43);
  ...

  terminate(process);
  wait(process);
  return 0;
}

delay include/process/delay.hpp

delay(seconds(1), process, &QueueProcess::enqueue,42);

Function Composition

promise and future

cplusplus.com definition:

  1. A future is an object that can retrieve a value from some provider object or function, properly synchronizing this access if in different threads.
  2. A promise is an object that can store a value of typeT to be retrieved by a future object (possibly in another thread), offering a synchronization point.
template 
class QueueProcess {
public:
  void enqueue(T t) {promise.set(t);}
  Future dequeue() {return promise.future();}
private:
  Promise promise;
}

template 
class Queue {
public:
  Queue() {spawn(process);}
  ~Queue() {terminate(process); wait(process);}
  void enqueue(T t) {dispatch(process, &QueueProcess::enqueue, t);}
  Future dequeue() {return dispatch(process, &QueueProcess::dequeue, t);}
private:
  QueueProcess process;
}

int main (int argc, char **argv) {
  Queue queue;
  queue.enqueue(42);
  queue.dequeue()
    .then([] (int i) {
    //use i
    });
  return 0;  
}

defer queue.dequeue() .then(defer([] (int i) { //use i }));

Remote Messaging

route

using namespace process;
using namespace process::http;
class QueueProcess : public Process {
public:
  QueueProcess() : ProcessBase("Queue") {}
  virtual void initialize() {
    route("/enqueue", [] (Request request) {
      //parse argument from request.query and request.body
      enqueue(arg);
      return OK();
    });
  }
};

install

class QueueProcess : Process {
public:
  QueueProcess() : ProcessBase("queue") {};

  virtual void initialize() {
    install("enqueue", [] (PID<> from, string body) {
      //parse argument from body
      put(arg);
    });
  };
};

send

class QueueProcess : public Process {
public:
  QueueProcess() : ProcessBase("queue") {};

  virtual void initialize() {
    install("enqueue", [] (PID<> from, string body) {
      //parse argument from body
      send(from, "some responses");
    });
  };
};

protobuf

class QueueProcess : public ProtobufProcess {
public:
  QueueProcess() : ProcessBase("queue") {}

  virtual void initialize() {
    install (
    &EnqueueRequest::arg, 
    [] (int i) {
      enqueue(i);
      EnqueueResponse response;
      response.set_...();
      return response;
    });
  };
};

protocol

protobuf::Protocol enqueue;
class SomeProcess : Process {
public:
  void f() {
    EnqueueRequest request;
    request.set_arg(42);
    enqueue(pid, request)
      .onFailed(defer([] () {...}))
      .onReady(defer([] (EnqueueResponse response) {
        //use response
      }));
  }
}

Example Usage

  1. Master
     class master : public ProtobufProcess {
     };
    
  2. Install Message Handler
     void initialize() {
      install(&Master::reportProgress,&LaunchTasksMesssage::tasks);
     }
    
  3. Define ReportProgressMessage in protobuf format. message ReportProgressMessage { repeated Task tasks = 3; } message Task { required string name = 1; required TaskID task_id = 2; required float progress =3; } message TaskID { required string value = 1;

    }

  4. reportProgress void reportProgress(const vector& tasks) { for(int i=0;i<tasks.size();i++) {
    //
    
    } }
  5. main() 6.