Class: Faktory::Processor

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/faktory/processor.rb

Overview

The Processor is a standalone thread which:

  1. fetches a job

  2. executes the job:

     a. instantiates the Worker
     b. runs the middleware chain
     c. calls #perform

A Processor can exit due to shutdown (processor_stopped) or due to an error during job execution (processor_died).

If an error occurs in the job execution, the Processor calls the Manager to create a new one to replace itself and exits.

Constant Summary collapse

@@busy_lock =
Mutex.new
@@busy_count =
0

Constants included from Util

Util::EXPIRY

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #server, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(mgr) ⇒ Processor

Returns a new instance of Processor.



37
38
39
40
41
42
43
44
45
# File 'lib/faktory/processor.rb', line 37

# Sets up a processor that belongs to +mgr+. No thread is created here;
# that happens in #start.
#
# @param mgr [Object] owning manager; its #options are read for
#   :reloader and :job_logger and are also passed to Faktory::Fetcher.new
def initialize(mgr)
  @mgr = mgr

  # Lifecycle state: no thread yet, not asked to stop, not marked as down.
  @thread = nil
  @done = false
  @down = false

  @reloader = mgr.options[:reloader]

  # Use the configured job logger class when one is given, else the default.
  job_logger_class = mgr.options[:job_logger] || Faktory::JobLogger
  @logging = job_logger_class.new

  @fetcher = Faktory::Fetcher.new(mgr.options)
end

Instance Attribute Details

#job ⇒ Object (readonly)

Returns the value of attribute job.



29
30
31
# File 'lib/faktory/processor.rb', line 29

# Reader for the @job instance variable. Within this class, #process_one
# stores work.job there just before handing the work unit to #process.
#
# @return [Object, nil] current value of @job (nil if it was never assigned)
def job
  @job
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



28
29
30
# File 'lib/faktory/processor.rb', line 28

# Reader for the @thread instance variable: nil after #initialize, then
# whatever safe_thread returned once #start has been called.
#
# @return [Object, nil] current value of @thread
def thread
  @thread
end

Class Method Details

.busy_count ⇒ Object



33
34
35
# File 'lib/faktory/processor.rb', line 33

# Reads the class-wide @@busy_count counter, which #process_one increments
# before processing a work unit and decrements again in an ensure block.
# The read itself does not take @@busy_lock.
#
# @return [Integer] current value of @@busy_count
def self.busy_count
  @@busy_count
end

Instance Method Details

#constantize(str) ⇒ Object



168
169
170
171
172
173
174
175
# File 'lib/faktory/processor.rb', line 168

# Resolves a (possibly namespaced) constant name to the constant itself.
#
# Each segment is looked up strictly inside the namespace resolved so far
# (inherit = false). Without that flag, a path such as "Foo::String" would
# fall through Foo's ancestors / Object and silently return the top-level
# ::String instead of failing. Segments that are not defined are handed to
# const_missing, exactly as before, so a const_missing hook can still load
# them.
#
# @param str [String] constant path such as "MyApp::SomeJob"; a leading "::"
#   is accepted
# @return [Object] the resolved constant (Object itself for an empty string)
# @raise [NameError] from the default const_missing when a segment cannot be
#   resolved
def constantize(str)
  names = str.split('::')
  # A leading "::" produces an empty first segment; drop it.
  names.shift if names.empty? || names.first.empty?

  names.inject(Object) do |constant, name|
    if constant.const_defined?(name, false)
      constant.const_get(name, false)
    else
      constant.const_missing(name)
    end
  end
end

#dispatch(payload) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/faktory/processor.rb', line 126

# Builds the job instance named by the payload and yields it to the
# caller's block, all inside the logging context, the job logger and the
# reloader.
#
# @param payload [Hash] job data; 'jobtype' names the class to instantiate
#   and 'jid' is assigned to the new instance
# @yieldparam worker [Object] new instance of the job class with #jid set
# @return [Object] whatever Faktory::Logging.with_job_hash_context returns
def dispatch(payload)
  Faktory::Logging.with_job_hash_context(payload) do
    @logging.call(payload) do
      # Rails 5 needs a Reloader around executed code. Looking up the job
      # class and instantiating it must therefore happen inside the
      # Reloader, which takes care of code loading, db connection
      # management and the like. To Rails, this block is one "unit of work".
      @reloader.call do
        worker = constantize(payload['jobtype'.freeze]).new
        worker.jid = payload['jid'.freeze]
        yield worker
      end
    end
  end
end

#fetch ⇒ Object



103
104
105
106
107
108
109
110
111
112
# File 'lib/faktory/processor.rb', line 103

# Asks the fetcher for the next unit of work.
#
# When @down is set (a previous fetch failed), a successful fetch logs the
# elapsed downtime and clears @down.
#
# @return [Object, nil] the work unit from @fetcher.retrieve_work; nil when
#   Faktory::Shutdown is raised; otherwise the result of
#   #handle_fetch_exception for any other StandardError
def fetch
  work = @fetcher.retrieve_work
  if @down
    logger.info { "Faktory is online, #{Time.now - @down} sec downtime" }
    @down = nil
  end
  work
rescue Faktory::Shutdown
  # Shutting down: report "no work" to the caller.
  nil
rescue => ex
  handle_fetch_exception(ex)
end

#handle_fetch_exception(ex) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/faktory/processor.rb', line 114

# Records and logs a fetch failure, then pauses for one second.
#
# Only the first failure of an outage is logged: once @down holds the time
# of that failure, later calls skip the logging until #fetch clears @down
# again.
#
# @param ex [Exception] the error raised while fetching
# @return [nil] always nil, so the caller sees "no work"
def handle_fetch_exception(ex)
  unless @down
    @down = Time.now
    logger.error("Error fetching job: #{ex}")
    # Exception#backtrace is nil for an exception that was never raised;
    # Array() turns that into an empty list instead of failing on nil.each.
    Array(ex.backtrace).each { |bt| logger.error(bt) }
  end
  sleep(1)
  nil
end

#kill(wait = false) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/faktory/processor.rb', line 53

# Marks the processor as done and interrupts its thread by raising
# Faktory::Shutdown inside it.
#
# @param wait [Boolean] when truthy, block until the thread finishes
# @return [Object, nil] the thread's value when waiting, otherwise nil
#   (also nil when no thread has been started)
def kill(wait=false)
  @done = true
  return unless @thread

  # #terminate, unlike the other actors, does not wait for the thread to
  # finish, because there is no telling how long the running job will
  # take. This method exists to be called once the shutdown timeout has
  # passed.
  @thread.raise(::Faktory::Shutdown)
  wait ? @thread.value : nil
end

#process(work) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/faktory/processor.rb', line 143

# Executes one unit of work: dispatches the payload, runs the worker
# middleware around the job's #perform, and reports the outcome on +work+.
#
# @param work [Object] unit of work; must respond to #job, #acknowledge
#   and #fail
# @return [Object] result of work.acknowledge on success, or of work.fail
#   when the job was interrupted by Faktory::Shutdown
# @raise [Exception] any other exception from the job, re-raised after it
#   has been passed to handle_exception and work.fail
def process(work)
  payload = work.job
  begin
    dispatch(payload) do |worker|
      Faktory.worker_middleware.invoke(worker, payload) { worker.perform(*payload['args'.freeze]) }
    end
    work.acknowledge
  rescue Faktory::Shutdown => shutdown
    # The job did not finish within the timeout and was force-killed.
    # Failing it lets the server release any locks and restart the job
    # immediately.
    work.fail(shutdown)
  rescue Exception => error
    handle_exception(error, { context: "Job raised exception", job: work.job })
    work.fail(error)
    raise error
  end
end

#process_one ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/faktory/processor.rb', line 84

# Fetches at most one unit of work and processes it, keeping the
# class-wide busy counter accurate while the job runs.
#
# @return [Object] result of #process when work was fetched, otherwise
#   the result of the one-second sleep
def process_one
  work = fetch
  # Nothing fetched: pause for a second before the caller tries again.
  return sleep(1) unless work

  @@busy_lock.synchronize { @@busy_count += 1 }
  begin
    @job = work.job
    process(work)
  ensure
    # Runs even when #process raises, so the counter cannot drift.
    @@busy_lock.synchronize { @@busy_count -= 1 }
  end
end

#run ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/faktory/processor.rb', line 71

# Main loop of the processor: keeps calling #process_one until @done is
# set, then tells the manager how the loop ended.
#
# @return [Object] result of @mgr.processor_stopped (normal exit or
#   Faktory::Shutdown) or of @mgr.processor_died (any other exception)
def run
  process_one until @done
  @mgr.processor_stopped(self)
rescue Faktory::Shutdown
  # Interrupted by a shutdown: report a normal stop.
  @mgr.processor_stopped(self)
rescue Exception => ex
  # Anything else is reported to the manager as a died processor.
  @mgr.processor_died(self, ex)
end

#start ⇒ Object



65
66
67
# File 'lib/faktory/processor.rb', line 65

# Starts the processor thread running #run, unless one was already
# started; repeated calls return the same thread.
#
# @return [Object] the value stored in @thread (result of safe_thread)
def start
  return @thread if @thread

  @thread = safe_thread("processor", &method(:run))
end

#terminate(wait = false) ⇒ Object



47
48
49
50
51
# File 'lib/faktory/processor.rb', line 47

# Asks the processor to stop after its current iteration by setting @done.
# The thread is not interrupted.
#
# @param wait [Boolean] when truthy and a thread exists, block until it
#   finishes
# @return [Object, nil] the thread's value when waiting, otherwise nil
def terminate(wait=false)
  @done = true
  @thread.value if @thread && wait
end

#thread_identity ⇒ Object



164
165
166
# File 'lib/faktory/processor.rb', line 164

# Base-36 string form of the object_id of the thread that first calls this
# method. The value is memoized in @str, so later calls return the same
# string regardless of the calling thread.
#
# @return [String]
def thread_identity
  return @str if @str

  @str = Thread.current.object_id.to_s(36)
end