This page is just musings at this point; don't believe a word!
See a discussion about this concept here: http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/Implementing-locks-using-cassandra-only-td5527076.html
Locking a row
The algorithm below is an adaptation of Lamport's Bakery algorithm, see Wikipedia and UC Davis. Lamport's algorithm solves the critical section problem for n processes in software. The basic idea is that of a bakery; customers take numbers, and whoever has the lowest number gets service next. Of course, "service" means entry to the critical section.
    var choosing: shared array[0..n-1] of boolean;
        number: shared array[0..n-1] of integer;
    ...
    repeat
        choosing[i] := true;
        number[i] := max(number[0], number[1], ..., number[n-1]) + 1;
        choosing[i] := false;
        for j := 0 to n-1 do begin
            while choosing[j] do (* nothing *);
            while number[j] <> 0 and (number[j], j) < (number[i], i) do
                (* nothing *);
        end;
        (* critical section *)
        number[i] := 0;
        (* remainder section *)
    until false;
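To make the pseudocode concrete, here is a minimal Python sketch of the same algorithm for N threads sharing two lists. It is an illustration only: it relies on CPython's global interpreter lock making the list reads and writes effectively sequentially consistent. The thread count, iteration count, and the `inside`/`max_inside` bookkeeping are illustrative additions used to check that at most one thread is ever in the critical section.

```python
import threading
import time

N = 4            # number of threads ("processes" in the pseudocode)
ITERATIONS = 50  # critical-section entries per thread

choosing = [False] * N
number = [0] * N
counter = 0      # shared state protected only by the bakery algorithm
inside = 0       # threads currently in the critical section
max_inside = 0   # largest value of `inside` ever observed

def lock(i: int) -> None:
    choosing[i] = True
    number[i] = max(number) + 1           # take a ticket one above the highest seen
    choosing[i] = False
    for j in range(N):
        while choosing[j]:                # wait until j has finished picking its ticket
            time.sleep(0)                 # yield to other threads instead of spinning
        while number[j] != 0 and (number[j], j) < (number[i], i):
            time.sleep(0)                 # j holds a lower (ticket, id) pair: let it go first

def unlock(i: int) -> None:
    number[i] = 0

def worker(i: int) -> None:
    global counter, inside, max_inside
    for _ in range(ITERATIONS):
        lock(i)
        inside += 1                       # critical section starts
        max_inside = max(max_inside, inside)
        counter += 1
        inside -= 1                       # critical section ends
        unlock(i)                         # remainder section follows

threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter, max_inside)
```

The tuple comparison `(number[j], j) < (number[i], i)` is what breaks ties when two threads pick the same ticket; without the index, both could enter at once.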
Set-up:
- The lock is represented by a row whose key is the lock_id
- The row "has" a column family (CF) to represent the choosing array and a CF to represent the numbers picked; the row starts out empty
- Each client has a unique client_id in the range 1..max_clients
- All reads and writes must use consistency level QUORUM unless otherwise noted, and any operation that fails must be retried until it succeeds (i.e. the client blocks)
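The reason for QUORUM is the usual majority-overlap argument: with replication factor RF, a quorum is floor(RF/2) + 1 replicas, so any quorum read shares at least one replica with any earlier acknowledged quorum write. The small Python check below (hypothetical helper names) brute-forces that overlap for a few replication factors. It says nothing about writes that are still in flight, which is part of why the design above needs care.

```python
from itertools import combinations

def quorum(replication_factor: int) -> int:
    """Replicas that must respond at QUORUM: a strict majority."""
    return replication_factor // 2 + 1

def all_quorums_intersect(replication_factor: int) -> bool:
    """Brute-force check that any two quorum-sized replica sets share a replica."""
    q = quorum(replication_factor)
    replica_sets = [set(c) for c in combinations(range(replication_factor), q)]
    return all(a & b for a in replica_sets for b in replica_sets)

for rf in (1, 2, 3, 5):
    print(rf, quorum(rf), all_quorums_intersect(rf))
```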
Algorithm:
Syntax: "write <lock_id, CF=number, client_id=client_number>" means: write to Cassandra with row_key=lock_id, setting column client_id to client_number in column family number.
    write <lock_id, CF=choosing, client_id=1>
    number_hash = read <lock_id, CF=number>
    client_number = max(number_hash.values) + 1    /* max of an empty row is 0 */
    write <lock_id, CF=number, client_id=client_number>
    delete <lock_id, CF=choosing, client_id=1>
    choosing_hash = read <lock_id, CF=choosing>
    for id = 1 to max_clients do
        while choosing_hash[id] == 1 do
            sleep a little
            choosing_hash = read <lock_id, CF=choosing>
        end
        /* re-read the numbers only after id has finished choosing */
        number_hash = read <lock_id, CF=number>
        /* compare (number, id) pairs so that equal numbers are ordered by client id */
        while number_hash[id].exists and (number_hash[id], id) < (client_number, client_id) do
            sleep a little
            number_hash = read <lock_id, CF=number>
        end
    end
    /* critical section */
    delete <lock_id, CF=number, client_id=client_number>
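The steps above can be tried out in a minimal Python sketch against a hypothetical in-memory store (`FakeStore` is not a Cassandra API). Each store call is atomic, which is a stronger guarantee than QUORUM actually gives, so this only checks the logic of the algorithm, not its behavior on a real cluster. The lock name `my_lock` and the client and round counts are illustrative.

```python
import threading
import time

class FakeStore:
    """Hypothetical in-memory stand-in for Cassandra: cf -> row_key -> {column: value}.
    Every call is atomic, an optimistic approximation of QUORUM reads and writes."""

    def __init__(self):
        self._cfs = {"choosing": {}, "number": {}}
        self._mutex = threading.Lock()

    def write(self, cf, row_key, column, value):
        with self._mutex:
            self._cfs[cf].setdefault(row_key, {})[column] = value

    def read(self, cf, row_key):
        with self._mutex:
            return dict(self._cfs[cf].get(row_key, {}))

    def delete(self, cf, row_key, column):
        with self._mutex:
            self._cfs[cf].get(row_key, {}).pop(column, None)

def acquire(store, lock_id, client_id, max_clients):
    store.write("choosing", lock_id, client_id, 1)
    number_hash = store.read("number", lock_id)
    client_number = max(number_hash.values(), default=0) + 1
    store.write("number", lock_id, client_id, client_number)
    store.delete("choosing", lock_id, client_id)
    choosing_hash = store.read("choosing", lock_id)
    for other in range(1, max_clients + 1):
        while choosing_hash.get(other) == 1:    # wait until `other` has picked its number
            time.sleep(0.001)
            choosing_hash = store.read("choosing", lock_id)
        number_hash = store.read("number", lock_id)
        # wait while `other` holds a lower (number, id) pair
        while other in number_hash and (number_hash[other], other) < (client_number, client_id):
            time.sleep(0.001)
            number_hash = store.read("number", lock_id)

def release(store, lock_id, client_id):
    store.delete("number", lock_id, client_id)

MAX_CLIENTS, ROUNDS = 3, 20
store = FakeStore()
counter = inside = max_inside = 0

def client(client_id):
    global counter, inside, max_inside
    for _ in range(ROUNDS):
        acquire(store, "my_lock", client_id, MAX_CLIENTS)
        inside += 1                             # critical section starts
        max_inside = max(max_inside, inside)
        counter += 1
        inside -= 1                             # critical section ends
        release(store, "my_lock", client_id)

threads = [threading.Thread(target=client, args=(c,)) for c in range(1, MAX_CLIENTS + 1)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter, max_inside)
```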
Some issues:
- if a client crashes after setting its number, the algorithm will eventually block all other clients
- I'm not sure the consistency guarantees apply across CFs; if not, the number and choosing arrays need to be placed into the same CF
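If the arrays do have to share a CF, one possible layout (an assumption, not something this page specifies) is a single row per lock whose column names carry a `choosing:` or `number:` prefix before the client_id, so that one QUORUM read returns both arrays. The hypothetical helper below splits such a row back into the two arrays:

```python
def split_row(row: dict[str, int]) -> tuple[dict[int, int], dict[int, int]]:
    """Split one row of "choosing:<id>" / "number:<id>" columns into the two arrays."""
    choosing: dict[int, int] = {}
    number: dict[int, int] = {}
    for column, value in row.items():
        kind, _, client_id = column.partition(":")
        if kind == "choosing":
            choosing[int(client_id)] = value
        elif kind == "number":
            number[int(client_id)] = value
        else:
            raise ValueError(f"unexpected column {column!r}")
    return choosing, number

row = {"choosing:2": 1, "number:1": 3, "number:2": 4}
print(split_row(row))  # ({2: 1}, {1: 3, 2: 4})
```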
Distributed work queue
The distributed work queue is a further adaptation of Lamport's algorithm.
Set-up:
- Each work item is represented by a row whose key is a unique timestamp produced by the client enqueueing the work item (this assumes reasonably synchronized clocks). The timestamp determines the (approximate) order in which work items are picked off the queue.
- The row "has" a column family (CF) to represent the choosing array and a CF to represent the numbers picked.
- Each worker has a unique worker_id in the range 1..max_workers.
- All reads and writes must use consistency level QUORUM unless otherwise noted, and any operation that fails must be retried until it succeeds (i.e. the worker blocks); note that enqueueing a work item has no consistency level constraints.
- To enqueue a work item, a client does:
    write <timestamp, CF=details, "some columns describing work item">
    write <timestamp, CF=number, 0=0>
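A minimal Python sketch of the enqueue step, with plain dictionaries standing in for the details and number CFs (the dictionaries and `enqueue` are hypothetical, not a Cassandra client API). `time.time_ns()` supplies the timestamp row key; the increment-on-collision loop is an added local tie-breaker, since two calls can return the same value on a coarse clock, and it does nothing to keep keys unique across separate clients.

```python
import time

# Hypothetical in-memory stand-ins for the two CFs: row_key -> {column: value}
details_cf: dict[int, dict[str, str]] = {}
number_cf: dict[int, dict[int, int]] = {}

def enqueue(work_item: dict[str, str]) -> int:
    """Store a work item under a timestamp row key and mark it unclaimed with 0=0."""
    row_key = time.time_ns()
    while row_key in number_cf:   # local tie-breaker only (assumption)
        row_key += 1
    details_cf[row_key] = dict(work_item)
    number_cf[row_key] = {0: 0}   # the 0=0 column: "nobody has claimed this yet"
    return row_key

first = enqueue({"task": "resize image 17"})
second = enqueue({"task": "send email 42"})
print(number_cf[first], number_cf[second])
```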
Algorithm:
    loop do
        # pick a work item
        work_items_hash = scan <0.., CF=number>
        work_item_id = "lowest row key where number CF only has the 0=0 column"
        if work_item_id = nil then restart loop
        # try to get lock
        write <work_item_id, CF=choosing, worker_id=1>
        number_hash = read <work_item_id, CF=number>
        worker_number = max(number_hash.values) + 1
        write <work_item_id, CF=number, worker_id=worker_number>
        delete <work_item_id, CF=choosing, worker_id=1>
        choosing_hash = read <work_item_id, CF=choosing>
        for id = 1 to max_workers do
            while choosing_hash[id] == 1 do
                sleep a little
                choosing_hash = read <work_item_id, CF=choosing>
            end
            # re-read the numbers only after id has finished choosing
            number_hash = read <work_item_id, CF=number>
            if number_hash[id].exists and (number_hash[id], id) < (worker_number, worker_id) then
                # oops, someone else is handling this work item
                delete <work_item_id, CF=number, worker_id=worker_number>
                restart outer loop
            end
        end
        # process work item
        work_details = read <work_item_id, CF=details>
        ...
        # remove work item
        delete <work_item_id, CF=details, *>
        delete <work_item_id, CF=number, 0=0>
        delete <work_item_id, CF=number, worker_id=worker_number>
    end
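A minimal Python sketch of the worker loop, again against a hypothetical in-memory `FakeStore` whose calls are atomic (an optimistic stand-in for QUORUM). Two behaviors go beyond the pseudocode and are assumptions: workers stop when a scan finds no unclaimed item, so that the demo terminates, and a worker that wins an item whose details are already gone treats it as finished by someone else. The second covers a worker that scanned the item just before its previous owner deleted it. Row keys 1000 to 1005 stand in for enqueue timestamps.

```python
import threading
import time

class FakeStore:
    """Hypothetical in-memory stand-in for Cassandra (cf -> row_key -> {column: value}).
    Every call is atomic, an optimistic approximation of QUORUM reads and writes."""

    def __init__(self):
        self._cfs = {"details": {}, "number": {}, "choosing": {}}
        self._mutex = threading.Lock()

    def write(self, cf, row_key, column, value):
        with self._mutex:
            self._cfs[cf].setdefault(row_key, {})[column] = value

    def read(self, cf, row_key):
        with self._mutex:
            return dict(self._cfs[cf].get(row_key, {}))

    def delete(self, cf, row_key, column=None):
        """Delete one column, or every column of the row when column is None."""
        with self._mutex:
            row = self._cfs[cf].get(row_key, {})
            if column is None:
                row.clear()
            else:
                row.pop(column, None)

    def scan(self, cf):
        with self._mutex:
            return {key: dict(row) for key, row in self._cfs[cf].items()}

def try_claim(store, item_id, worker_id, max_workers):
    """Bakery-style claim; returns True if this worker may process the item."""
    store.write("choosing", item_id, worker_id, 1)
    number_hash = store.read("number", item_id)
    worker_number = max(number_hash.values(), default=0) + 1
    store.write("number", item_id, worker_id, worker_number)
    store.delete("choosing", item_id, worker_id)
    choosing_hash = store.read("choosing", item_id)
    for other in range(1, max_workers + 1):
        while choosing_hash.get(other) == 1:
            time.sleep(0.001)
            choosing_hash = store.read("choosing", item_id)
        number_hash = store.read("number", item_id)    # re-read after the choosing wait
        if other in number_hash and (number_hash[other], other) < (worker_number, worker_id):
            store.delete("number", item_id, worker_id)  # someone else is handling it
            return False
    return True

def worker(store, worker_id, max_workers, processed, processed_lock):
    while True:
        rows = store.scan("number")
        candidates = [key for key, row in rows.items() if row == {0: 0}]
        if not candidates:
            return  # demo simplification: stop instead of backing off and rescanning
        item_id = min(candidates)
        if not try_claim(store, item_id, worker_id, max_workers):
            continue
        details = store.read("details", item_id)
        if details:  # extra guard: empty details means another worker already finished it
            with processed_lock:
                processed.append(details["task"])
            store.delete("details", item_id)
            store.delete("number", item_id, 0)
        store.delete("number", item_id, worker_id)

store = FakeStore()
for offset, task in enumerate("abcdef"):
    row_key = 1000 + offset  # stand-in for the enqueue timestamp
    store.write("details", row_key, "task", task)
    store.write("number", row_key, 0, 0)

MAX_WORKERS = 3
processed, processed_lock = [], threading.Lock()
threads = [threading.Thread(target=worker, args=(store, w, MAX_WORKERS, processed, processed_lock))
           for w in range(1, MAX_WORKERS + 1)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(processed))
```

Because a losing worker deletes its own number before rescanning, an item that every claimant backed off from returns to the "only 0=0" state and is picked up again on a later scan.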
Some issues:
- need some back-off so there's not too much contention when many workers are idle
- since items being worked on remain in the DB, we may need a smarter way to scan for a candidate work item
- need to work in a timeout, i.e. workers should acquire a lease on a work item rather than a lock, so that the work item becomes eligible again if the worker dies
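One way to approach the lease idea, sketched purely as an assumption: store an expiry time alongside each worker's number (on a real cluster a column TTL could play that role), and treat expired numbers as absent when scanning for candidates. The row layout and helper names below are hypothetical. A real version would also need workers to renew the lease during long jobs, and a worker whose lease lapsed could no longer assume it owns the item.

```python
import math

# Hypothetical layout: column -> (number, lease_expires_at); column 0 is the 0=0 marker.
Row = dict[int, tuple[int, float]]

def live_columns(row: Row, now: float) -> dict[int, int]:
    """Keep the marker column and any worker column whose lease has not expired."""
    return {col: num for col, (num, expires_at) in row.items()
            if col == 0 or expires_at > now}

def is_candidate(row: Row, now: float) -> bool:
    """A work item is eligible again once only the 0=0 marker is live."""
    return live_columns(row, now) == {0: 0}

row: Row = {0: (0, math.inf), 3: (1, 100.0)}  # worker 3 holds number 1 until t=100
print(is_candidate(row, now=50.0), is_candidate(row, now=150.0))  # False True
```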