Tag Archives: Grape

Realtime processing pipeline

Grape 2.0

We recently released a version of Grape, our realtime processing pipeline, rewritten from scratch.

It differs completely from what it was before. Instead of following in Storm's footsteps, we dramatically changed its logic.

The main goals were data availability and data persistence. We created Grape for those who cannot afford to lose data.

So we introduced two parts in Grape: a persistent queue and workers.

The persistent queue uses a simple push/pop/ack API to store, retrieve and acknowledge chunks of data stored in Elliptics. An object may live in the queue forever, until it is processed by the workers.
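
To make the push/pop/ack contract more concrete, here is a minimal in-memory sketch in C++. It is not Grape's implementation: the class toy_queue, its method signatures and the requeue_unacked() helper are assumptions made for illustration, and nothing here touches Elliptics. What it tries to show is that pop only hands a chunk out, while ack is what finally removes it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>

// Toy in-memory stand-in for a queue with push/pop/ack semantics.
// All names are hypothetical; the real queue stores its chunks in Elliptics.
class toy_queue {
public:
    // Append a chunk of data and return the id assigned to it.
    std::uint64_t push(std::string data) {
        std::uint64_t id = next_id_++;
        pending_.emplace_back(id, std::move(data));
        return id;
    }

    // Hand the oldest pending chunk to a worker. The chunk is not deleted:
    // it moves to the in-flight set until it is acknowledged.
    // Returns false when nothing is pending.
    bool pop(std::uint64_t &id, std::string &data) {
        if (pending_.empty())
            return false;
        id = pending_.front().first;
        data = pending_.front().second;
        in_flight_[id] = std::move(pending_.front().second);
        pending_.pop_front();
        return true;
    }

    // Acknowledge a processed chunk; only now is it dropped for good.
    // Returns false if the id is not in flight.
    bool ack(std::uint64_t id) {
        return in_flight_.erase(id) == 1;
    }

    // Return every unacknowledged chunk to the pending list, oldest first,
    // e.g. after a worker crashed before calling ack().
    void requeue_unacked() {
        for (auto it = in_flight_.rbegin(); it != in_flight_.rend(); ++it)
            pending_.emplace_front(it->first, std::move(it->second));
        in_flight_.clear();
    }

    // Number of chunks still held by the queue (pending or in flight).
    std::size_t size() const { return pending_.size() + in_flight_.size(); }

private:
    std::uint64_t next_id_ = 0;
    std::deque<std::pair<std::uint64_t, std::string>> pending_;
    std::map<std::uint64_t, std::string> in_flight_;
};
```

A worker would call pop, process the chunk and then call ack; a chunk that was popped but never acknowledged stays in the queue and can be handed out again.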

Unlike Kafka, we do not lose your data if a ‘data-file’ has not been read for a long time or if its size overflows under constant write load.

Our queue may grow in distributed storage as long as there is space (which is usually considered unlimited), and processing workers can be started using a pull design instead of a push one.

Push messaging systems imply that the whole processing pipeline has to work at the same speed as the pushing process. If there are load spikes which the processing workers cannot handle, data will likely be lost. Any pipeline modification (like resharding Kafka topics) ends up stopping processing.

Pull systems do not suffer from this ‘must-have-the-same-performance’ issue. One may start new worker nodes on demand, and even if they cannot handle the current write spike, the data will be stored in the distributed persistent queue and processed later, once the workers catch up.

We are starting three projects on Grape: a realtime antifraud and human-detection system, a video-processing engine which detects road signs, and a Wikipedia->RDF generation pipeline.

Twitter realtime search engine

Twitter uses humans (a pool of in-house ‘turks’ on Mechanical Turk) each time a new trending topic is propagated to search results: http://engineering.twitter.com/2013/01/improving-twitter-search-with-real-time.html

Every time.

And Storm is only used to gather statistics and detect trending topics. It uses Thrift to upload each new active search term to Amazon's Mechanical Turk.

Instead we want Grape, our realtime processing engine, to be able to perform much more complicated tasks. In particular, we are implementing secondary indexes and realtime search in elliptics on top of Grape.

Grape's ultimate goal is to be a platform for every kind of realtime processing task. For this purpose we are developing technology for guaranteed event processing, pipeline restart, event order preservation and so on. In realtime search it will work something like this: uploading a new document to the elliptics distributed storage emits a new event, and that event triggers the whole search indexing (stemming, language detection, inverted index updates and so on). If one of those steps fails, we will restart indexing from the failed point and proceed.
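
The "restart from the failed point" idea can be sketched in a few lines of C++. This is only an illustration under stated assumptions: the document and step types and the functions run_from and run_with_restart are hypothetical, everything runs in one process, and the real technology is still in development. The sketch remembers the index of the step that failed and resumes there, so steps that already completed are not executed again.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical document state passed between indexing steps.
struct document {
    std::string text;
    std::vector<std::string> log;  // names of the steps that completed
};

// One named processing step; it returns false to signal failure.
struct step {
    std::string name;
    std::function<bool(document &)> run;
};

// Run steps[first..] in order and return the index of the first step that
// failed, or steps.size() if every remaining step succeeded.
inline std::size_t run_from(const std::vector<step> &steps, std::size_t first, document &doc) {
    for (std::size_t i = first; i < steps.size(); ++i) {
        if (!steps[i].run(doc))
            return i;  // the caller may retry from exactly this point
        doc.log.push_back(steps[i].name);
    }
    return steps.size();
}

// Keep restarting from the failed step until the pipeline completes or the
// retry budget is spent. Returns true on completion.
inline bool run_with_restart(const std::vector<step> &steps, document &doc, int max_retries) {
    std::size_t next = 0;
    for (int attempt = 0; attempt <= max_retries; ++attempt) {
        next = run_from(steps, next, doc);
        if (next == steps.size())
            return true;
    }
    return false;
}
```

In a distributed setting the position of the failed step would have to be stored somewhere durable; here it simply lives in a local variable.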

Highload++ presentation

Highload from Zbr

My Highload++ conference presentation. It is quite useless without the supporting speech text (you can read it in Russian), but I think it is quite fun.

The presentation highlights the state and features of elliptics, its history and a brief comparison to some other distributed storage systems. The second part covers the Cocaine cloud engine, and the last one shows off the Grape realtime processing engine, its applications (like a realtime attribute search engine) and a comparison with mapreduce.

There are hand-drawn tits and cats of course.
Enjoy: http://www.slideshare.net/bioothod/highload-14865405

Grape: multiple events -> multiple applications

Grape is a realtime pipeline processing engine.
You can load your application into an elliptics cluster and start data processing by triggering external events.

Grape is built on a signal-slot model, so you can create multiple events which handle different jobs over your data in parallel on multiple machines in the cluster. Every event can send a data reply back to the original caller, which concatenates the replies and returns when the completion event is received.

But having a single application which performs a whole bunch of processing events is not suitable for every situation. For example, you may want to store intermediate processing results in the storage to create a fallback mechanism, or you may need multiple replies from one set of events to be concatenated, processed and sent to the next step in a particular form…

There may be many cases where you want to split your whole processing graph into a set of smaller applications, each of which is started from somewhere within the pipeline topology.

With the 0.3.0 grape release we introduce a new feature which allows an event within a topology to start another application pipeline and wait for its completion (collecting reply data).

Something like this:
[Figure: multiple applications within a grape pipeline]

In this example the finish node of the first (black) application starts the second (blue) application and waits for its completion. It receives all replies sent back from nodes within the second application and then sends a reply back to the original caller (the start node of the first application), which unblocks the client and returns the processed data.

I've updated the grape server-side tutorial as well as the example code in the grape source package. The example application now does exactly what is described above.


Server-side processing engine

Server-side (formerly known as server-side scripting) is a code execution environment created in elliptics on demand. We use Cocaine for this. It allows us to dynamically create a pool of cgroups-bound processes or a set of LXC containers to execute externally loaded code, which is triggered by a client command. One may consider this a write trigger, but it is actually more than that: it is a special command which may have your data attached and which has full access to local storage as well as to any other external elements.

For example, you may want to connect to external MySQL servers and trigger a special command which will read or write data in elliptics only when a particular record is valid in the SQL database (not that it could not be implemented in elliptics alone, but it keeps the example simple).

The Cocaine server-side pool can execute code in Python, Perl and JavaScript (proof-of-concept), as well as binaries compiled into a shared library object (*.so). This is implemented through cocaine plugins (libcocaine-plugin-*) which are loaded on demand by the cocaine core (started in elliptics).

The Cocaine engine must first be enabled and initialized in the elliptics config. Then you have to load your code into elliptics in a way cocaine understands. In the current (0.9) version this is done with the cocaine-deploy tool. When you start your application (actually, when the first trigger event is received by elliptics), pools of the proper workers are created and the code starts. You may want to read the detailed tutorial for a step-by-step setup.

Server-side processing in elliptics is triggered by a special command which operates with events. An event is basically an ‘application-name@event-name’ string with associated raw data embedded into the elliptics packet. For every ‘application-name’ we start a new cocaine engine (with its pools and so on).
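
As a small illustration of the event naming scheme, the following C++ sketch splits such a string into its two halves. The event_name struct and the parse_event function are hypothetical helpers written for this example and are not part of the elliptics or cocaine API; splitting at the first '@' and rejecting empty halves are also assumptions.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical helper type: the two halves of an
// 'application-name@event-name' string.
struct event_name {
    std::string application;
    std::string event;
};

// Split "application-name@event-name" at the first '@'.
// Throws std::invalid_argument if the separator or either half is missing.
inline event_name parse_event(const std::string &full) {
    const std::string::size_type at = full.find('@');
    if (at == std::string::npos || at == 0 || at + 1 == full.size())
        throw std::invalid_argument("expected 'application-name@event-name': " + full);
    return event_name{full.substr(0, at), full.substr(at + 1)};
}
```

The application half is what would select the engine to start, and the event half is what the application itself dispatches on.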

An event can be blocking (if a special flag is set). In this case the client blocks until the event is fully processed (the executed function returns), and an error code as well as optional data is returned to the client from the elliptics node. If the event does not block, a successful return only means that it was queued for execution.

It is possible to create a whole pipeline where each node sends new events to other nodes and so on, over some kind of signal-slot topology spread across the whole cluster for parallel realtime execution. Grape is a framework which greatly simplifies this task by taking all the network transport work away from the programmer, who only needs to implement processing nodes and register them via the signal-slot topology model.

Realtime processing engine

We created a small framework called grape to fill the gap in realtime processing.

We use elliptics as the base messaging protocol, but the whole logic of routes and connection processing is hidden from the users of the system.

Basically, the grape framework hides all messaging, routing, connection handling, locking and other low-level stuff from the higher layer.
One may create a program (a shared library which is linked with libgrape.so and exports a function named initialize, which takes the JSON config sent from the config of the application you are about to run) which will process events and their associated data and emit other events, which will be routed to different nodes in the elliptics distributed network.

Grape was modeled on the Storm processing engine, and while Twitter announced examples which count words in your tweets (the US election site was likely created using this technology), we created a realtime search engine.

It is a rather simple engine: we use the Snowball stemmer and the Chromium language detector, and build namespaced reverse indexes (which are uncompressed msgpack'ed maps) with word positions.

Grape supports blocking operations, i.e. the client waits until all events she emitted are finished and one of them emits a completion event back to the caller.
We can generate secondary indexes using the same application; it requires even fewer steps.

The client sends a document msgpacked into a structure that our engine understands (a simple document-id/namespace-id/text/timestamp/flags tuple). The engine updates the index for document-id/namespace-id and parses the text, extracting tokens using the stemmer and language detector. Then the reverse index of every token/namespace-id pair is updated.
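
The shape of such a namespaced reverse index with word positions can be sketched as follows. This is a toy under loud assumptions: the class toy_index is hypothetical, lowercasing stands in for the Snowball stemmer and the language detector, the index is a plain in-memory map instead of msgpack'ed maps stored in elliptics, and the timestamp and flags fields are left out.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy namespaced reverse index:
// (namespace, token) -> document id -> word positions.
// Lowercasing stands in for real stemming and language detection here.
class toy_index {
public:
    using positions = std::vector<std::size_t>;
    using postings = std::map<std::string, positions>;  // document id -> positions

    // Split text on non-alphanumeric characters and record, for every
    // token, the word positions at which it occurs in the document.
    void add_document(const std::string &ns, const std::string &doc_id, const std::string &text) {
        std::string token;
        std::size_t position = 0;
        // Iterate one step past the end so the last token is flushed too.
        for (std::size_t i = 0; i <= text.size(); ++i) {
            const unsigned char c = i < text.size()
                ? static_cast<unsigned char>(text[i])
                : static_cast<unsigned char>(' ');
            if (std::isalnum(c)) {
                token.push_back(static_cast<char>(std::tolower(c)));
            } else if (!token.empty()) {
                index_[std::make_pair(ns, token)][doc_id].push_back(position++);
                token.clear();
            }
        }
    }

    // Look up one token within a namespace; returns an empty map if unknown.
    postings find(const std::string &ns, const std::string &token) const {
        auto it = index_.find(std::make_pair(ns, token));
        return it == index_.end() ? postings() : it->second;
    }

private:
    std::map<std::pair<std::string, std::string>, postings> index_;
};
```

Keeping positions next to each document id is what makes phrase and proximity queries possible later. Note that this toy appends positions if the same document is added twice, whereas a real engine would replace the old entry.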

We do not aim to compete with Lucene or Sphinx; this is a simpler technology just to show what our realtime processing engine can do besides twitter-like word counts :)

Unfortunately, the documentation is rather sparse, but we are writing it right now and plan to show something interesting soon.

Grape – realtime pipeline processing engine

We are ready to present the first release (actually the second implementation) of Grape, a realtime pipeline processing engine which follows in the footsteps of the Storm processing engine.

There were a fair number of reasons why we could not easily switch to Storm, but the main one is that we want to process data as it is being stored in elliptics.
You can create a signal-slot topology on top of an elliptics cluster, where processing nodes are linked to events and can emit one or more events of their own. An elliptics cluster can host as many application topologies as you wish.

Code can be executed in a separate process with specified rlimits or in a cgroup-jailed environment, and later we plan to add containers (if there is strong demand).

Execution workers are the processes where elliptics runs its server-side scripts. We recently switched from a home-made pool of processes to the very feature-rich Cocaine engine, which allows elliptics to run not only Python but also JavaScript and Perl. It supports dynamic management of the process pool, jailed setup, extended statistics and quite flexible configuration. For example, Cocaine can load your code from remote storage (elliptics or mongodb) or from the local filesystem, it provides storage and logging abstractions to workers, and so on.

Grape in turn runs inside those workers, and currently only on top of elliptics. It is possible to add different message passing layers to this simple model, like your own transport (let's say you want to deliver messages using multicast) and your own processing nodes (for example, built on top of your own routing technology instead of elliptics).

The pipeline looks something like this (in c++-like pseudocode):

init() {
    topology = new topology(config)
    nodes = new elliptics_node_t[num]

    topology->add_slot("start-event", nodes[0])
    topology->add_slot("event0", nodes[0])
    topology->add_slot("event1", nodes[1])
    topology->add_slot("event2", nodes[2])
    topology->add_slot("finish", nodes[3])
}

node0::process(event, data) {
    /* process data */
    emit("event0", new_data)

    /* process data */
    emit("event1", new_data)
}

and so on…
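
For readers who want something they can compile, here is a single-process C++ toy in the spirit of the pseudocode above. It is a sketch under assumptions, not the grape API: the class toy_topology and the methods add_slot, emit, reply and run are hypothetical names, events are delivered from a local queue instead of being routed across an elliptics cluster, and replies are simply concatenated into one string.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Single-process toy version of a signal-slot topology. The names only
// mirror the pseudocode; real Grape routes events between cluster nodes.
class toy_topology {
public:
    using handler = std::function<void(toy_topology &, const std::string &data)>;

    // Register a processing node as the slot for an event name.
    void add_slot(const std::string &event, handler node) {
        slots_[event] = std::move(node);
    }

    // Queue an event; it is delivered when run() reaches it.
    void emit(const std::string &event, const std::string &data) {
        queue_.emplace_back(event, data);
    }

    // Send a piece of reply data back to the original caller.
    void reply(const std::string &data) { replies_ += data; }

    // Emit the start event, deliver queued events in order until none are
    // left, and return the concatenated replies.
    std::string run(const std::string &start_event, const std::string &data) {
        replies_.clear();
        emit(start_event, data);
        while (!queue_.empty()) {
            std::pair<std::string, std::string> next = std::move(queue_.front());
            queue_.pop_front();
            auto slot = slots_.find(next.first);
            if (slot != slots_.end())
                slot->second(*this, next.second);  // events without a slot are dropped
        }
        return replies_;
    }

private:
    std::map<std::string, handler> slots_;
    std::deque<std::pair<std::string, std::string>> queue_;
    std::string replies_;
};
```

A start node would typically emit several events, each handled by its own slot, with a finish slot calling reply so that run returns the collected data to the caller.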

For now Grape only processes C/C++ code, but it will work with Python, Perl and JavaScript as soon as we create bindings.

Our main goal for Grape now is real-time search support for data uploaded into elliptics. Maybe we will set up a search-as-a-service project from this idea.

Here is the source code: https://github.com/ioremap/grape

Stay tuned, we will cook up nice documentation for elliptics, cocaine and grape really soon!