This page is just musings at this point, don't believe a word!'

See a discussion about this concept here: http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/Implementing-locks-using-cassandra-only-td5527076.html

Locking a row

The algorithm below is an adaptation of Lamport's Bakery algorithm, see Wikipedia and UC Davis. Lamport's algorithm solves the critical section problem for n processes in software. The basic idea is that of a bakery; customers take numbers, and whoever has the lowest number gets service next. Of course, "service" means entry to the critical section.

 1 var choosing: shared array[0..n-1] of boolean;
 2     number: shared array[0..n-1] of integer;          ...
 3 repeat
 4     choosing[i] := true;
 5     number[i] := max(number[0],number[1],...,number[n-1]) + 1;
 6     choosing[i] := false;
 7     for j := 0 to n-1 do begin
 8         while choosing[j] do (* nothing *);
 9         while number[j] <> 0 and (number[j], j) < (number[i],i) do
10              (* nothing *);
11     end;
12     (* critical section *)
13     number[i] := 0;
14     (* remainder section *)
15 until false;

Set-up:

  • The lock is represented by a row whose key is the lock_id
  • The row "has" a column family (CF) to represent the choosing array and a CF to represent the numbers picked, the row starts out empty
  • Each client has a unique client_id in the range 1..max_clients
  • All reads and writes must use consistency factor quorum unless otherwise noted, and any operation that fails must be repeated until it succeeds (i.e. blocking the client)

Algorithm:

Syntax: "write <lock_id, CF=number, client_id=client_number>" means: write to cassandra with row_key=lock_id and column client_id set to client_number in column family number.

 1. write <lock_id, CF=choosing, client_id=1>
 2. number_hash = read <lock_id, CF=number>
 3. client_number = max(number_hash.values) + 1
 4. write <lock_id, CF=number, client_id=client_number>
 5. delete <lock_id, CF=choosing, client_id=1>
 6. choosing_hash = read <lock_id, CF=choosing>
 7. number_hash = read <lock_id, CF=number>
 8. for id=1 to max_clients do
 9.     while choosing_hash[id] == 1 do
10.         sleep a little
11.         choosing_hash = read <lock_id, CF=choosing>
12.     end
13.     while number_hash[id].exists and number_hash[id] < client_number do
14.         sleep a little
15.         number_hash = read <lock_id, CF=number>
16.     end
17. end
18. /* critical section */
19. delete <lock_id, CF=number, client_id=client_number>

Some issues:

  • if a client crashes after setting its number the algorithm will eventually block all other clients
  • I'm not sure the consistency applies across CFs, if not, the number and choosing arrays need to be placed into the same CF

Distributed work queue

The distributed work queue is a further adaptation of Lamport's algorithm.

Set-up:

  • Each work item is represented by a row whose key is a unique timestamp produced by the client enqueueing the work item (this assumes reasonably synchronized clocks). The timestamp determines the (approximate) order in which work items are picked off the queue.
  • The row "has" a column family (CF) to represent the choosing array and a CF to represent the numbers picked.
  • Each worker has a unique worker_id in the range 1..max_workers.
  • All reads and writes must use consistency factor quorum unless otherwise noted, and any operation that fails must be repeated until it succeeds (i.e. blocking the worker), note that enqueuing a work item has no consistency factor constraints.
  • To enqueue a work item, a client does:
write <timestamp, CF=details, "some columns describing work item">
write <timestamp, CF=number, 0=0>

Algorithm:

loop do
  # pick a work item
  work_items_hash = scan <0.., CF=number>
  work_item_id = "lowest row key where number CF only has the 0=0 column"
  if work_item = nil then restart loop
  # try to get lock
  write <work_item, CF=choosing, worker_id=1>
  number_hash = read <work_item, CF=number>
  worker_number = max(number_hash.values) + 1
  write <work_item_id, CF=number, worker_id=worker_number>
  delete <work_item_id, CF=choosing, worker_id=1>
  choosing_hash = read <work_item_id, CF=choosing>
  number_hash = read <work_item_id, CF=number>
  for id=1 to max_workers do
    while choosing_hash[id] == 1 do
      sleep a little
      choosing_hash = read <lock_id, CF=choosing>
    end
    if number_hash[id].exists and number_hash[id] < worker_number do
      # oops, someone else is handling this work item
      delete <work_item_id, CF=number, worker_id=worker_number>
      restart outer loop
    end
  end
  # process work item
  work_details = read <work_item_id, CF=details>
  ...
  # remove work item
  delete <work_item_id, CF=details, *>
  delete <work_item_id, CF=number, 0=0>
  delete <work_item_id, CF=number, worker_id=worker_number>
end

Some issues:

  • need some back-off so there's not too much contention when many workers are idle
  • since items being worked on remain in the DB may need a smarter way to scan for a candidate work item
  • need to work in a timeout, i.e. workers should acquire a lease on a work item, not a lock, such that the work item becomes eligible again if the worker dies
  • No labels