Class: Ruote::Worker

Inherits:
Object
Defined in:
lib/ruote/worker.rb

Overview

Workers fetch ‘msgs’ and ‘schedules’ from the storage and process them.

Read more at ruote.rubyforge.org/configuration.html

Defined Under Namespace

Classes: Info

Constant Summary

EXP_ACTIONS =
%w[ reply cancel fail receive dispatched pause resume ]
PROC_ACTIONS =

‘apply’ is comprised in ‘launch’.
‘receive’ is a ParticipantExpression alias for ‘reply’.

%w[ cancel kill pause resume ].collect { |a| a + '_process' }
DISP_ACTIONS =
%w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
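
PROC_ACTIONS is the only one of the three constants that is computed rather than listed literally. As a small illustration, the sketch below evaluates the same expression outside of ruote; the constant name PROC_ACTIONS_SKETCH is made up for this example and is not part of the library.

```ruby
# Same expression as the PROC_ACTIONS definition above, evaluated standalone.
# PROC_ACTIONS_SKETCH is a hypothetical name used only for this illustration.
PROC_ACTIONS_SKETCH =
  %w[ cancel kill pause resume ].collect { |a| a + '_process' }

p PROC_ACTIONS_SKETCH
# => ["cancel_process", "kill_process", "pause_process", "resume_process"]
```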


Constructor Details

#initialize(name, storage = nil) ⇒ Worker

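Given a storage, creates a new instance of a Worker. The name is optional: when only one argument is passed, it is taken as the storage and the name defaults to ‘worker’.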



# File 'lib/ruote/worker.rb', line 69

def initialize(name, storage=nil)

  if storage.nil?
    storage = name
    name = nil
  end

  @name = name || 'worker'

  if storage.respond_to?(:storage)
    @storage = storage.storage
    @context = storage.context
  else
    @storage = storage
    @context = Ruote::Context.new(storage)
  end

  service_name = @name
  service_name << '_worker' unless service_name.match(/worker$/)

  @context.add_service(service_name, self)

  @last_time = Time.at(0.0).utc # 1970...

  @state = 'running'
  @run_thread = nil
  @state_mutex = Mutex.new

  @msgs = []

  @sleep_time = @context['restless_worker'] ? nil : 0.000

  @info = @context['worker_info_enabled'] == false ? nil : Info.new(self)
end
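
The argument handling at the top of the constructor can be tried in isolation. The sketch below is not ruote code: normalize_worker_args and service_name_for are hypothetical helpers that mirror the first lines of #initialize and the service name rule. Unlike the listing above, service_name_for builds a new string instead of appending in place.

```ruby
# Hypothetical helper mirroring the first lines of Worker#initialize:
# when only one argument is given, it is treated as the storage.
def normalize_worker_args(name, storage = nil)
  if storage.nil?
    storage = name
    name = nil
  end
  [name || 'worker', storage]
end

# Hypothetical helper mirroring the service name rule: '_worker' is
# appended unless the name already ends in 'worker'.
# (Assumption: returns a new string rather than mutating its argument.)
def service_name_for(name)
  name.match(/worker$/) ? name : name + '_worker'
end

storage = Object.new # stand-in for a real storage object

p normalize_worker_args(storage).first          # => "worker"
p normalize_worker_args('alpha', storage).first # => "alpha"
p service_name_for('worker')                    # => "worker"
p service_name_for('alpha')                     # => "alpha_worker"
```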

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'lib/ruote/worker.rb', line 62

def context
  @context
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/ruote/worker.rb', line 59

def name
  @name
end

#run_thread ⇒ Object (readonly)

Returns the value of attribute run_thread.



# File 'lib/ruote/worker.rb', line 65

def run_thread
  @run_thread
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/ruote/worker.rb', line 64

def state
  @state
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/ruote/worker.rb', line 61

def storage
  @storage
end

Instance Method Details

#inactive? ⇒ Boolean

Returns true if the engine system is inactive, i.e. if all the process instances are terminated or are stuck in an error.

NOTE: for now, if a branch of a process is in error while another is still running, this method will consider the process instance inactive (and it will return true if all the processes are considered inactive).

Returns:

  • (Boolean)


# File 'lib/ruote/worker.rb', line 149

def inactive?

  # the cheaper tests first

  return false if @msgs.size > 0
  return false unless @context.storage.empty?('schedules')
  return false unless @context.storage.empty?('msgs')

  wfids = @context.storage.get_many('expressions').collect { |exp|
    exp['fei']['wfid']
  }

  error_wfids = @context.storage.get_many('errors').collect { |err|
    err['fei']['wfid']
  }

  (wfids - error_wfids == [])
end
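
The NOTE above follows from the final array difference: Array#- removes every occurrence of a wfid that appears in the error list, so a single error hides all branches of that process. The sketch below reproduces only that last comparison with plain arrays; all_in_error? and the wfids ‘p1’ and ‘p2’ are hypothetical names for this illustration.

```ruby
# Hypothetical helper reproducing only the final comparison of #inactive?.
def all_in_error?(wfids, error_wfids)
  (wfids - error_wfids) == []
end

# Process 'p1' has two expressions (two branches), one of which is in
# error: both entries are removed, so the system counts as inactive.
p all_in_error?(%w[ p1 p1 ], %w[ p1 ])    # => true

# Process 'p2' has no error, so the system still counts as active.
p all_in_error?(%w[ p1 p1 p2 ], %w[ p1 ]) # => false
```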

#join ⇒ Object

Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).



# File 'lib/ruote/worker.rb', line 127

def join

  @run_thread.join rescue nil
end
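
The rescue modifier is what makes #join a no-op when there is no run thread. It also has a side effect worth knowing: Thread#join re-raises the exception that terminated a thread, and the modifier swallows that too. The sketch below shows both cases with a hypothetical helper, quiet_join, that uses the same one-liner.

```ruby
# Hypothetical helper using the same one-liner as Worker#join.
def quiet_join(thread)
  thread.join rescue nil
end

# No thread: nil does not respond to #join, the NoMethodError is rescued.
p quiet_join(nil) # => nil

# A thread that died with an exception: Thread#join would re-raise it,
# but the rescue modifier turns it into nil as well.
failing = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet
  raise 'boom'
end
p quiet_join(failing) # => nil
```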

#run ⇒ Object

Runs the worker in the current thread. See #run_in_thread for running in a dedicated thread.



# File 'lib/ruote/worker.rb', line 107

def run

  step while @state != 'stopped'
end

#run_in_thread ⇒ Object

Triggers the run method of the worker in a dedicated thread.



# File 'lib/ruote/worker.rb', line 114

def run_in_thread

  #Thread.abort_on_exception = true

  @state = 'running'

  @run_thread = Thread.new { run }
  @run_thread['ruote_worker'] = self
end

#shutdown ⇒ Object

Shuts down this worker (makes sure it won’t fetch further messages and schedules).



# File 'lib/ruote/worker.rb', line 135

def shutdown

  @state_mutex.synchronize { @state = 'stopped' }

  join
end
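
Taken together, #run, #run_in_thread and #shutdown form a simple lifecycle: a loop that steps until the state becomes ‘stopped’, optionally hosted in its own thread. The sketch below imitates that lifecycle with a stand-in class so it can run without ruote. MiniWorker, its steps counter and the ‘mini_worker’ thread key are assumptions for this illustration, and its #step only counts iterations instead of fetching msgs and schedules.

```ruby
# Stand-in class, not Ruote::Worker: #step only counts iterations.
class MiniWorker
  attr_reader :state, :run_thread, :steps

  def initialize
    @state = 'running'
    @state_mutex = Mutex.new
    @run_thread = nil
    @steps = 0
  end

  # Runs in the current thread until the state becomes 'stopped'.
  def run
    step while @state != 'stopped'
  end

  # Runs the loop in a dedicated thread and stores a reference
  # back to the worker on that thread.
  def run_in_thread
    @state = 'running'
    @run_thread = Thread.new { run }
    @run_thread['mini_worker'] = self
  end

  # Flags the loop to stop, then waits for the run thread (if any).
  def shutdown
    @state_mutex.synchronize { @state = 'stopped' }
    @run_thread.join rescue nil
  end

  private

  def step
    @steps += 1
    sleep 0.001 # placeholder for fetching and processing msgs/schedules
  end
end

worker = MiniWorker.new
worker.run_in_thread
sleep 0.05
worker.shutdown

p worker.state             # => "stopped"
p worker.run_thread.alive? # => false
```

Because #shutdown joins the run thread, it returns only once the loop has finished its current step.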