Reverbrain is a company we started to provide support and solutions for distributed storage and realtime computation problems.
We created a whole stack of technologies ranging from the lowest level of data storage up to a high-level processing pipeline.
Time moves on, and we see how complex it is to deal with massively increasing amounts of data. We provide solutions for true horizontal scaling with fair system complexity. Our storage appliances cover the range from hosting billions of small-to-medium objects up to huge data-streaming systems.
Your data should not just lie archived in the distributed system: realtime pipeline processing is our vision of how data has to be handled. We use these technologies ourselves and want to provide the best experience for our customers.
Hi, it's SaveTheRbtz speaking. I'm the engineer responsible for eblob code maintenance and improvement. There has been a lot of work done on the eblob code base over the past half a year, including big items like data-sort and l2hash, but also tons of small yet useful improvements like rwlocks, lower memory usage and faster start-up.
I'm here to present the recent wiki updates covering some eblob features, e.g. one that explains how reading and writing work in eblob and what optimizations make it faster than a plain directory with billions of files.
We've also expanded the data-sort manual: it now has more info about data-sort and defragmentation, including their pros and cons.
The next release of eblob will break backward compatibility in terms of both ABI and API, but this will allow us not only to improve performance and provide new features, but also to greatly simplify the API itself and remove some useless functionality. For example, we are now working on a writev()-like (aka scatter/gather) interface for the blob that will allow us to efficiently implement metadata in elliptics.
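For reference, here is a minimal C++ sketch of the POSIX writev() call that such an interface would be modeled after; this is only an analogy, not the eblob API itself, and blob.data is a made-up file name:

#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>

int main() {
    // Two separate buffers, e.g. metadata and data, written in one syscall.
    char header[] = "metadata";
    char payload[] = "user data";

    struct iovec iov[2];
    iov[0].iov_base = header;
    iov[0].iov_len = sizeof(header) - 1;
    iov[1].iov_base = payload;
    iov[1].iov_len = sizeof(payload) - 1;

    int fd = open("blob.data", O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (fd < 0)
        return 1;

    // Both chunks land contiguously in the file with no intermediate copy.
    ssize_t n = writev(fd, iov, 2);
    close(fd);
    return n < 0;
}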
The full roadmap for eblob can be found in the corresponding wiki article as well.
Recently we released a rewritten-from-scratch version of Grape, our realtime processing pipeline.
It is completely different from what it was before. Instead of following in Storm's steps, we dramatically changed its logic.
The main goals were data availability and data persistency. We created Grape for those who cannot afford losing data.
So we introduced two parts in Grape: a persistent queue and workers.
The persistent queue uses a simple push/pop/ack API to store/retrieve/acknowledge chunks of data kept in Elliptics. An object may live in the queue forever, until it is processed by the workers.
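To make the contract concrete, here is a self-contained in-memory model of such a queue in C++; it only mimics the push/pop/ack semantics (the real Grape queue persists chunks in Elliptics, and all names below are illustrative, not the actual API):

#include <cstdint>
#include <cstdio>
#include <deque>
#include <map>
#include <optional>
#include <string>

struct chunk {
    uint64_t id;        // queue-assigned id, used later to acknowledge
    std::string data;   // payload; the real queue stores it in Elliptics
};

class queue {
    std::deque<chunk> pending_;
    std::map<uint64_t, chunk> unacked_;
    uint64_t next_id_ = 0;

public:
    // push(): store a chunk
    void push(std::string data) {
        pending_.push_back(chunk{next_id_++, std::move(data)});
    }

    // pop(): hand out the next chunk; it stays alive until acked
    std::optional<chunk> pop() {
        if (pending_.empty())
            return std::nullopt;
        chunk c = pending_.front();
        pending_.pop_front();
        unacked_[c.id] = c;
        return c;
    }

    // ack(): only now may the chunk be dropped from the queue
    void ack(uint64_t id) {
        unacked_.erase(id);
    }
};

int main() {
    queue q;
    q.push("event-1");
    q.push("event-2");

    // A pull-style worker: a crash before ack() leaves the chunk
    // unacknowledged, so it can be re-delivered and caught up with later.
    while (auto c = q.pop()) {
        std::printf("processing %s\n", c->data.c_str());
        q.ack(c->id);
    }
}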
Unlike Kafka, we cannot lose your data when a data file has not been read for a long time or when its size overflows under constant write load.
Our queue may grow in the distributed storage for as long as there is space (which is usually considered unlimited), and one may run processing workers not in a push manner, but using a pull design.
Push messaging systems imply that the whole processing pipeline has to work at the same speed as the pushing process. If there are load spikes which the processing workers cannot handle, data will likely be lost. Any pipeline modification (like resharding Kafka topics) ends up stopping processing.
Pull systems do not suffer from this 'must-have-the-same-performance' issue: one may start new worker nodes on demand, and even if they cannot handle the current write spike, the data will be stored in the distributed persistent queue and caught up with later.
We are starting three projects on Grape: a realtime antifraud and human-detection system, a video-processing engine which detects road signs, and a Wikipedia->RDF generation pipeline.
This will be a short article on how to tune your VM for maximum write performance.
I'm not sure whether it is generic enough, but it is at least very useful when you have a write-heavy workload in append-like mode, or when there are multiple files and you write into one after another (switching files when some boundary has been crossed).
This is the case for Eblob, our most widely used low-level local storage in Elliptics. Eblob can be turned into an append system which still allows overwrites: if the new size is larger than the old one, the object being written is copied to a new location where more space is reserved for future writes. The old copy is marked as removed and will be deleted when defragmentation takes place.
Every such 'copy' within eblob reserves twice the current size of the object: for example, overwriting a 1 KB object with 3 KB of data moves it to a new location and reserves 6 KB there.
If you write into Elliptics using different keys each time, the objects will be appended to the blob's end. When the blob reaches the maximum size specified in the config (or the maximum number of elements, which is also a configurable parameter), a new blob is created.
It's time for some small VM details. There are flush kernel processes whose main goal is to write your data from the page cache to disk. The VM can be tuned so that these processes kick in according to our demand.
The main issue here is that until data is written to disk, the inode is locked, which means no further writes into the given file are possible. In some cases reads are forbidden too: if the page you want to read is not present in the page cache, it has to be read from disk, and this process may lock the inode as well. So until the flush process completes, you cannot update data in the corresponding file. Thus the main design goal is to separate the file being updated from the one being flushed to disk.
If we can make a prognosis of the amount of data written, we can tune eblob to create new files frequently enough, and tune the VM to write out the old ones while not touching the one currently being written.
Let's suppose we write 1 KB objects at a 20 krps rate. This is about 20 MB/s of write speed. Let's limit the blob size to 1 GB, which can be configured this way in the elliptics config:
backend = blob
blob_size = 1G
With a 20 MB/s write speed, a new blob will be created roughly every 50 seconds.
It's time to tune the VM now. We want the flush process to kick in frequently, but we do not want it to touch new data. Thus we want it to write (and lock) the old blob files.
Let’s say 100-second-old data is ok to be written to disk. Here is a set of sysctls for good write behaviour.
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 75
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 10000
vm.dirty_ratio = 80
vm.dirty_writeback_centisecs = 50
vm.dirty_expire_centisecs says how old dirty (i.e. written) data should be before the flush process pushes it to disk. It is measured in centiseconds, i.e. 1/100 of a second; in the example above it is 100 seconds.
vm.dirty_ratio draws a boundary (in percent of total memory) on how much dirty data the page cache may hold before flushing is forced. If more than 80% is dirty, a writing process blocks, flushing data to disk itself. Background flushing starts when more than 75% is dirty (vm.dirty_background_ratio).
vm.dirty_writeback_centisecs says the flush process should check dirty pages twice a second (every 50 centiseconds).
The above eblob + VM config under the specified write load behaves the way we wanted: eblob creates a new blob file every 50 seconds, and twice per second the kernel checks for pages which are 100 seconds old (written 100 seconds ago) and flushes them to disk. In our example these belong to the files from 'two blobs ago'.
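These settings can be changed at runtime with sysctl -w, for example:

$ sysctl -w vm.dirty_expire_centisecs=10000

To make them survive a reboot, put the block above into /etc/sysctl.conf (or a file under /etc/sysctl.d/) and reload it with sysctl -p.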
Write CAS sucks per se, so Cassandra uses Paxos to sync replicas. This is slow and imposes huge latencies, but this is the right way to implement it.
To reduce the amount of retries, Elliptics is starting to use server-side atomic processing, which will have quite low latencies; but it is still possible for multiple replicas to mismatch, so a write CAS will fail and reprocess.
It has been a while since I last wrote, and an enormous amount of changes have been made.
Let’s start with write-cas and secondary indexes.
Write-cas stands for write-compare-and-swap, which means the server performs your write only if your cookie matches what it already has. We use the data checksum as the cookie, but that can be changed if needed.
The CAS semantics were introduced to solve the problem of multiple clients updating the same key. In some cases you may want to update a complex structure without losing changes made by others.
Since we do not have distributed locks, a client performs a read-modify-write loop without holding any lock, which ends up racing against other clients. CAS allows us to detect that the data was changed after we last read it and that our modification would overwrite those changes.
In this case the client gets an error and performs the read-modify-write loop again. There are convenient functions for using write-cas: a high-level one which takes a functor that just modifies the data (the read and write are hidden), and a low-level method which requires the checksum and the new data; if it fails, it is up to the client to read the data again, apply its changes and write it once more.
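Here is a self-contained C++ sketch of that low-level read-modify-write loop; the 'store' is a local map and std::hash stands in for the real checksum, so everything below is illustrative rather than the actual elliptics API:

#include <functional>
#include <map>
#include <string>

struct record {
    std::string data;
    std::size_t csum;
};

static std::map<std::string, record> store;

static std::size_t csum(const std::string &d) {
    return std::hash<std::string>{}(d);
}

// 'Server' side: perform the write only if the caller's cookie matches.
static bool write_cas(const std::string &key, const std::string &data,
                      std::size_t old_csum) {
    auto it = store.find(key);
    if (it != store.end() && it->second.csum != old_csum)
        return false; // data changed since the caller last read it
    store[key] = record{data, csum(data)};
    return true;
}

// 'Client' side: the read-modify-write loop, retried until the swap succeeds.
static void update(const std::string &key,
                   const std::function<std::string(std::string)> &modify) {
    for (;;) {
        auto it = store.find(key);
        std::string old = it != store.end() ? it->second.data : std::string();
        std::size_t cookie = it != store.end() ? it->second.csum : 0;
        if (write_cas(key, modify(old), cookie))
            return; // otherwise: re-read, re-apply, try again
    }
}

int main() {
    update("key", [](std::string d) { return d + "change-1;"; });
    update("key", [](std::string d) { return d + "change-2;"; });
}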
The first write-cas client is elliptics' secondary indexes. You may attach 'tags' or 'indexes' (with optional data) to any key you want. This means that after a secondary index update has completed, you will find your key under that 'tag' or 'index'.
For example, a tag can be a daily timestamp: in this case you will get a list of all keys 'changed' on a given day.
Of course you can find all keys which match multiple indexes; this is a kind of logical 'AND' operation. Our API returns all keys found under all requested tags/indexes, as the sketch below illustrates.
All indexes (as well as any keys) may live in different namespaces.
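In model form, such an 'AND' query is just a set intersection over per-tag key lists. A self-contained C++ sketch, where set_indexes/find_all_indexes are illustrative names and a local map stands in for the storage:

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <vector>

// Each index (tag) maps to the set of keys stored under it.
static std::map<std::string, std::set<std::string>> indexes;

static void set_indexes(const std::string &key,
                        const std::vector<std::string> &tags) {
    for (const auto &t : tags)
        indexes[t].insert(key);
}

// 'AND' lookup: intersect the key sets of all requested tags.
static std::set<std::string> find_all_indexes(const std::vector<std::string> &tags) {
    if (tags.empty())
        return {};
    std::set<std::string> result = indexes[tags[0]];
    for (std::size_t i = 1; i < tags.size(); ++i) {
        std::set<std::string> tmp;
        std::set_intersection(result.begin(), result.end(),
                              indexes[tags[i]].begin(), indexes[tags[i]].end(),
                              std::inserter(tmp, tmp.begin()));
        result.swap(tmp);
    }
    return result;
}

int main() {
    set_indexes("doc1", {"2013-04-01", "news"});
    set_indexes("doc2", {"2013-04-01"});
    auto keys = find_all_indexes({"2013-04-01", "news"}); // -> {"doc1"}
    return keys.count("doc1") ? 0 : 1;
}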
So far secondary indexes are implemented on pure write-cas semantics. Our rather simple realtime search engine Wookie uses them to update its reverse indexes.
And that doesn’t scale :)
We found that having multiple clients competing to update the same popular index ends up in so many read-modify-write cycles that the whole system becomes network-bound. And we want to be CPU-bound instead.
To fix this issue we will change secondary indexes to be invoked from the server-side engine. All server-side operations are atomic by default, so this will eliminate the races between multiple clients.
Stay tuned – I will write more about Wookie soon.
After all, we want to implement a baby (really-really baby) Watson with it :)
It took too long, but it has finally arrived:
$ git commit -a
[master 5933af6] Changed API
12 files changed, 2178 insertions(+), 2193 deletions(-)
rewrite bindings/cpp/callback_p.h (64%)
The C++ elliptics client library is now fully asynchronous: there are virtually no synchronous methods left at runtime. Instead we return ioremap::elliptics::async_result, which the user can use to wait, to get the final results, to iterate over multiple replies and so on.
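A rough usage sketch of the new asynchronous flow; apart from async_result, which is named above, the method names here are approximations from memory, so treat this as pseudocode over the real bindings:

// Pseudocode over the real bindings; requires the elliptics headers.
#include <elliptics/session.hpp>

using namespace ioremap::elliptics;

void write_example(session &sess) {
    // The call returns immediately; network IO proceeds in the background.
    auto async = sess.write_data(std::string("key"), std::string("data"), 0);

    // Block only when (and if) the caller decides to.
    async.wait();

    // Or walk over the individual replies as they arrive, one per replica.
    for (const auto &entry : async)
        (void)entry; // inspect the entry's status/data here
}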
We also added the corrected secondary indexes to the C++ API. Users can store auxiliary data within indexes: for example, if you build a basic search engine, you may store the token position in the index entry which corresponds to a document ID.
Indexes are updated in parallel across different replicas using the write-cas method. Although this may lead to some issues we have yet to discover, there is no known system supporting secondary indexes which is not built over the ill-fated 'write-to-master-broadcast-to-slaves' design.
API changes are kudos to Ruslan Nigmatullin.
Not that it will compete in a month, of course not, but we do want it, by the end of the year, to answer questions like 'film where two guys with cancer move to the sea'.
We have the following config for the Elliptics leveldb backend:
sync = 0
root = /opt/elliptics/leveldb
log = /var/log/elliptics/leveldb.log
cache_size = 64424509440
write_buffer_size = 1073741824
block_size = 10485760
max_open_files = 10000
block_restart_interval = 16
And with 1 KB chunks of quite compressible data (ASCII strings) pushed into a single server with 128 GB of RAM and 4 SATA disks combined into RAID10, we end up with a poor 6-7 krps.
If the request rate is about 20 krps, the median reply time is about 7 seconds (!)
dstat shows that it is not the disk (well, at 20 krps it is the disk, but before that point it is neither CPU- nor disk-bound): leveldb just doesn't allow more than 5-7 krps with 1 KB data chunks from parallel threads (we have 8-64 IO threads depending on the config). When snappy compression is enabled, things get worse.
Is it at all possible to push 20 MB/s into LevelDB with small-to-medium (1 KB) chunks?
Multiple interfaces do not look like something complex to support in distributed storage, but it took us quite a while to get it right.
Mainly because Elliptics has a route table which is used by server and client nodes to get info about other nodes and to connect to them. If we enlarge this table, it means more connections and more stat/ping/heartbeat traffic, but mainly it confuses the client: which address should it use when multiple addresses have been received in the route table? What if there is no route to some of them? What if some node has 3 addresses and the others only one?
I decided to fix it in the most confusing way ever possible, but a very effective one.
Let’s suppose there are 3 server nodes with the following addresses:
srv1: addr = 1.1.1.1:1025:2 10.10.10.10:1025:2 100.100.100.100:1025:2
srv2: addr = 2.2.2.2:1025:2 20.20.20.20:1025:2 200.200.200.200:1025:2
srv3: addr = 3.3.3.3:1025:2 30.30.30.30:1025:2 300.300.300.300:1025:2
Each server node will actually be listening on the 0.0.0.0 address, but its route table will contain the appropriate addresses above.
When a client connects to one of them, let's say to srv2 via its 2.2.2.2 address, the client doesn't know which of the other addresses it received (the addresses above form the whole route table, which lives on every server node once they are connected to each other) should be used to connect to the other nodes.
Should it be 1.1.1.1 for srv1 or 300.300.300.300 for srv3? If the client connected to all the addresses, which one should be used to actually send commands?
So we force the admin to group addresses which belong to the same 'logical route table'. Then, when a client connects to 20.20.20.20, it will receive the whole route table, but will use 10.10.10.10 for srv1 and 30.30.30.30 for srv3.
How to achieve that? The route table ID must be a monotonically increasing number, so the above table should be rewritten as follows:
srv1: addr = 1.1.1.1:1025:2-0 10.10.10.10:1025:2-1 100.100.100.100:1025:2-2
srv2: addr = 2.2.2.2:1025:2-0 20.20.20.20:1025:2-1 200.200.200.200:1025:2-2
srv3: addr = 3.3.3.3:1025:2-0 30.30.30.30:1025:2-1 300.300.300.300:1025:2-2
0, 1 and 2 above are those 'logical route table' IDs.
When a client connects to 10.10.10.10, it knows which addresses are in that 'logical route table': in this example they are 20.20.20.20 and 30.30.30.30. The client will connect to those addresses and use them to send commands to the cluster.
If another client connects to the 200.200.200.200 address in the above example, it will receive the whole route table, but will only use the 'logical route table' with ID 2 (that's what is written in srv2's config for the 200.200.200.200 address). Thus it will connect to 100.100.100.100 and 300.300.300.300.
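The client-side rule boils down to filtering the received route table by the route ID of the address it initially connected to; a toy C++ sketch with illustrative types and names:

#include <string>
#include <vector>

struct route_addr {
    std::string host;
    int port;
    int family;
    int route_id;   // the trailing '-N' in the config above
};

// Keep only the addresses belonging to the same 'logical route table'
// as the address the client initially connected to.
std::vector<route_addr> usable_addrs(const std::vector<route_addr> &table,
                                     int connected_route_id) {
    std::vector<route_addr> out;
    for (const auto &a : table)
        if (a.route_id == connected_route_id)
            out.push_back(a);
    return out;
}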
This allows one to use fast but lossy links for data recovery and somewhat slower but reliable links for common client traffic.
Or consider a video streaming service: clients connect to the many serving nodes using the 'external' addresses, while the internal interfaces (there are usually at least 2 of them on server motherboards) are used for compressed/processed/(re)encoded clips being (re)uploaded from the appropriate servers.
Available in version 2.22.
$ cat /proc/buddyinfo
Node 0, zone DMA 0 1 1 1 1 1 1 0 1 1 3
Node 0, zone DMA32 416 420 367 372 330 300 251 223 175 139 304
Node 0, zone Normal 12587 0 0 0 0 0 0 0 0 0 1
Node 1, zone Normal 5789 48426 53853 17277 5661 1619 147 6152 5336 2296 1M
This is a fun situation where virtually all memory on node0 is used and what is free is totally fragmented, while the memory on node1 is barely used at all. (Each buddyinfo column is the number of free blocks of a given order, from single 4 KB pages on the left up to 4 MB blocks on the right: node0's Normal zone has almost nothing left but order-0 pages.)
This most likely happens because of the CPU affinity of the processes which do heavy IO. Actually, no process here has a non-default affinity mask, which is -1 (0xffffffff).
The load average is small, just 1-2, which means that only 1-2 processes are running at any single moment. And the Linux scheduler likely always puts them on the cores of node0's processor, so they only consume memory from the node0 NUMA region, which leads to this heavy fragmentation.
This is the 3.2.2-1 Ubuntu kernel, and I do not really know what to do with it :)
Elliptics contains a built-in LRU in-memory cache with timeouts. This is a much-demanded feature used in our most intensive workloads.
The Elliptics distributed cache is preferable to memcache and its derivatives because of the built-in DHT IO balancing, automatic reconnection and general user-friendliness, i.e. the client doesn't care where to find a given key: elliptics itself takes care of lookup, reconnection and rebalancing.
But memory is used not only for temporary caches: clients frequently want a persistent cache, i.e. one where the data stored in such a 'cache' can survive server outages. Cache IO performance should not be affected by real disk IO; instead, some tricky schemes must be employed so that speed does not degrade.
Probably the most well-known storage for such a workload is Redis. While it has partitioning support built in (or implemented in the client library; Redis Cluster is not production-ready though) and the beta-staged Sentinel failover daemon, it is still generally a single-master solution.
I have thought of adding a Redis backend to Elliptics: this would provide automatic recovery (when the new iterators are ready), failover, multiple replicas and so on. Given Redis's performance, we could then safely drop our own LRU cache implementation.