Class: Calvin::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/spinoza/calvin/scheduler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(node: raise, n_threads: 4) ⇒ Scheduler

Returns a new instance of Scheduler.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/spinoza/calvin/scheduler.rb', line 17

def initialize node: raise, n_threads: 4
  @node = node

  @executors = n_threads.times.map {
    Calvin::Executor.new(
      store: node.store,
      readcaster: Calvin::Readcaster.new(node: node))}

  @idle_executors = @executors.dup
  @ex_for_txn = {}
  @work_queue = []
  
  node.meta_log.on_entry_available self, :handle_meta_log_entry
end

Instance Attribute Details

#ex_for_txnObject (readonly)

Maps { locally executing transaction => Executor }



12
13
14
# File 'lib/spinoza/calvin/scheduler.rb', line 12

def ex_for_txn
  @ex_for_txn
end

#executorsObject (readonly)

Returns the value of attribute executors.



8
9
10
# File 'lib/spinoza/calvin/scheduler.rb', line 8

def executors
  @executors
end

#idle_executorsObject (readonly)

Returns the value of attribute idle_executors.



9
10
11
# File 'lib/spinoza/calvin/scheduler.rb', line 9

def idle_executors
  @idle_executors
end

#nodeObject (readonly)

Returns the value of attribute node.



6
7
8
# File 'lib/spinoza/calvin/scheduler.rb', line 6

def node
  @node
end

#work_queueObject (readonly)

Transactions to be executed, in order.



15
16
17
# File 'lib/spinoza/calvin/scheduler.rb', line 15

def work_queue
  @work_queue
end

Instance Method Details

#finish_transaction(transaction, result) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/spinoza/calvin/scheduler.rb', line 126

def finish_transaction transaction, result
  ex = ex_for_txn.delete(transaction)
  if ex
    idle_executors.push ex
  end
  node.lock_manager.unlock_all transaction
  node.finished_transaction transaction, result
end

#handle_meta_log_entry(id: raise, node: raise, value: raise) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/spinoza/calvin/scheduler.rb', line 36

def handle_meta_log_entry id: raise, node: raise, value: raise
  batch_id = value
  batch = node.read_batch(batch_id)
  if batch
    work_queue.concat batch
    handle_next_transactions
  else
    # Log entry did not yet propagate to this node, even though MetaLog entry
    # did propagate. Won't happen with default latency settings.
    raise "TODO" ##
  end
end

#handle_next_transactionObject



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/spinoza/calvin/scheduler.rb', line 74

def handle_next_transaction
  ex = idle_executors.last
  txn = work_queue.first
  raise if ex_for_txn[txn]

  lock_succeeded = try_lock(txn)
  
  if lock_succeeded
    txn = work_queue.shift
    result = ex.execute_transaction(txn)
    if result
      finish_transaction txn, result
    else
      idle_executors.pop
      ex_for_txn[txn] = ex
    end

  else
    node.lock_manager.unlock_all txn
    # nothing to do until some executor finishes its current transaction
    ## TODO optimization: attempt to reorder another txn to the head
    ## of the work_queue where lock sets are disjoint.
  end

  lock_succeeded
end

#handle_next_transactionsObject



67
68
69
70
71
72
# File 'lib/spinoza/calvin/scheduler.rb', line 67

def handle_next_transactions
  until work_queue.empty? or idle_executors.empty?
    success = handle_next_transaction
    break unless success
  end
end

#inspectObject



32
33
34
# File 'lib/spinoza/calvin/scheduler.rb', line 32

def inspect
  "<#{self.class} on #{node.inspect}>"
end

#recv_peer_results(transaction: raise, table: raise, read_results: raise) ⇒ Object

Handle messages from peers. The only messages are the unidirectional broadcasts of read results.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/spinoza/calvin/scheduler.rb', line 51

def recv_peer_results transaction: raise, table: raise, read_results: raise
  ex = ex_for_txn[transaction]
  if ex
    result = ex.recv_remote_reads table, read_results
    if result
      finish_transaction transaction, result
      handle_next_transactions
    end
  else
    ## TODO what if transaction hasn't started yet? Buffer? This won't
    ## happen with our simplistic latency assumptions.
    # The transaction has already finished locally, but another
    # node is still sending out read results.
  end
end

#try_lock(txn) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/spinoza/calvin/scheduler.rb', line 101

def try_lock txn
  lm = node.lock_manager
  rset = txn.read_set
  wset = txn.write_set

  # get write locks first, so r/w on same key doesn't fail
  wset.each do |table, keys|
    keys.each do |key|
      next if key == Spinoza::Transaction::INSERT_KEY
      lm.lock_write [table, key], txn
    end
  end

  rset.each do |table, keys|
    keys.each do |key|
      lm.lock_read [table, key], txn
    end
  end

  true

rescue Spinoza::LockManager::ConcurrencyError
  false
end