The first elliptics distributed network draft.
POHMELFS was created as a client for this kind of distributed network (and besides that it is already a very high-performance network filesystem). The network is called the elliptics network.
Each node has an ID in a flat space with a modulo operation in it, i.e. the ID space is finite and wraps around.
Each object has an ID from the same finite space. An object can only be identified by its ID; there are no dirs, files or anything else.
A server with ID N hosts objects from (N-1; N], where N-1 is the ID of the neighbour node.
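The ownership rule above can be sketched in a few lines. This is only an illustration: the size of the ID space (ID_SPACE) and the function name owner() are assumptions, not part of the draft.

```python
import bisect

ID_SPACE = 2 ** 16  # assumed size of the finite ID space, for illustration only


def owner(node_ids, object_id):
    """Return the ID of the node that hosts object_id.

    A node with ID N hosts the half-open range (prev; N], where prev is the
    ID of the neighbour node below it. IDs above the largest node ID wrap
    around to the smallest node ID (the modulo operation).
    """
    ring = sorted(node_ids)
    # First node whose ID is >= object_id, so object_id == N belongs to N.
    i = bisect.bisect_left(ring, object_id % ID_SPACE)
    return ring[i % len(ring)]
```

With nodes 1000, 2000 and 3000, object 1500 lands on node 2000, object 2000 also lands on node 2000 (the upper bound is inclusive), and object 3001 wraps around to node 1000.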
Any object can have multiple IDs; in this case it will be copied to different nodes.
Each additional ID can be obtained from the first one via known mathematical operations (like a sum in a finite field). IDs (including additional ones) should not overlap for different objects.
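One possible way to derive additional IDs is addition modulo the size of the ID space. The sketch below is an assumption made for illustration (the draft does not fix the operation), and it does not by itself guarantee that IDs of different objects never overlap.

```python
ID_SPACE = 2 ** 16  # assumed size of the finite ID space, for illustration only


def clone_ids(primary_id, copies):
    """Derive `copies` IDs for one object, spread evenly over the ID space.

    The first ID is the primary one; each additional ID is obtained by
    adding a fixed step modulo ID_SPACE, so any clone ID can be recomputed
    from the primary ID alone.
    """
    step = ID_SPACE // copies
    return [(primary_id + k * step) % ID_SPACE for k in range(copies)]
```

For example, clone_ids(100, 2) gives IDs 100 and 32868, which sit half of the ID space apart and so are likely to be hosted by different nodes.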
Information about an object and its clones will be stored in the object itself and in the objects which correspond to the parent directory in classical path-based lookup. Thus to get the object the client will create an ID from its path and fetch the data, and if it is not available, it can create an ID from the parent directory’s path and request information about clones from that object.
Each write operation is transaction based, thus each transaction is a separate object in the filesystem, which will be stored on different servers according to the client’s settings (how many clones). Information about how to apply transactions on top of some object will be stored in the parent’s object the same way as described above.
A transaction is committed (and can be removed) after all clones have it applied; otherwise it should live in the network until explicitly removed.
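The commit rule can be illustrated with a tiny bookkeeping class. The class and method names here are hypothetical; the draft only states the rule, not how it is tracked.

```python
class Transaction:
    """Tracks which clones still have to apply a transaction (hypothetical helper)."""

    def __init__(self, object_id, clone_ids):
        self.object_id = object_id
        self.waiting = set(clone_ids)  # clones that have not applied it yet

    def mark_applied(self, clone_id):
        # Called when the node hosting clone_id reports the transaction applied.
        self.waiting.discard(clone_id)

    @property
    def committed(self):
        # Committed (and removable) only after every clone has applied it.
        return not self.waiting
```

Until the last clone reports back, committed stays False and the transaction object has to remain in the network.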
Node join protocol.
The joining node looks up a pair of nodes whose IDs surround its own ID (called the next and prev nodes here, according to how the IDs correlate). It sends a request to the next node to return the list of objects (and their attributes) which correspond to the joining node’s ID. Let’s assume the next node has ID N, the prev node has ID P and the joining node has ID J.
1. Joining node J sends a request to next node N to grab the list of objects it hosts in (P; J] and to get the routing table the next node has.
1a. The next node forwards all write requests to joining node J, which will block until step 3 is completed.
2. Joining node J runs through the received list and selects objects which are newer than those present on the joining node itself.
3. Those objects are fetched from the next node.
3a. All write requests forwarded from the next node are applied.
4. The joining node connects to previous node P and announces that it is now in the network, so that the previous node updates its routing table. This announcement can be sent to all nodes in the routing table received from the next node in step 1.
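The join steps can be walked through with an in-memory model. Everything below is a sketch under assumptions: the Node class, the integer version used to decide which copy is "newer", and the pending list standing in for writes forwarded in step 1a are all made up for illustration, and wrap-around of the ID space is ignored.

```python
class Node:
    def __init__(self, node_id):
        self.node_id = node_id
        self.objects = {}   # object_id -> (version, data); version is an assumed stand-in for "newer"
        self.routing = {}   # node_id -> Node
        self.pending = []   # (object_id, version, data) writes forwarded during the join (step 1a)


def join(joining, nxt, prev):
    """Run steps 1-4 for `joining` between `prev` and `nxt`; return the fetched object IDs."""
    low, high = prev.node_id, joining.node_id
    # Step 1: list of objects the next node hosts in (P; J], plus its routing table.
    listing = {oid: ver for oid, (ver, _data) in nxt.objects.items()
               if low < oid <= high}
    joining.routing = dict(nxt.routing)
    joining.routing[nxt.node_id] = nxt
    joining.routing[prev.node_id] = prev
    # Step 2: select objects that are missing locally or newer than the local copy.
    wanted = [oid for oid, ver in listing.items()
              if oid not in joining.objects or joining.objects[oid][0] < ver]
    # Step 3: fetch those objects from the next node.
    for oid in wanted:
        joining.objects[oid] = nxt.objects[oid]
    # Step 3a: apply the writes that were forwarded while the join was running.
    for oid, ver, data in joining.pending:
        joining.objects[oid] = (ver, data)
    joining.pending.clear()
    # Step 4: announce the new node to every node in the received routing table.
    for peer in list(joining.routing.values()):
        if peer is not joining:
            peer.routing[joining.node_id] = joining
    return wanted
```

In this model the next node keeps its copies of the moved objects; whether and when it drops them is not covered here.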
Each node has a routing table, which corresponds to a tree indexed by node IDs. Each entry in the tree holds the address of a remote node. Even a large routing table will not take lots of RAM, but this redundancy (storing more than just the addresses of the immediate neighbours) greatly reduces the number of lookup messages needed to find the appropriate node. When a node receives a request which does not correspond to the IDs it hosts, it should forward the request to another node according to its routing table. If the routing table does not have a node which corresponds to the given ID, the request should be forwarded to the node nearest to that ID.
Some heuristics should be introduced to determine when to stop a lookup and return a no-entry error; for example, when the next node in the routing table has an ID less than the requested ID, an error should be returned. This relies on a correct routing table, which should be updated when the next node leaves, or when nodes next to the next node join.
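To show why a larger routing table reduces the number of lookup messages, here is a small hop-counting sketch. The data layout (tables and ranges as plain dicts), the choice of "first known ID at or above the target" as the nearest node, and the max_hops cut-off used in place of a real stop heuristic are all assumptions.

```python
import bisect


def lookup(tables, ranges, start, object_id, max_hops=16):
    """Forward a request from node `start` until the hosting node is reached.

    tables: node_id -> sorted list of node IDs known to that node
    ranges: node_id -> (low, high), the node hosts IDs in (low; high]
    Returns (hosting_node_id, number_of_forwards).
    """
    current, hops = start, 0
    while hops <= max_hops:
        low, high = ranges[current]
        if low < object_id <= high:
            return current, hops
        known = tables[current]
        # Nearest known node at or above the target, wrapping to the lowest one.
        i = bisect.bisect_left(known, object_id)
        current = known[i % len(known)]
        hops += 1
    raise LookupError("no node found for ID %d" % object_id)
```

With four nodes at 1000, 2000, 3000 and 4000, a request for ID 3500 entering at node 1000 takes three forwards if every node knows only its immediate neighbour, and one forward if node 1000 knows all the other nodes.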
When a node joins and fetches the list of updated objects from the next node, it may notice that some objects were changed and request the list of transactions to apply from other nodes. The transaction list can be obtained from the object which corresponds to the parent directory in the path-based object name.
Comments?

As suggested by Jon Smirl it is possible to use several hash functions to put the same object on several nodes.
Also there should be a possibility to selectively put objects on dedicated servers for higher bandwidth. It can be implemented as another hash function though.
Exciting project. I have many questions, I will post them separately.
“Information about object and its clones will be stored in object itself and in objects, which correspond to the parent directory in classical path-based lookup, thus to get the object client will create ID from its path, fetch the data, and if it is not available, it can create ID from parent’s directory path and request information about clones from that object.”
You appear to be missing a word here: “stored in object itself and in objects,”. Perhaps “parent objects” or “clone objects”, I am not sure. Perhaps you mean “and in the object which corresponds to the parent directory in classical path-based lookup”. The parent makes more sense. If so, is there a mechanism that ensures that a parent object is never stored on the same server as the child, since that would defeat the purpose of having the data in both places for redundancy?
Apparently a problem with comma placement in the English and Russian languages :)
I meant that information about clones can be stored on the other objects, which can be located by using existing global path to the parent of the object. I.e. if we have object ‘/usr/local/bin/test’ then information about its clones will live in the objects with id ‘/usr/local/bin/’ (and its clones, which in turn can be located by using ID ‘/usr/local/’).
Of course the paths are not used directly, but some numerical representation (like a hash) of that data.
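As an illustration of locating clone information through the parent, the sketch below turns a path and its parent path into numerical IDs. SHA-1 is used only as an example hash, and the function names are hypothetical.

```python
import hashlib
import posixpath


def path_id(path):
    """Numerical ID of an object, derived from its full path (SHA-1 as an example)."""
    return int.from_bytes(hashlib.sha1(path.encode("utf-8")).digest(), "big")


def parent_path(path):
    """Path of the parent object, e.g. '/usr/local/bin/test' -> '/usr/local/bin/'."""
    parent = posixpath.dirname(path.rstrip("/"))
    return parent if parent.endswith("/") else parent + "/"
```

A client that cannot fetch path_id('/usr/local/bin/test') would compute path_id(parent_path('/usr/local/bin/test')), i.e. the ID for '/usr/local/bin/', and ask that object about the clones. The same step can be repeated with '/usr/local/' if the parent is unavailable too.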
Another approach as shown below, is to use different hashes to specify different clones.
Thus for different names (and the full paths of a parent and its object will be different) there will be different IDs, so objects will likely be stored on different nodes.
“Each write operation is transaction based, thus each transaction is a separate object in the filesystem, which will be stored on different servers according to client’s settings (how many clones). Information about how to apply transactions on top of some object will be stored in the parent’s object the same way as described above.”
Let me see if I understand. So, if I modify file /a/b, bytes 21-30, this (the 10 bytes) will be stored as a transaction to object /a/b in its own object? And then /a/b’s parent, will have this info pointing to the transaction stored in it? But how is the parent modified to get this info? Does this require its own transaction, does this have to cascade all the way to the top each time for the lower transaction to complete? I think I don’t understand. :)
Transactions are supposed to be stored when they change how objects look. Writing data and adding or removing objects fall into that category, while reading, updating history and so on do not.
It is also possible to store history in the updated object itself, since it will be mirrored to multiple nodes, so that if at least one node with updated object is alive, it can tell others how to apply transactions to get valid data.
“Node looks up a pair of node, whose IDs surround ID of the joining node (called next and prev nodes here according to how IDs correlate). It sends a request to the next node to return list of objects (and its attributes) which correspond to the joining node ID. Let’s assume next node has ID N, prev node – P and joining node has id J.”
How does the joining node know its ID? What if it has never joined the network before? Is this for re-joining only, do new nodes go at the end? Does the node ID have anything to do with physical proximity, network proximity?
The ID is determined by the administrator or based on some stored data. Currently it is a hash of some string assigned by the administrator in the config file. The next and previous nodes are then the nodes with the nearest IDs above and below the ID of the joining node.
The ID can correspond to network proximity if a number close to an existing one is specified manually, but this is not mandatory. If a node joined the network previously it should (but does not have to) use its old ID, since that avoids the new IO needed to move its objects to a different location and to copy new objects to the given node.
1. Joining node J sends request to next node N to grab list of objects it hosts in (P; J] and to get routing table next node has.
1a. Next node forwards all write requests to the joining node J, which will block until step 3 is completed.
2. Joining node J runs through received list and selects objects which are newer than those which are presented on joining node itself.
3. Those objects are fetched from the next node.
3a. All write requests forwarded from the next node are applied.
Perhaps steps 1, 2 and 3 should be performed once (or a variable number of times) without blocking first so that any large updates which have occurred while the node was offline can be received in a non-blocking fashion. Once this is done, repeat steps 1, 2, 3 while blocking, the blocking time should then hopefully be greatly reduced!
Yes, it can be done this way too. I did not dig deeply into the details yet and drew the high-level design first.
“2. Joining node J runs through received list and selects objects which are newer than those which are presented on joining node itself.”
How is this determined, time? Or version #? There was a long discussion on this on the glusterfs list. Using time has many bad issues. Using version #s can be tricky too since an object could potentially be deleted and recreated resetting the version count. One solution is a unique global sequence number as the version number.
Each object has an ID based not only on its name but also on its content, so objects with different content will have different history and it is possible to determine which one is older based on that. If objects were modified in parallel and before that they had the same ID, then we hit a collision and it is up to the administrator to decide how to apply the stored transactions to get coherent data. This can be automated to some degree though.
“Thus for different names (and thus for parent and object their full paths will be different) there will be different IDs, so objects will likely be stored on different nodes.”
Hmm, I guess that depends on how large you expect your “cluster” to be. While I admire the desire for supporting large clusters, I can’t imagine this being too unlikely in very many cases. My personal objective is to have a two node cluster work well, in this case if the distribution is random, it would be a 50/50 chance. Add a few more nodes and you really haven’t reduced the possibility very much. I selfishly hope that you consider another added distribution method to prevent this.
Having multiple hashes (for example two hashes for a two-node cluster) further reduces the probability of having close objects on the same server, although it increases data duplication in case the IDs happen to be on the same node.
I think I will consider allowing the administrator to set up the hash function to use, so that in the above example a simple hash could be used, which will just add 0 and 1 at the beginning of the string, and node IDs could have the appropriate digits.
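The "add 0 and 1 at the beginning" idea could look roughly like this. It is only a sketch: IDs are modelled as hex strings compared lexicographically, SHA-1 is an example hash, and both function names are made up.

```python
import hashlib


def prefixed_ids(name, copies=2):
    """One ID per clone: the clone number (a single digit) followed by a hash of the name."""
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()
    return [str(k) + digest for k in range(copies)]


def node_for(object_id, node_ids):
    """First node whose ID is >= object_id, wrapping to the lowest node ID."""
    for node_id in sorted(node_ids):
        if object_id <= node_id:
            return node_id
    return min(node_ids)
```

In a two-node cluster with node IDs "0fff…f" and "1fff…f" (a digit followed by forty f characters), the clone with prefix 0 always lands on the first node and the clone with prefix 1 on the second, so the two copies never share a server.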
Right now it is quite far goal though :)
I would be interested in your “Node Leaving Protocol”. Do the files it used to hold get redistributed (cloned I guess?) to other nodes automatically? Is it possible to know what it contained?
Yes, when a node leaves the network all its files are redistributed (copied to the next node in the above example).
If a node crashes, clients will work with old versions (or clones).
I guess another way to approach this would be to make node IDs more dynamic. When a node joins, it would ask node N what its lowest object ID is and choose a node ID number right below that. This way it is not actually responsible for any objects yet, but it can be online responding to a small subset of potential future objects. This would mean no blocking!
So, if node P is 1000 and node N is 2000, node J would ask node N for its lowest numbered object, say 1009. Node J would then choose 1008 as its entry point. It would now service anything between (above) 1000 and 1008. If new objects need to be created in that range it can become responsible for them right away. This would ease the burden of node joining since they would immediately help relieve other nodes as opposed to only being a burden to them while it syncs.
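The choice of entry point described in this comment amounts to one line of arithmetic. A sketch, with a made-up function name and an assumed fallback for the case when node N hosts no objects at all:

```python
def entry_id(prev_id, next_id, next_object_ids):
    """Pick a node ID just below the lowest object hosted by the next node.

    With such an ID the joining node owns (prev_id; entry_id], a range that
    holds no existing objects, so nothing has to be copied before it can go
    online.
    """
    if next_object_ids:
        candidate = min(next_object_ids) - 1
    else:
        candidate = next_id - 1  # assumed fallback: the next node hosts nothing
    if candidate <= prev_id:
        raise ValueError("no free ID between the previous node and its first object")
    return candidate
```

For P at 1000, N at 2000 and N's lowest object at 1009, entry_id(1000, 2000, [1009, 1500]) returns 1008, matching the example in the comment.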
Once this node is online, it would adjust its node ID along with all the rest of the nodes. All the nodes could be in a constant dynamic node ID adjustment mode. The nodes could continuously rebalance the range of objects that they would be responsible for based on various other factors, disk space, bandwidth…
So, if node P was 1000, and was the lowest node, but node N was 2000 but there was another node at 3000 (the top most node). The spread would now be unbalanced: 0-1000, 1000-1008, 1008-2000, 2000-3000. So node P should slowly migrate towards 750 while node N should migrate towards 2250 and node J which started at 1008 should migrate to 1500. This would leave each node, now 4 of them a 750 (1/4th) range to cover as opposed to 1000 (1/3rd) when there were only 3 of them.
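The target IDs in this example follow from splitting the range evenly. A minimal sketch of that arithmetic, assuming the ID range starts at 0 and the topmost node keeps its ID (the function name is made up):

```python
def balanced_ids(top, count):
    """Evenly spaced node IDs for `count` nodes covering the range (0; top]."""
    return [top * (i + 1) // count for i in range(count)]
```

For the four nodes currently at 1000, 1008, 2000 and 3000, balanced_ids(3000, 4) gives 750, 1500, 2250 and 3000, i.e. each node ends up covering a range of 750 IDs.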
What is needed then is simply a migration protocol which would be used when nodes join/leave or simply need to rebalance things. The migration algorithm would determine which direction to move and the protocol would follow a simple one object at a time migration. So, node P (1000) would suddenly notice an imbalance and ask node J to take over its highest numbered object, say 995. Node P would then take node ID 994 and the migration is done (for one object). Node N, also noticing the imbalance, would ask node J to take over its next lowest object, ID 1009; once this is done node J’s ID would become 1009, and so forth until a balance is achieved.
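The one-object-at-a-time migration can be modelled with two small moves. This is a sketch only: nodes are plain dicts with an "id" and an "objects" map, both functions are hypothetical, and no network traffic or failure handling is modelled.

```python
def shed_highest(donor, successor):
    """The donor hands its highest object to its successor and lowers its own ID.

    After the move the donor's range ends just below the moved object, so the
    object falls into the successor's range.
    """
    oid = max(donor["objects"])
    successor["objects"][oid] = donor["objects"].pop(oid)
    donor["id"] = oid - 1
    return oid


def take_lowest(receiver, successor):
    """The receiver takes over the lowest object of its successor and raises its own ID."""
    oid = min(successor["objects"])
    receiver["objects"][oid] = successor["objects"].pop(oid)
    receiver["id"] = oid
    return oid
```

Starting from P at 1000 holding object 995, J at 1008 and N at 2000 holding object 1009, calling shed_highest(P, J) moves P to 994, and take_lowest(J, N) moves J to 1009, so J then hosts both objects in its range (994; 1009].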
If a node knows that it is going to leave a cluster, it could slowly migrate its two neighbors towards it so that all of its objects get taken over by them!
I hope this makes sense, perhaps I totally misunderstood things from the start and I am way off?
Having dynamic IDs tends to introduce too much network IO, especially in case of a temporary node outage. But of course it should be possible to change a node’s ID ‘on-demand’, for example when the administrator detects that some segment of IDs is too dense, although that should not happen (too frequently at least) if good hash functions are used.