Distributed computation in elliptics
Over the last days (or months) I have thought a lot about what people really want from a distributed system.
Usually it is enough just to store data and be able to scale horizontally when needed. But as more and more people start using the system, they begin to step outside the supported feature range.
In particular, people do not want just to store data; they want to implement some processing on top of it.
We added server-side scripting as a simple answer to that demand: server-side scripting is a way to execute arbitrary code on the data you provide to the system. In particular, it can be thought of as a trigger on data write: we get data, process it and store it.
Server-side scripts can be quite complex: for example, in Pohmelfs we update directory contents and hardlink counts using server-side scripts. They may read data, change object structure, serialize received objects, store combined entities back to the low-level elliptics backend and so forth.
Server-side scripts can be treated as low-level processing units that work with a single object or key.
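To make the "trigger on write" idea concrete, here is a minimal single-process sketch. Everything in it (`ServerScript`, `ToyNode`, `append_dir_entry`, `increment_link_count`) is hypothetical and is not the elliptics server-side scripting API; it only shows the shape of a per-key read-process-store step, with two scripts loosely modeled on the Pohmelfs directory and hardlink-count updates. It assumes C++17 for `std::optional`.

```cpp
#include <functional>
#include <map>
#include <optional>
#include <string>

// Hypothetical per-key "server-side script": gets the currently stored value
// (if any) plus the incoming data, and returns what should be stored.
using ServerScript = std::function<std::string(const std::optional<std::string>& stored,
                                               const std::string& incoming)>;

// Toy in-memory stand-in for a storage node (NOT the elliptics API).
class ToyNode {
public:
    // A write that behaves like a trigger: read current value, run script, store result.
    void write(const std::string& key, const std::string& data, const ServerScript& script) {
        std::optional<std::string> stored;
        auto it = store_.find(key);
        if (it != store_.end()) stored = it->second;
        store_[key] = script(stored, data);
    }

    std::optional<std::string> read(const std::string& key) const {
        auto it = store_.find(key);
        if (it == store_.end()) return std::nullopt;
        return it->second;
    }

private:
    std::map<std::string, std::string> store_;
};

// Directory-style script: append an entry name to a newline-separated listing.
inline std::string append_dir_entry(const std::optional<std::string>& stored,
                                    const std::string& entry) {
    if (!stored || stored->empty()) return entry;
    return *stored + "\n" + entry;
}

// Hardlink-count-style script: the stored value is a decimal counter that
// every write increments by one; the incoming data is ignored.
inline std::string increment_link_count(const std::optional<std::string>& stored,
                                        const std::string& /*incoming*/) {
    long count = stored ? std::stol(*stored) : 0;
    return std::to_string(count + 1);
}
```

Because the script sees both the stored and the incoming value, a single write carries the whole read-modify-write for one key. In a real cluster such a step would presumably have to run on the node that owns the key.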
Using those basic ‘bricks’ we can create rather complex systems: it is possible to create streams and a kind of graph processing engine, but it requires an enormous amount of low-level code, something like ‘get-key, process-key, write-key-to-leafX, copy-to-leafY, wait-for-replyZ’ and so on. Changing the computational topology is a real pain.
What we really want is a way to describe how our processing engine should look, something high-level like ‘compute-function-1(input-key) -> compute-function-2() and compute-function-3()’ and so on. Then we would submit that topology to the cluster and start emitting keys.
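A rough sketch of what such a high-level description could look like, with the example topology above wired together by name. `Topology`, `add_node`, `connect` and `run` are hypothetical names, and everything runs in one process; a real engine would ship each step to the cluster instead of calling it directly.

```cpp
#include <functional>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Minimal, single-process sketch of a declaratively described topology.
class Topology {
public:
    using Fn = std::function<std::string(const std::string&)>;

    void add_node(const std::string& name, Fn fn) { nodes_[name] = std::move(fn); }

    // The output of `from` becomes the input of `to`.
    void connect(const std::string& from, const std::string& to) {
        edges_[from].push_back(to);
    }

    // Feed `input` into `entry` and propagate results along the edges.
    // Returns each executed node's output keyed by node name.
    // Assumes an acyclic topology where every node is reached once.
    std::map<std::string, std::string> run(const std::string& entry,
                                           const std::string& input) const {
        std::map<std::string, std::string> results;
        std::queue<std::pair<std::string, std::string>> pending;
        pending.push({entry, input});
        while (!pending.empty()) {
            auto [name, data] = pending.front();
            pending.pop();
            std::string out = nodes_.at(name)(data);
            results[name] = out;
            auto it = edges_.find(name);
            if (it == edges_.end()) continue;
            for (const auto& next : it->second) pending.push({next, out});
        }
        return results;
    }

private:
    std::map<std::string, Fn> nodes_;
    std::map<std::string, std::vector<std::string>> edges_;
};
```

The processing functions only transform data; which function feeds which is declared separately, so changing the topology means changing a few `connect` calls rather than rewriting message-passing code.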
I got this whole idea from a Storm presentation. Storm is a distributed real-time processing system, recently bought by Twitter and used there for real-time analytics.
I certainly do not like some of its concepts, in particular the single master node (say hi to Hadoop) and the Clojure/Java implementation (it is a real pain in the ass to support multiple technology stacks in one project, and we already have a bunch of them, so we are likely not ready to bring in Java).
But Storm shows really exceptional parallelism in computation and a very convenient high-level abstraction: a network topology built on top of low-level objects like streams, spouts and bolts. This is exactly how we want to create a processing engine, rather than by thinking about how to receive a message and how to send a reply.
My search/AI ideas definitely need something similar for high-level computation modeling.
So I’m starting to think about how this could be built in a generic way, not bound to elliptics as its data storage.
This may sound like reinventing the wheel, and in some way it definitely is. I wish Storm had been created as an embeddable library, so I could easily put it into our datastore… But it wasn’t, so we have to create something.
I would also add true graph processing to this engine, the way Google’s Pregel works with its supersteps; this is what is missing in Storm. Storm is good for real-time ‘stateless’ processing: once your tuple has passed through the whole topology, it is forgotten. What I want is a way to synchronize parallel topologies, so that when they complete, the next topology starts with the data generated by its predecessors.
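The superstep model can be illustrated with the textbook Pregel example of propagating the maximum value through a graph. This is a sequential sketch of bulk-synchronous processing, not Pregel’s or Storm’s API; `Graph` and `propagate_max` are hypothetical names. The `inbox`/`next_inbox` swap plays the role of the barrier: messages sent during one superstep are only seen in the next, and the run stops once no messages are in flight.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

struct Graph {
    std::vector<int> value;                     // per-vertex state
    std::vector<std::vector<std::size_t>> out;  // adjacency lists
};

// Every vertex ends up with the maximum value in its connected component.
// Returns the number of supersteps executed.
inline int propagate_max(Graph& g) {
    const std::size_t n = g.value.size();

    // Superstep 0: every vertex sends its value to its neighbours.
    std::vector<std::vector<int>> inbox(n);
    for (std::size_t v = 0; v < n; ++v)
        for (std::size_t u : g.out[v]) inbox[u].push_back(g.value[v]);
    int supersteps = 1;

    auto has_messages = [](const std::vector<std::vector<int>>& box) {
        for (const auto& msgs : box)
            if (!msgs.empty()) return true;
        return false;
    };

    while (has_messages(inbox)) {
        std::vector<std::vector<int>> next_inbox(n);  // delivered after the barrier
        for (std::size_t v = 0; v < n; ++v) {
            if (inbox[v].empty()) continue;  // no messages: vertex stays halted
            int best = *std::max_element(inbox[v].begin(), inbox[v].end());
            if (best > g.value[v]) {
                g.value[v] = best;
                for (std::size_t u : g.out[v]) next_inbox[u].push_back(best);
            }
        }
        inbox.swap(next_inbox);  // barrier: the next superstep sees these messages
        ++supersteps;
    }
    return supersteps;
}
```

The whole state of the computation survives between supersteps, which is exactly what a stateless tuple-at-a-time pipeline lacks.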
Stay tuned. I promised to keep this blog running, and failed to find time to update it regularly.
But I will do more than my best to improve this situation :)

I’m working on a project similar to Storm, with the same ideas as in your blog post. It is still in development, but it will be used to make queries and process any data. It is embeddable and has a single API set for any data storage, like Redis, LevelDB or just a simple file. In case you find it helpful: http://github.com/x86-64/frozen/.
AFAIR it uses some DSL to describe the processing network topology, and it is possible to sink data into some code using the Mongrel protocol?
Is there a way to plug in my own transport instead of ZeroMQ?
In particular I’m interested in using the elliptics transport, since some tasks (like key-value read-modify-write updates) have to be done on elliptics nodes, so I want a topology where processing nodes are part of elliptics.
I also considered Storm for a while, but like you said, Clojure/Java was a no-go for me.
If you join or start a project like this, I’ll be happy to help :)
That would be great!
I believe we will start it on GitHub once all design goals are specified.
Sure, it is not limited to any particular transport. You need to create a new module and implement it. After that it will be available for querying and data exchange.
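A minimal sketch of what such a pluggable transport boundary might look like, assuming the processing code only ever exchanges opaque byte blobs addressed to a node name. `Transport`, `LoopbackTransport` and `Blob` are hypothetical; this is not frozen’s module interface or the elliptics API.

```cpp
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Blob = std::vector<unsigned char>;

// Hypothetical transport abstraction: callers see only send()/receive() of
// opaque blobs, so ZeroMQ, elliptics or multicast back ends could sit behind it.
class Transport {
public:
    virtual ~Transport() = default;
    virtual void send(const std::string& node, const Blob& blob) = 0;
    // Returns false if nothing is queued for `node`.
    virtual bool receive(const std::string& node, Blob& out) = 0;
};

// Trivial in-process implementation, handy for tests.
class LoopbackTransport : public Transport {
public:
    void send(const std::string& node, const Blob& blob) override {
        queues_[node].push_back(blob);
    }

    bool receive(const std::string& node, Blob& out) override {
        auto it = queues_.find(node);
        if (it == queues_.end() || it->second.empty()) return false;
        out = std::move(it->second.front());
        it->second.pop_front();
        return true;
    }

private:
    std::map<std::string, std::deque<Blob>> queues_;  // per-node FIFO
};
```

Keeping the interface down to binary blobs is what lets a new back end be added without the processing modules noticing.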
As for network topology: the library itself does not build a distributed system; it only defines data and modules local to the current process. But these local processes can be connected manually into a complex network. I plan to change the syntax a bit to make that easier to understand, and probably hide some details of the distribution part to make it transparent.
Mongrel is just one of the input modules; it is not mandatory.
If you are interested we can chat somewhere, like Skype, and I’ll describe it in more detail.
I’ve just returned, sorry for the delay.
I do not use Skype. We could exchange emails, but I think this is useful information to share here.
So, there are several questions:
1. Is it possible just to write code in a graph vertex that emits messages as data is processed, leaving the whole message passing to an ‘invisible’ low level, in particular a level implemented in elliptics?
Something like:
    void handlerX(const std::string &event, const std::string &data) {
        std::string some_new_data = data + "/whatever";
        emit("eventY", some_new_data);
    }

    topology t;
    t.add_slot("event1", handler1);
    t.add_slot("event2", handler2);
    ...
    t.load();

I.e. my processing vertices do not know anything about where they are processed, and the topology would handle the network transfer itself.
2. How complex is it to implement my own transport, whether it is ZeroMQ, elliptics, p2p, multicast or whatever else?
For example, we have big elliptics clusters, and we want eventX with keyY to be processed on the nodes where data with this key is stored. If my emit() function above is actually elliptics’s write_data_wait(), and I specify a key, then I know that my data will be processed on the node that corresponds to the given key.
3. Are there high-level wrappers that I can use to quickly program modules and topology in C/C++ (I suppose I write the topology in the language used in the tutorial)?
How do they look?
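The routing in question 2 boils down to mapping a key to the node that owns it. A generic consistent-hashing sketch: FNV-1a, the `HashRing` class and the "first node at or after the key’s hash" rule are assumptions for illustration, not elliptics’ actual ID scheme. An `emit()` that takes a key could send its message to `owner(key)`.

```cpp
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

// 64-bit FNV-1a: deterministic across platforms, unlike std::hash.
inline std::uint64_t fnv1a(const std::string& s) {
    std::uint64_t h = 14695981039346656037ULL;  // FNV offset basis
    for (unsigned char c : s) {
        h ^= c;
        h *= 1099511628211ULL;                  // FNV prime
    }
    return h;
}

// Generic consistent-hash ring: each node owns the arc of the hash space that
// ends at its position (illustration only, not elliptics' routing code).
class HashRing {
public:
    void add_node(const std::string& node, std::uint64_t position) {
        ring_[position] = node;
    }

    const std::string& owner(const std::string& key) const {
        if (ring_.empty()) throw std::runtime_error("empty ring");
        auto it = ring_.lower_bound(fnv1a(key));  // first node at or after the hash
        if (it == ring_.end()) it = ring_.begin(); // wrap around the ring
        return it->second;
    }

private:
    std::map<std::uint64_t, std::string> ring_;
};
```

Because every participant computes the same owner for the same key, the handler can run next to the data without any central coordinator deciding where each event goes.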
1. There are machines acting as such handlers: a machine can add or modify incoming data and pass it to the next machine in the chain. They exchange data in the form of hashes, which can contain any number of keys with values; you can get those values and also change them.
2. It is not complex at all. You need to make sure your code can pass binary blobs as is; there are functions to pack an incoming hash message into a contiguous memory chunk and to unpack it. The ZeroMQ module is a good example to start with. Machines and datatypes themselves know nothing about the topology, how data is processed and so on. Everything tends to fit with everything else because of the shared API set.
3. I’m working on it now. I have finished nested data descriptions, where one piece of data uses another. But the topology description is not done yet, and neither is distribution of topology information. There is a lot of code still to be written.
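The packing step from answer 2 can be sketched as a simple length-prefixed encoding of a string-to-string hash. The wire format and the `pack`/`unpack` names are assumptions, not frozen’s actual functions; the point is only that a hash message becomes one contiguous, binary-safe blob that any transport can carry as is.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Assumed wire format: u32 entry_count, then per entry
// u32 key_len, key bytes, u32 value_len, value bytes (little-endian integers).
using Hash = std::map<std::string, std::string>;

namespace detail {
inline void put_u32(std::vector<unsigned char>& buf, std::uint32_t v) {
    for (int i = 0; i < 4; ++i) buf.push_back(static_cast<unsigned char>(v >> (8 * i)));
}
inline std::uint32_t get_u32(const std::vector<unsigned char>& buf, std::size_t& pos) {
    if (buf.size() - pos < 4) throw std::runtime_error("truncated blob");
    std::uint32_t v = 0;
    for (int i = 0; i < 4; ++i) v |= std::uint32_t(buf[pos + i]) << (8 * i);
    pos += 4;
    return v;
}
inline void put_str(std::vector<unsigned char>& buf, const std::string& s) {
    put_u32(buf, static_cast<std::uint32_t>(s.size()));
    buf.insert(buf.end(), s.begin(), s.end());  // raw bytes, embedded NULs allowed
}
inline std::string get_str(const std::vector<unsigned char>& buf, std::size_t& pos) {
    std::uint32_t len = get_u32(buf, pos);
    if (buf.size() - pos < len) throw std::runtime_error("truncated blob");
    std::string s(buf.begin() + pos, buf.begin() + pos + len);
    pos += len;
    return s;
}
}  // namespace detail

inline std::vector<unsigned char> pack(const Hash& h) {
    std::vector<unsigned char> buf;
    detail::put_u32(buf, static_cast<std::uint32_t>(h.size()));
    for (const auto& [key, value] : h) {
        detail::put_str(buf, key);
        detail::put_str(buf, value);
    }
    return buf;
}

inline Hash unpack(const std::vector<unsigned char>& buf) {
    std::size_t pos = 0;
    Hash h;
    std::uint32_t n = detail::get_u32(buf, pos);
    for (std::uint32_t i = 0; i < n; ++i) {
        std::string key = detail::get_str(buf, pos);
        h[key] = detail::get_str(buf, pos);
    }
    return h;
}
```

Explicit little-endian integers keep the blob readable on any host, and the bounds checks make a truncated message fail loudly instead of reading past the buffer.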