Class: Calvin::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/spinoza/calvin/executor.rb

Overview

Represents the work performed in one thread. The scheduler assigns a sequence of transactions to each of several executors. The executor handles the transactions one at a time, in a series of substeps as data is received from peers. Within an executor, the sequence of transactions and substeps is totally ordered wrt the global timeline, but the sequences of two Executors may interleave, which is how Calvin achieves some write concurrency.

Does not have access to any subsystems except the node’s Store and communication with peer executors via the readcasters.

Defined Under Namespace

Classes: StateError, Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(store: nil, readcaster: nil) ⇒ Executor

Returns a new instance of Executor.



38
39
40
41
42
# File 'lib/spinoza/calvin/executor.rb', line 38

def initialize store: nil, readcaster: nil
  @store = store
  @readcaster = readcaster
  ready!
end

Instance Attribute Details

#readcasterObject (readonly)

Returns the value of attribute readcaster.



15
16
17
# File 'lib/spinoza/calvin/executor.rb', line 15

def readcaster
  @readcaster
end

#storeObject (readonly)

Returns the value of attribute store.



14
15
16
# File 'lib/spinoza/calvin/executor.rb', line 14

def store
  @store
end

#taskObject (readonly)

Returns the value of attribute task.



16
17
18
# File 'lib/spinoza/calvin/executor.rb', line 16

def task
  @task
end

Instance Method Details

#all_reads_are_local?(txn) ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/spinoza/calvin/executor.rb', line 88

def all_reads_are_local? txn
  txn.all_reads_are_local? store
end

#assert_ready?Boolean

Returns:

  • (Boolean)


52
53
54
55
56
# File 'lib/spinoza/calvin/executor.rb', line 52

def assert_ready?
  unless ready?
    raise StateError, "cannot start new task -- already executing #{task}"
  end
end

#execute_transaction(txn) ⇒ Object

Assumes all locks are held around this call.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/spinoza/calvin/executor.rb', line 59

def execute_transaction txn
  assert_ready?

  local_read_results = @readcaster.execute_local_reads txn
  @readcaster.serve_reads txn, local_read_results

  if passive? txn
    result = local_read_results
    ready!

  elsif all_reads_are_local? txn
    result = local_read_results
    store.execute *txn.all_write_ops
    ready!

  else
    @task = Task.new txn,
      read_results: local_read_results,
      remote_read_tables: txn.remote_read_tables(store)
    result = false
  end

  return result
end

#passive?(txn) ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/spinoza/calvin/executor.rb', line 84

def passive? txn
  not txn.active? store
end

#ready!Object



44
45
46
# File 'lib/spinoza/calvin/executor.rb', line 44

def ready!
  @task = nil
end

#ready?Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/spinoza/calvin/executor.rb', line 48

def ready?
  @task.nil?
end

#recv_remote_reads(table, read_results) ⇒ Object

Assumes all locks are held around this call.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/spinoza/calvin/executor.rb', line 93

def recv_remote_reads table, read_results
  if task.remote_read_tables.include? table
    task.remote_read_tables.delete table
    task.read_results.concat read_results
  # else this is a redundant message for this table, so ignore it
  end

  return false unless task.remote_read_tables.empty?

  store.execute *task.txn.all_write_ops
  result = task.read_results
  ready!
  result
end