Class: Ruote::Worker

Inherits:
Object
Defined in:
lib/ruote/worker.rb

Overview

Workers fetch 'msgs' and 'schedules' from the storage and process them.

Read more at ruote.rubyforge.org/configuration.html

Defined Under Namespace

Classes: Info

Constant Summary

EXP_ACTIONS =
%w[ reply cancel fail receive dispatched pause resume ]
PROC_ACTIONS =

'apply' is included in 'launch'. 'receive' is a ParticipantExpression alias for 'reply'.

%w[ cancel kill pause resume ].collect { |a| a + '_process' }
DISP_ACTIONS =
%w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
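For illustration, the three constants evaluate to plain arrays of strings. The snippet below re-evaluates the same expressions outside of ruote; the lowercase local names are used only for this sketch and are not part of the library.

```ruby
# Stand-alone re-evaluation of the three constant expressions shown above.
exp_actions = %w[ reply cancel fail receive dispatched pause resume ]
proc_actions = %w[ cancel kill pause resume ].collect { |a| a + '_process' }
disp_actions = %w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]

p proc_actions
# prints ["cancel_process", "kill_process", "pause_process", "resume_process"]
```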

Constructor Details

#initialize(name, storage = nil) ⇒ Worker

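Given a storage, creates a new instance of a Worker. The name is optional: when only one argument is passed, it is taken as the storage and the name defaults to 'worker'.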



# File 'lib/ruote/worker.rb', line 69

def initialize(name, storage=nil)

  if storage.nil?
    storage = name
    name = nil
  end

  @name = name || 'worker'

  if storage.respond_to?(:storage)
    @storage = storage.storage
    @context = storage.context
  else
    @storage = storage
    @context = Ruote::Context.new(storage)
  end

  service_name = @name
  service_name << '_worker' unless service_name.match(/worker$/)

  @context.add_service(service_name, self)

  @last_time = Time.at(0.0).utc # 1970...

  @state = 'running'
  @run_thread = nil
  @state_mutex = Mutex.new

  @msgs = []

  @sleep_time = @context['restless_worker'] ? nil : 0.000

  @info = @context['worker_info_enabled'] == false ? nil : Info.new(self)
end
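The argument handling at the top of the constructor can be replayed in isolation. The sketch below uses a hypothetical helper, `resolve_worker_args`, which is not part of ruote; it only mirrors the name and storage logic from the listing. Note that `service_name` and the name refer to the same String, so the in-place `<<` also changes the name.

```ruby
# Hypothetical helper mirroring the argument handling in #initialize.
# It is not part of ruote; it only replays the name/storage logic.
def resolve_worker_args(name, storage = nil)
  # With a single argument, that argument is the storage.
  if storage.nil?
    storage = name
    name = nil
  end

  name ||= 'worker'

  # Same in-place append as the listing: name and service_name are one String.
  service_name = name
  service_name << '_worker' unless service_name.match(/worker$/)

  { name: name, service_name: service_name, storage: storage }
end

fake_storage = Object.new # stand-in for a real storage (assumption)

one = resolve_worker_args(fake_storage)
two = resolve_worker_args(+'alpha', fake_storage)

puts one[:service_name] # prints "worker"
puts two[:service_name] # prints "alpha_worker"
puts two[:name]         # prints "alpha_worker" (same String object)
```

Because the suffix is appended in place, passing a frozen String that does not end in 'worker' would raise in this sketch; the example uses `+'alpha'` to get an unfrozen copy.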

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context



# File 'lib/ruote/worker.rb', line 62

def context
  @context
end

#name ⇒ Object (readonly)

Returns the value of attribute name



# File 'lib/ruote/worker.rb', line 59

def name
  @name
end

#run_thread ⇒ Object (readonly)

Returns the value of attribute run_thread



# File 'lib/ruote/worker.rb', line 65

def run_thread
  @run_thread
end

#state ⇒ Object (readonly)

Returns the value of attribute state



# File 'lib/ruote/worker.rb', line 64

def state
  @state
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage



# File 'lib/ruote/worker.rb', line 61

def storage
  @storage
end

Instance Method Details

#inactive? ⇒ Boolean

Returns true if the engine system is inactive, i.e. if all the process instances are terminated or are stuck in an error.

NOTE: for now, if one branch of a process is in error while another is still running, this method considers the whole process instance inactive (and it returns true if all the processes are considered inactive).



# File 'lib/ruote/worker.rb', line 149

def inactive?

  # the cheaper tests first

  return false if @msgs.size > 0
  return false unless @context.storage.empty?('schedules')
  return false unless @context.storage.empty?('msgs')

  wfids = @context.storage.get_many('expressions').collect { |exp|
    exp['fei']['wfid']
  }

  error_wfids = @context.storage.get_many('errors').collect { |err|
    err['fei']['wfid']
  }

  (wfids - error_wfids == [])
end
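The final comparison, and the behavior described in the NOTE, can be reproduced with plain hashes. This is a minimal sketch: `all_processes_in_error?` is a hypothetical stand-in, and the documents are reduced to the only field the method reads (`fei` and its `wfid`). The `wf-1` and `wf-2` ids are made up for the example.

```ruby
# Hypothetical stand-in for the final comparison in #inactive?.
# Documents are reduced to the only field the method reads: fei/wfid.
def all_processes_in_error?(expressions, errors)
  wfids = expressions.collect { |exp| exp['fei']['wfid'] }
  error_wfids = errors.collect { |err| err['fei']['wfid'] }
  (wfids - error_wfids == [])
end

expressions = [
  { 'fei' => { 'wfid' => 'wf-1' } }, # errored branch of wf-1
  { 'fei' => { 'wfid' => 'wf-1' } }, # branch of wf-1 that is still running
  { 'fei' => { 'wfid' => 'wf-2' } }  # healthy process
]
errors = [{ 'fei' => { 'wfid' => 'wf-1' } }]

puts all_processes_in_error?(expressions, errors)          # prints false
puts all_processes_in_error?(expressions.first(2), errors) # prints true
```

The second call returns true even though one branch of `wf-1` is still running, because `Array#-` removes every occurrence of an errored wfid.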

#join ⇒ Object

Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).



# File 'lib/ruote/worker.rb', line 127

def join

  @run_thread.join rescue nil
end
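The "returns immediately" behavior comes from the modifier `rescue`: calling `join` on `nil` raises a NoMethodError, which is turned into `nil`. A minimal sketch, with `safe_join` as a hypothetical stand-in for the method above:

```ruby
# Stand-in for the pattern used by #join: a modifier rescue turns the
# NoMethodError raised by nil.join into a plain nil.
def safe_join(thread)
  thread.join rescue nil
end

puts safe_join(nil).inspect # prints nil

t = Thread.new { :done }
puts safe_join(t).equal?(t) # prints true (Thread#join returns the thread)
```

The same `rescue nil` also swallows any StandardError re-raised by `Thread#join` when the thread itself terminated with an exception.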

#run ⇒ Object

Runs the worker in the current thread. See #run_in_thread for running in a dedicated thread.



# File 'lib/ruote/worker.rb', line 107

def run

  step while @state != 'stopped'
end

#run_in_thread ⇒ Object

Triggers the run method of the worker in a dedicated thread.



# File 'lib/ruote/worker.rb', line 114

def run_in_thread

  #Thread.abort_on_exception = true

  @state = 'running'

  @run_thread = Thread.new { run }
  @run_thread['ruote_worker'] = self
end

#shutdown ⇒ Object

Shuts down this worker (makes sure it won't fetch further messages and schedules).



# File 'lib/ruote/worker.rb', line 135

def shutdown

  @state_mutex.synchronize { @state = 'stopped' }

  join
end
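How #run, #run_in_thread, #join and #shutdown fit together can be seen with a toy class. `ToyWorker` below is hypothetical and does not use ruote; its `step` is a placeholder (a counter and a short sleep) standing in for the real work of fetching msgs and schedules.

```ruby
# ToyWorker is hypothetical; it only imitates the state handling above.
class ToyWorker
  attr_reader :state, :run_thread, :steps

  def initialize
    @state = 'running'
    @run_thread = nil
    @state_mutex = Mutex.new
    @steps = 0
  end

  # Placeholder for the real unit of work (assumption for this sketch).
  def step
    @steps += 1
    sleep 0.001
  end

  # Loops in the calling thread until the state becomes 'stopped'.
  def run
    step while @state != 'stopped'
  end

  # Starts the loop in a dedicated thread and tags that thread.
  def run_in_thread
    @state = 'running'
    @run_thread = Thread.new { run }
    @run_thread['ruote_worker'] = self
  end

  def join
    @run_thread.join rescue nil
  end

  # Flips the state under the mutex, then waits for the loop to exit.
  def shutdown
    @state_mutex.synchronize { @state = 'stopped' }
    join
  end
end

worker = ToyWorker.new
worker.run_in_thread
sleep 0.05
worker.shutdown

puts worker.state             # prints "stopped"
puts worker.run_thread.alive? # prints false
```

The loop only checks the state between steps, so `shutdown` returns once the step in progress has finished.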