Class: Faktory::Processor
- Inherits:
-
Object
- Object
- Faktory::Processor
show all
- Includes:
- Util
- Defined in:
- lib/faktory/processor.rb
Overview
The Processor is a standalone thread which:
-
fetches a job
-
executes the job
a. instantiate the Worker
b. run the middleware chain
c. call
A Processor can exit due to shutdown (processor_stopped) or due to an error during job execution (processor_died)
If an error occurs in the job execution, the Processor calls the Manager to create a new one to replace itself and exits.
Constant Summary
collapse
- @@busy_lock =
Mutex.new
- @@busy_count =
0
Constants included
from Util
Util::EXPIRY
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Util
#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #server, #watchdog
#handle_exception
Constructor Details
#initialize(mgr) ⇒ Processor
Returns a new instance of Processor.
37
38
39
40
41
42
43
44
45
|
# File 'lib/faktory/processor.rb', line 37
def initialize(mgr)
@mgr = mgr
@down = false
@done = false
@thread = nil
@reloader = mgr.options[:reloader]
@logging = (mgr.options[:job_logger] || Faktory::JobLogger).new
@fetcher = Faktory::Fetcher.new(mgr.options)
end
|
Instance Attribute Details
#job ⇒ Object
Returns the value of attribute job.
29
30
31
|
# File 'lib/faktory/processor.rb', line 29
def job
@job
end
|
#thread ⇒ Object
Returns the value of attribute thread.
28
29
30
|
# File 'lib/faktory/processor.rb', line 28
def thread
@thread
end
|
Class Method Details
.busy_count ⇒ Object
33
34
35
|
# File 'lib/faktory/processor.rb', line 33
def self.busy_count
@@busy_count
end
|
Instance Method Details
#constantize(str) ⇒ Object
168
169
170
171
172
173
174
175
|
# File 'lib/faktory/processor.rb', line 168
def constantize(str)
names = str.split('::')
names.shift if names.empty? || names.first.empty?
names.inject(Object) do |constant, name|
constant.const_defined?(name) ? constant.const_get(name) : constant.const_missing(name)
end
end
|
#dispatch(payload) ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/faktory/processor.rb', line 126
def dispatch(payload)
Faktory::Logging.with_job_hash_context(payload) do
@logging.call(payload) do
@reloader.call do
klass = constantize(payload['jobtype'.freeze])
jobinst = klass.new
jobinst.jid = payload['jid'.freeze]
yield jobinst
end
end
end
end
|
#fetch ⇒ Object
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/faktory/processor.rb', line 103
def fetch
begin
work = @fetcher.retrieve_work
(logger.info { "Faktory is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down
work
rescue Faktory::Shutdown
rescue => ex
handle_fetch_exception(ex)
end
end
|
#handle_fetch_exception(ex) ⇒ Object
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/faktory/processor.rb', line 114
def handle_fetch_exception(ex)
if !@down
@down = Time.now
logger.error("Error fetching job: #{ex}")
ex.backtrace.each do |bt|
logger.error(bt)
end
end
sleep(1)
nil
end
|
#kill(wait = false) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/faktory/processor.rb', line 53
def kill(wait=false)
@done = true
return if !@thread
@thread.raise ::Faktory::Shutdown
@thread.value if wait
end
|
#process(work) ⇒ Object
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/faktory/processor.rb', line 143
def process(work)
payload = work.job
begin
dispatch(payload) do |jobinst|
Faktory.worker_middleware.invoke(jobinst, payload) do
jobinst.perform(*payload['args'.freeze])
end
end
work.acknowledge
rescue Faktory::Shutdown => shut
work.fail(shut)
rescue Exception => ex
handle_exception(ex, { :context => "Job raised exception", :job => work.job })
work.fail(ex)
raise ex
end
end
|
#process_one ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/faktory/processor.rb', line 84
def process_one
work = fetch
if work
@@busy_lock.synchronize do
@@busy_count = @@busy_count + 1
end
begin
@job = work.job
process(work)
ensure
@@busy_lock.synchronize do
@@busy_count = @@busy_count - 1
end
end
else
sleep 1
end
end
|
#run ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/faktory/processor.rb', line 71
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
rescue Faktory::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end
end
|
#start ⇒ Object
65
66
67
|
# File 'lib/faktory/processor.rb', line 65
def start
@thread ||= safe_thread("processor", &method(:run))
end
|
#terminate(wait = false) ⇒ Object
47
48
49
50
51
|
# File 'lib/faktory/processor.rb', line 47
def terminate(wait=false)
@done = true
return if !@thread
@thread.value if wait
end
|
#thread_identity ⇒ Object
164
165
166
|
# File 'lib/faktory/processor.rb', line 164
def thread_identity
@str ||= Thread.current.object_id.to_s(36)
end
|