Class: Ruote::Worker
- Inherits:
- Object
- Hierarchy: Ruote::Worker < Object
- Defined in:
- lib/ruote/worker.rb
Overview
Workers fetch ‘msgs’ and ‘schedules’ from the storage and process them.
Read more at ruote.rubyforge.org/configuration.html
Defined Under Namespace
Classes: Info
Constant Summary
- EXP_ACTIONS =
%w[ reply cancel fail receive dispatched pause resume ]
- PROC_ACTIONS =
Note (this source comment concerns the EXP_ACTIONS list): ‘apply’ is included in ‘launch’; ‘receive’ is a ParticipantExpression alias for ‘reply’.
%w[ cancel kill pause resume ].collect { |a| a + '_process' }
- DISP_ACTIONS =
%w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
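PROC_ACTIONS is built with an expression instead of a literal list, so its contents are easy to misread. The snippet below is a standalone evaluation of the three expressions shown above. It uses local variables in place of the constants and does not load Ruote.

```ruby
# Standalone evaluation of the expressions listed above.
# Local variables stand in for the constants; Ruote itself is not required.
exp_actions = %w[ reply cancel fail receive dispatched pause resume ]
proc_actions = %w[ cancel kill pause resume ].collect { |a| a + '_process' }
disp_actions = %w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]

p proc_actions
# => ["cancel_process", "kill_process", "pause_process", "resume_process"]

# 'cancel', 'pause' and 'resume' appear at the expression level too,
# while the process-level names carry the '_process' suffix.
p exp_actions & %w[ cancel pause resume ]
# => ["cancel", "pause", "resume"]
p disp_actions.size
# => 4
```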
Instance Attribute Summary
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#run_thread ⇒ Object
readonly
Returns the value of attribute run_thread.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
Instance Method Summary
-
#inactive? ⇒ Boolean
Returns true if the engine system is inactive, i.e. if all the process instances are terminated or are stuck in an error.
-
#initialize(name, storage = nil) ⇒ Worker
constructor
Given a storage, creates a new instance of a Worker.
-
#join ⇒ Object
Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).
-
#run ⇒ Object
Runs the worker in the current thread.
-
#run_in_thread ⇒ Object
Triggers the run method of the worker in a dedicated thread.
-
#shutdown ⇒ Object
Shuts down this worker (makes sure it won’t fetch further messages and schedules).
Constructor Details
#initialize(name, storage = nil) ⇒ Worker
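Given a storage, creates a new instance of a Worker. The name is optional: when only one argument is passed, it is taken as the storage and the name defaults to ‘worker’. The storage argument may also be an object that responds to #storage and #context, in which case both are borrowed from it.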
    # File 'lib/ruote/worker.rb', line 69

    def initialize(name, storage=nil)

      if storage.nil?
        storage = name
        name = nil
      end

      @name = name || 'worker'

      if storage.respond_to?(:storage)
        @storage = storage.storage
        @context = storage.context
      else
        @storage = storage
        @context = Ruote::Context.new(storage)
      end

      service_name = @name
      service_name << '_worker' unless service_name.match(/worker$/)

      @context.add_service(service_name, self)

      @last_time = Time.at(0.0).utc # 1970...

      @state = 'running'
      @run_thread = nil
      @state_mutex = Mutex.new

      @msgs = []

      @sleep_time = @context['restless_worker'] ? nil : 0.000

      @info = @context['worker_info_enabled'] == false ? nil : Info.new(self)
    end
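The argument handling at the top of the constructor can be tried in isolation. The sketch below uses a hypothetical helper, resolve_worker_args, that is not part of Ruote; it mirrors only the name/storage swap and the service-name suffix, with a symbol standing in for a real storage. One deliberate difference: the sketch copies the name before appending. As listed, the source appends to the same String object that @name references, so a custom name without the ‘worker’ suffix would appear to be modified in place.

```ruby
# Hypothetical helper (not part of Ruote) mirroring the argument handling
# at the top of Worker#initialize.
def resolve_worker_args(name, storage = nil)
  # Single-argument form: the lone argument is the storage.
  if storage.nil?
    storage = name
    name = nil
  end

  name ||= 'worker'

  # Assumption of this sketch: copy before appending, so the caller's
  # string is left untouched (the listed source appends without copying).
  service_name = name.dup
  service_name << '_worker' unless service_name.match(/worker$/)

  [name, storage, service_name]
end

p resolve_worker_args(:fake_storage)
# => ["worker", :fake_storage, "worker"]
p resolve_worker_args('alpha', :fake_storage)
# => ["alpha", :fake_storage, "alpha_worker"]
```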
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
    # File 'lib/ruote/worker.rb', line 62

    def context
      @context
    end
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/ruote/worker.rb', line 59

    def name
      @name
    end
#run_thread ⇒ Object (readonly)
Returns the value of attribute run_thread.
    # File 'lib/ruote/worker.rb', line 65

    def run_thread
      @run_thread
    end
#state ⇒ Object (readonly)
Returns the value of attribute state.
    # File 'lib/ruote/worker.rb', line 64

    def state
      @state
    end
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
    # File 'lib/ruote/worker.rb', line 61

    def storage
      @storage
    end
Instance Method Details
#inactive? ⇒ Boolean
Returns true if the engine system is inactive, i.e. if all the process instances are terminated or are stuck in an error.
NOTE: for now, if one branch of a process is in error while another is still running, this method considers the process instance inactive (and it returns true if all the processes are considered inactive).
    # File 'lib/ruote/worker.rb', line 149

    def inactive?

      # the cheaper tests first

      return false if @msgs.size > 0
      return false unless @context.storage.empty?('schedules')
      return false unless @context.storage.empty?('msgs')

      wfids =
        @context.storage.get_many('expressions').collect { |exp|
          exp['fei']['wfid']
        }

      error_wfids =
        @context.storage.get_many('errors').collect { |err|
          err['fei']['wfid']
        }

      (wfids - error_wfids == [])
    end
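The behaviour described in the NOTE follows from the array difference on the last line: one error is enough to remove every expression that shares its wfid. The sketch below reproduces only that final step on plain arrays. The helper name and the workflow ids are made up for illustration, and no Ruote storage is involved.

```ruby
# Hypothetical stand-in for the last step of Worker#inactive?, operating on
# plain arrays of hashes instead of a Ruote storage.
def all_processes_in_error?(expressions, errors)
  wfids = expressions.collect { |exp| exp['fei']['wfid'] }
  error_wfids = errors.collect { |err| err['fei']['wfid'] }
  (wfids - error_wfids).empty?
end

# Made-up data: process 'wf-1' has two branches, one of them in error.
expressions = [
  { 'fei' => { 'wfid' => 'wf-1', 'expid' => '0_0' } },
  { 'fei' => { 'wfid' => 'wf-1', 'expid' => '0_1' } }
]
errors = [{ 'fei' => { 'wfid' => 'wf-1', 'expid' => '0_1' } }]

# Array#- drops every element equal to 'wf-1', so the still-running
# branch does not keep the process counted as active.
p all_processes_in_error?(expressions, errors)   # => true
p all_processes_in_error?(expressions, [])       # => false
p all_processes_in_error?([], [])                # => true
```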
#join ⇒ Object
Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).
    # File 'lib/ruote/worker.rb', line 127

    def join
      @run_thread.join rescue nil
    end
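The one-line body relies on Ruby's rescue modifier to make join safe when no run thread exists. The snippet below illustrates that idiom with plain threads and local variables (no Ruote involved). It also shows a side effect worth knowing: an exception that terminated the thread, which Thread#join would normally re-raise, is swallowed as well.

```ruby
# No run thread yet: calling join on nil raises NoMethodError,
# which the rescue modifier turns into nil.
run_thread = nil
no_thread = (run_thread.join rescue nil)
p no_thread                 # => nil

# A thread that finished normally: join returns the thread itself.
done = Thread.new { :ok }
joined = (done.join rescue nil)
p joined.equal?(done)       # => true

# A thread that died from an exception: join would re-raise it,
# and the rescue modifier swallows that too.
failed = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet
  raise 'boom'
end
swallowed = (failed.join rescue nil)
p swallowed                 # => nil
```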
#run ⇒ Object
Runs the worker in the current thread. See #run_in_thread for running in a dedicated thread.
    # File 'lib/ruote/worker.rb', line 107

    def run
      step while @state != 'stopped'
    end
#run_in_thread ⇒ Object
Triggers the run method of the worker in a dedicated thread.
    # File 'lib/ruote/worker.rb', line 114

    def run_in_thread

      #Thread.abort_on_exception = true

      @state = 'running'

      @run_thread = Thread.new { run }
      @run_thread['ruote_worker'] = self
    end
#shutdown ⇒ Object
Shuts down this worker (makes sure it won’t fetch further messages and schedules).
    # File 'lib/ruote/worker.rb', line 135

    def shutdown

      @state_mutex.synchronize { @state = 'stopped' }

      join
    end
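Taken together, run, run_in_thread, join and shutdown form a small lifecycle: a loop that steps until the state is ‘stopped’, optionally hosted in a dedicated thread, and a shutdown that flips the state and waits for that thread. The sketch below reproduces that shape with a hypothetical ToyWorker class. It is not Ruote::Worker: its step method only counts iterations and sleeps briefly, and the ‘toy_worker’ thread-local key is invented for the example.

```ruby
# ToyWorker is a hypothetical stand-in, not Ruote::Worker: step only counts
# iterations instead of fetching msgs and schedules from a storage.
class ToyWorker
  attr_reader :state, :run_thread, :steps

  def initialize
    @state = 'running'
    @run_thread = nil
    @state_mutex = Mutex.new
    @steps = 0
  end

  # Blocks the calling thread until the state becomes 'stopped'.
  def run
    step while @state != 'stopped'
  end

  # Runs the same loop in a dedicated thread and tags that thread.
  def run_in_thread
    @state = 'running'
    @run_thread = Thread.new { run }
    @run_thread['toy_worker'] = self
  end

  # Returns immediately when there is no run thread.
  def join
    @run_thread.join rescue nil
  end

  # Flags the loop to stop, then waits for the run thread to finish.
  def shutdown
    @state_mutex.synchronize { @state = 'stopped' }
    join
  end

  private

  def step
    @steps += 1
    sleep 0.01
  end
end

worker = ToyWorker.new
worker.run_in_thread
sleep 0.05
worker.shutdown

p worker.state              # => "stopped"
p worker.run_thread.alive?  # => false
```

Because shutdown ends with join, the caller gets control back only after the step in progress has completed.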