Class: Nestene::Actor::Core
- Inherits:
-
Object
- Object
- Nestene::Actor::Core
- Includes:
- Celluloid, Celluloid::Notifications
- Defined in:
- lib/nestene/actor/core.rb
Instance Method Summary collapse
- #auton_names ⇒ Object
- #cancel_delayed_step(auton_id, step_id) ⇒ Object
- #create_auton(type, auton_id = SecureRandom.uuid) ⇒ Object
- #get_credentials ⇒ Object
- #get_state(auton_id) ⇒ Object
-
#initialize(storage) ⇒ Core
constructor
A new instance of Core.
- #notify_waiters(topic, auton_id, state) ⇒ Object
- #repeat_step(auton_id, method_uuid) ⇒ Object
- #resume(auton_id) ⇒ Object
- #schedule_delayed_step(auton_id, delay, name, parameters = []) ⇒ Object
- #schedule_repeating_delayed_step(auton_id, every, delay, name, parameters = []) ⇒ Object
- #schedule_step(auton_id, step_name, parameters = [], callback_auton_id = nil, callback_method = nil) ⇒ Object
- #set_credentials(credentials) ⇒ Object
- #wait_for_execution_result(auton_id, method_uuid) ⇒ Object
Constructor Details
#initialize(storage) ⇒ Core
Returns a new instance of Core.
10 11 12 13 14 15 16 17 |
# File 'lib/nestene/actor/core.rb', line 10 def initialize(storage) subscribe('state_update', :notify_waiters) @execution_futures={} @storage = storage storage.list.each do |auton_id| Celluloid::Actor["storage:%s" % auton_id] = AutonStorage.new(auton_id, @storage) end end |
Instance Method Details
#auton_names ⇒ Object
19 20 21 |
# File 'lib/nestene/actor/core.rb', line 19 def auton_names @storage.list end |
#cancel_delayed_step(auton_id, step_id) ⇒ Object
63 64 65 66 67 |
# File 'lib/nestene/actor/core.rb', line 63 def cancel_delayed_step auton_id, step_id Celluloid::Actor["storage:%s" % auton_id].update do |state| delayed = state.queue.remove_delayed_method step_id end end |
#create_auton(type, auton_id = SecureRandom.uuid) ⇒ Object
31 32 33 34 35 36 37 38 |
# File 'lib/nestene/actor/core.rb', line 31 def create_auton type, auton_id=SecureRandom.uuid storage = AutonStorage.new(auton_id, @storage) Celluloid::Actor["storage:%s" % auton_id] = storage storage.create(type) auton_id rescue Exception => e abort e end |
#get_credentials ⇒ Object
23 24 25 |
# File 'lib/nestene/actor/core.rb', line 23 def get_credentials @storage.load('__credentials__') || {} end |
#get_state(auton_id) ⇒ Object
40 41 42 43 44 |
# File 'lib/nestene/actor/core.rb', line 40 def get_state auton_id Celluloid::Actor["storage:%s" % auton_id].get rescue Exception => e abort e end |
#notify_waiters(topic, auton_id, state) ⇒ Object
143 144 145 146 |
# File 'lib/nestene/actor/core.rb', line 143 def notify_waiters topic, auton_id, state future = @execution_futures.delete(auton_id) future.signal(SelfValue.new) if future end |
#repeat_step(auton_id, method_uuid) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/nestene/actor/core.rb', line 84 def repeat_step auton_id, method_uuid step_id = SecureRandom.uuid Celluloid::Actor["storage:%s" % auton_id].update do |state| original_step = state.queue.executed.find{|m| m.uuid == method_uuid} method = ScheduledMethod.new method.name = original_step.name method.parameters = original_step.parameters method.uuid = step_id state.queue.to_execute.unshift method state.queue.failed = false end step_id end |
#resume(auton_id) ⇒ Object
100 101 102 103 104 |
# File 'lib/nestene/actor/core.rb', line 100 def resume auton_id Celluloid::Actor["storage:%s" % auton_id].update do |state| state.queue.failed = false end end |
#schedule_delayed_step(auton_id, delay, name, parameters = []) ⇒ Object
69 70 71 72 73 74 |
# File 'lib/nestene/actor/core.rb', line 69 def schedule_delayed_step auton_id, delay , name, parameters=[] Celluloid::Actor["storage:%s" % auton_id].update do |state| delayed = state.queue.add_delayed_method name, parameters, delay delayed.uuid end end |
#schedule_repeating_delayed_step(auton_id, every, delay, name, parameters = []) ⇒ Object
77 78 79 80 81 82 |
# File 'lib/nestene/actor/core.rb', line 77 def schedule_repeating_delayed_step auton_id, every, delay, name, parameters=[] Celluloid::Actor["storage:%s" % auton_id].update do |state| delayed = state.queue.add_delayed_method name, parameters, delay, every delayed.uuid end end |
#schedule_step(auton_id, step_name, parameters = [], callback_auton_id = nil, callback_method = nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/nestene/actor/core.rb', line 46 def schedule_step auton_id, step_name, parameters=[], callback_auton_id=nil, callback_method=nil step_id = SecureRandom.uuid Celluloid::Actor["storage:%s" % auton_id].update do |state| method = ScheduledMethod.new method.name = step_name method.parameters = parameters method.uuid = step_id if callback_auton_id && callback_method method.callback = Callback.new method.callback.name = callback_method method.callback.auton_id = callback_auton_id end state.queue.to_execute << method end step_id end |
#set_credentials(credentials) ⇒ Object
27 28 29 |
# File 'lib/nestene/actor/core.rb', line 27 def set_credentials credentials @storage.store('__credentials__',credentials) end |
#wait_for_execution_result(auton_id, method_uuid) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/nestene/actor/core.rb', line 107 def wait_for_execution_result auton_id, method_uuid future = nil executed = nil exclusive do state = Celluloid::Actor["storage:%s" % auton_id].get unless @execution_futures.has_key?(auton_id) @execution_futures[auton_id] = Celluloid::Future.new end future = @execution_futures[auton_id] executed = state ? state.queue.executed.find{|m| m.uuid == method_uuid} : nil end until executed executed = future.value state = Celluloid::Actor["storage:%s" % auton_id].get executed = state.queue.executed.find{|m| m.uuid == method_uuid} end if executed.error abort executed.error else return executed.result end end |