libprocess
libprocess was created originally at Berkley by Benjamin Hindman who is also the original creator of Mesos and implements “an actor style message-passing programming model (in C++).”, or "a library to support building systems out of composable concurrent components".
What this means, in practice, is that you can use it to implement a set of non-blocking processes which communicate with each other by sending “messages” (of an arbitrary nature – in Mesos, this means, for the most part Protocol Buffers) in a lock-free and race-free multi-threading model.
features
- functional language idioms, for example, support of some/option like scala.
- Enhancements:
Try<T> - asynchronous event-loop (via libev)
- parallel
- integrated HTTP support (transport protocol)
- testing infrastructure
In summary, it's a C++ library for building distributed systems.
Main Concepts
- process and PID
- local messaging via dispatch, delay, and defer
- functional composition via promises/futures
- remote messaging via send, route, and install
process and PID
process
using namespace process;
class QueueProcess: public Process {}
int main(int argc, char **argv) {
QueueProcess process;
spawn(process);
terminate(process);
wait(process);
return 0;
}
PID A PID provides a level of indirection for naming a process without having an actual reference to it (necessary for remote processes). It's really an HTTP URL:
http://ip:port/name
http://ip:port/name/endpoint
https://ip:port/name/endpoint
class QueueProcess : public Process {
public:
QueueProcess() : ProcessBase(“queue”) {} //set name
};
int main(int argc, char **argv) {
QueueProcess process;
PID pid;
pid = spawn(process); //or pid = process.self();
teminate(pid)
wait(pid)
return 0;
}
Local Messaging
dispatch dispatch is asynchronous function call. Implemented in include/process/delay.hpp
- each process has a “queue” of incoming “messages”
- processes provide execution contexts (only one thread executing within a process at a time)
- no per process synchronization!
- no explicit “receive loop”
- blocking within a process is strictly forbidden!
- creating/spawning a process is very cheap (no stack allocation, thread creation, etc)
- No "remote dispatch"; only for local processes.
using namespace process;
class QueueProcess: public Process {
public:
void enqueue(int i) {this->i = i;}
int dequeue() {return this->i;}
private:
int i;
};
int main(int argc, char **argv) {
QueueProcess process;
dispatch(process, &QueueProcess::enqueue, 42);
dispatch(process, &QueueProcess::enqueue, 43);
...
terminate(process);
wait(process);
return 0;
}
delay include/process/delay.hpp
delay(seconds(1), process, &QueueProcess::enqueue,42);
Function Composition
promise and future
cplusplus.com definition:
- A future is an object that can retrieve a value from some provider object or function, properly synchronizing this access if in different threads.
- A promise is an object that can store a value of typeT to be retrieved by a future object (possibly in another thread), offering a synchronization point.
template
class QueueProcess {
public:
void enqueue(T t) {promise.set(t);}
Future dequeue() {return promise.future();}
private:
Promise promise;
}
template
class Queue {
public:
Queue() {spawn(process);}
~Queue() {terminate(process); wait(process);}
void enqueue(T t) {dispatch(process, &QueueProcess::enqueue, t);}
Future dequeue() {return dispatch(process, &QueueProcess::dequeue, t);}
private:
QueueProcess process;
}
int main (int argc, char **argv) {
Queue queue;
queue.enqueue(42);
queue.dequeue()
.then([] (int i) {
//use i
});
return 0;
}
defer queue.dequeue() .then(defer([] (int i) { //use i }));
Remote Messaging
route
using namespace process;
using namespace process::http;
class QueueProcess : public Process {
public:
QueueProcess() : ProcessBase("Queue") {}
virtual void initialize() {
route("/enqueue", [] (Request request) {
//parse argument from request.query and request.body
enqueue(arg);
return OK();
});
}
};
install
class QueueProcess : Process {
public:
QueueProcess() : ProcessBase("queue") {};
virtual void initialize() {
install("enqueue", [] (PID<> from, string body) {
//parse argument from body
put(arg);
});
};
};
send
class QueueProcess : public Process {
public:
QueueProcess() : ProcessBase("queue") {};
virtual void initialize() {
install("enqueue", [] (PID<> from, string body) {
//parse argument from body
send(from, "some responses");
});
};
};
protobuf
class QueueProcess : public ProtobufProcess {
public:
QueueProcess() : ProcessBase("queue") {}
virtual void initialize() {
install (
&EnqueueRequest::arg,
[] (int i) {
enqueue(i);
EnqueueResponse response;
response.set_...();
return response;
});
};
};
protocol
protobuf::Protocol enqueue;
class SomeProcess : Process {
public:
void f() {
EnqueueRequest request;
request.set_arg(42);
enqueue(pid, request)
.onFailed(defer([] () {...}))
.onReady(defer([] (EnqueueResponse response) {
//use response
}));
}
}
Example Usage
- Master
class master : public ProtobufProcess{ }; - Install Message Handler
void initialize() { install(&Master::reportProgress,&LaunchTasksMesssage::tasks); } Define ReportProgressMessage in protobuf format. message ReportProgressMessage { repeated Task tasks = 3; } message Task { required string name = 1; required TaskID task_id = 2; required float progress =3; } message TaskID { required string value = 1;
}
- reportProgress
void reportProgress(const vector
& tasks) { for(int i=0;i<tasks.size();i++) {
} }// - main() 6.