Class: Calvin::Scheduler
- Inherits:
-
Object
- Object
- Calvin::Scheduler
- Defined in:
- lib/spinoza/calvin/scheduler.rb
Instance Attribute Summary collapse
-
#ex_for_txn ⇒ Object
readonly
Maps { locally executing transaction => Executor }.
-
#executors ⇒ Object
readonly
Returns the value of attribute executors.
-
#idle_executors ⇒ Object
readonly
Returns the value of attribute idle_executors.
-
#node ⇒ Object
readonly
Returns the value of attribute node.
-
#work_queue ⇒ Object
readonly
Transactions to be executed, in order.
Instance Method Summary collapse
- #finish_transaction(transaction, result) ⇒ Object
- #handle_meta_log_entry(id: raise, node: raise, value: raise) ⇒ Object
- #handle_next_transaction ⇒ Object
- #handle_next_transactions ⇒ Object
-
#initialize(node: raise, n_threads: 4) ⇒ Scheduler
constructor
A new instance of Scheduler.
- #inspect ⇒ Object
-
#recv_peer_results(transaction: raise, table: raise, read_results: raise) ⇒ Object
Handle messages from peers.
- #try_lock(txn) ⇒ Object
Constructor Details
#initialize(node: raise, n_threads: 4) ⇒ Scheduler
Returns a new instance of Scheduler.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/spinoza/calvin/scheduler.rb', line 17
#
# Builds a scheduler for +node+: creates a fixed pool of executors, marks all
# of them idle, starts with an empty work queue, and registers this scheduler
# to be called back via #handle_meta_log_entry.
#
# @param node [Object] owner of this scheduler; this method calls +store+ and
#   +meta_log+ on it. Required -- omitting it evaluates the +raise+ default.
# @param n_threads [Integer] how many Calvin::Executor instances to create.
def initialize node: raise, n_threads: 4
  @node = node

  # One executor per "thread"; each gets its own Readcaster but they all
  # share the node's store.
  @executors = n_threads.times.map do
    Calvin::Executor.new(
      store: node.store,
      readcaster: Calvin::Readcaster.new(node: node)
    )
  end

  # Every executor starts idle. #dup keeps @executors as the complete roster
  # while @idle_executors is pushed/popped during scheduling.
  @idle_executors = @executors.dup
  @ex_for_txn = {}
  @work_queue = []

  # NOTE(review): the rendered listing read "node..on_entry_available", i.e.
  # the receiver between the dots was lost. Restored as +meta_log+ based on
  # the callback name and the MetaLog wording elsewhere in this class --
  # confirm against lib/spinoza/calvin/scheduler.rb.
  node.meta_log.on_entry_available self, :handle_meta_log_entry
end
Instance Attribute Details
#ex_for_txn ⇒ Object (readonly)
Maps { locally executing transaction => Executor }.
12 13 14 |
# File 'lib/spinoza/calvin/scheduler.rb', line 12
#
# Maps { locally executing transaction => Executor }.
#
# @return [Object] the current value of @ex_for_txn
def ex_for_txn
  @ex_for_txn
end
#executors ⇒ Object (readonly)
Returns the value of attribute executors.
8 9 10 |
# File 'lib/spinoza/calvin/scheduler.rb', line 8
#
# Returns the value of attribute executors.
#
# @return [Object] the current value of @executors
def executors
  @executors
end
#idle_executors ⇒ Object (readonly)
Returns the value of attribute idle_executors.
9 10 11 |
# File 'lib/spinoza/calvin/scheduler.rb', line 9
#
# Returns the value of attribute idle_executors.
#
# @return [Object] the current value of @idle_executors
def idle_executors
  @idle_executors
end
#node ⇒ Object (readonly)
Returns the value of attribute node.
6 7 8 |
# File 'lib/spinoza/calvin/scheduler.rb', line 6
#
# Returns the value of attribute node.
#
# @return [Object] the current value of @node
def node
  @node
end
#work_queue ⇒ Object (readonly)
Transactions to be executed, in order.
15 16 17 |
# File 'lib/spinoza/calvin/scheduler.rb', line 15
#
# Transactions to be executed, in order.
#
# @return [Object] the current value of @work_queue
def work_queue
  @work_queue
end
Instance Method Details
#finish_transaction(transaction, result) ⇒ Object
126 127 128 129 130 131 132 133 |
# File 'lib/spinoza/calvin/scheduler.rb', line 126
#
# Completes +transaction+: frees the executor that was bound to it (if any),
# releases all of its locks, and reports the result to the node.
#
# @param transaction [Object] the transaction that has finished.
# @param result [Object] passed through to +node.finished_transaction+.
# @return [Object] whatever +node.finished_transaction+ returns.
def finish_transaction transaction, result
  # Only transactions recorded in ex_for_txn hold an executor; for any other
  # transaction there is nothing to hand back to the idle pool.
  ex = ex_for_txn.delete(transaction)
  idle_executors.push ex if ex

  node.lock_manager.unlock_all transaction
  node.finished_transaction transaction, result
end
#handle_meta_log_entry(id: raise, node: raise, value: raise) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/spinoza/calvin/scheduler.rb', line 36
#
# Callback for a new meta-log entry: reads the batch the entry refers to,
# appends its transactions to the work queue, and starts scheduling.
#
# All three keywords are required; omitting one evaluates its +raise+ default.
#
# @param id [Object] not used by this method.
# @param node [Object] the object asked for the batch via +read_batch+. Note
#   that inside this method the keyword shadows the #node attribute.
# @param value [Object] the batch id carried by the entry.
# @return [Object] the result of #handle_next_transactions.
# @raise [RuntimeError] ("TODO") when +node.read_batch+ returns nothing.
def handle_meta_log_entry id: raise, node: raise, value: raise
  batch_id = value
  batch = node.read_batch(batch_id)

  # Log entry did not yet propagate to this node, even though MetaLog entry
  # did propagate. Won't happen with default latency settings.
  raise "TODO" unless batch ##

  work_queue.concat batch
  handle_next_transactions
end
#handle_next_transaction ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/spinoza/calvin/scheduler.rb', line 74
#
# Tries to start the transaction at the head of the work queue on the last
# idle executor.
#
# The caller must make sure that both the work queue and the idle executor
# list are non-empty (as #handle_next_transactions does).
#
# @return [Boolean] the outcome of #try_lock: true when the head transaction
#   was dequeued and handed to an executor, false when its locks could not be
#   taken and it stays at the head of the queue.
# @raise [RuntimeError] if the head transaction is already recorded in
#   #ex_for_txn.
def handle_next_transaction
  ex = idle_executors.last
  txn = work_queue.first
  raise "transaction is already executing locally: #{txn.inspect}" if ex_for_txn[txn]

  lock_succeeded = try_lock(txn)
  if lock_succeeded
    txn = work_queue.shift
    result = ex.execute_transaction(txn)
    if result
      # Finished right away; the executor was never removed from the idle list.
      finish_transaction txn, result
    else
      # No result yet: take the executor out of the idle list and remember
      # which transaction it is bound to (see #recv_peer_results).
      idle_executors.pop
      ex_for_txn[txn] = ex
    end
  else
    # Drop whatever locks the failed attempt managed to take.
    node.lock_manager.unlock_all txn
    # nothing to do until some executor finishes its current transaction
    ## TODO optimization: attempt to reorder another txn to the head
    ## of the work_queue where lock sets are disjoint.
  end

  lock_succeeded
end
#handle_next_transactions ⇒ Object
67 68 69 70 71 72 |
# File 'lib/spinoza/calvin/scheduler.rb', line 67
#
# Keeps starting transactions from the head of the work queue until the queue
# is empty, no executor is idle, or #handle_next_transaction reports failure.
#
# @return [nil]
def handle_next_transactions
  until work_queue.empty? || idle_executors.empty?
    # A falsy result means the head transaction could not be started; stop
    # here rather than retrying the same head in a loop.
    break unless handle_next_transaction
  end
end
#inspect ⇒ Object
32 33 34 |
# File 'lib/spinoza/calvin/scheduler.rb', line 32 def inspect "<#{self.class} on #{node.inspect}>" end |
#recv_peer_results(transaction: raise, table: raise, read_results: raise) ⇒ Object
Handle messages from peers. The only messages are the unidirectional broadcasts of read results.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/spinoza/calvin/scheduler.rb', line 51
#
# Handle messages from peers. The only messages are the unidirectional
# broadcasts of read results.
#
# The read results are forwarded to the executor bound to +transaction+. If
# that yields a result, the transaction is finished and scheduling resumes.
#
# All three keywords are required; omitting one evaluates its +raise+ default.
#
# @param transaction [Object] the transaction the read results belong to.
# @param table [Object] passed through to the executor's +recv_remote_reads+.
# @param read_results [Object] passed through to +recv_remote_reads+.
# @return [Object, nil] the result of #handle_next_transactions when the
#   transaction finished, otherwise nil.
def recv_peer_results transaction: raise, table: raise, read_results: raise
  ex = ex_for_txn[transaction]

  ## TODO what if transaction hasn't started yet? Buffer? This won't
  ## happen with our simplistic latency assumptions.
  # The transaction has already finished locally, but another
  # node is still sending out read results.
  return unless ex

  result = ex.recv_remote_reads table, read_results
  # No result means the executor cannot complete the transaction yet.
  return unless result

  finish_transaction transaction, result
  handle_next_transactions
end
#try_lock(txn) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/spinoza/calvin/scheduler.rb', line 101
#
# Attempts to take every lock +txn+ needs from the node's lock manager: a
# write lock per [table, key] in its write set, then a read lock per
# [table, key] in its read set.
#
# Locks taken before a failure are NOT released here; the caller is
# responsible for cleaning up (see #handle_next_transaction).
#
# @param txn [Object] must respond to +read_set+ and +write_set+, each
#   yielding (table, keys) pairs when iterated.
# @return [Boolean] true when all locks were taken; false when a
#   Spinoza::LockManager::ConcurrencyError was raised along the way.
def try_lock txn
  lm = node.lock_manager
  rset = txn.read_set
  wset = txn.write_set

  # get write locks first, so r/w on same key doesn't fail
  wset.each do |table, keys|
    keys.each do |key|
      # The INSERT_KEY placeholder is never locked.
      next if key == Spinoza::Transaction::INSERT_KEY
      lm.lock_write [table, key], txn
    end
  end

  rset.each do |table, keys|
    keys.each do |key|
      lm.lock_read [table, key], txn
    end
  end

  true
rescue Spinoza::LockManager::ConcurrencyError
  false
end