Class: QueueingRabbit::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/queueing_rabbit/worker.rb

Defined Under Namespace

Classes: WorkerError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jobs, concurrency = nil) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
# File 'lib/queueing_rabbit/worker.rb', line 12

# Builds a worker for the given jobs.
#
# Each entry of +jobs+ is stringified and stripped; entries that end up empty
# are discarded. When +concurrency+ is nil (or false) the number of remaining
# jobs is used instead. A ::MutexPool is created with that concurrency value,
# after which stdio syncing, job validation and job constantizing run in
# that order.
#
# @param jobs [#map] the jobs this worker should handle
# @param concurrency [Integer, nil] value handed to the mutex pool
def initialize(jobs, concurrency = nil)
  job_names = jobs.map { |name| name.to_s.strip }
  @jobs = job_names.reject(&:empty?)
  @concurrency = concurrency || @jobs.count
  @mutex_pool = ::MutexPool.new(@concurrency)

  sync_stdio
  validate_jobs
  constantize_jobs
end

Instance Attribute Details

#concurrency ⇒ Object (readonly)

Returns the value of attribute concurrency.



10
11
12
# File 'lib/queueing_rabbit/worker.rb', line 10

# Reader for @concurrency. The constructor sets it to the explicit
# concurrency argument or, when that argument is nil/false, to the number
# of jobs; the same value is passed to ::MutexPool.new.
def concurrency
  @concurrency
end

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



10
11
12
# File 'lib/queueing_rabbit/worker.rb', line 10

# Reader for @jobs. The constructor stores the stripped, non-empty string
# form of every job it was given.
# NOTE(review): constantize_jobs (not visible here) presumably replaces these
# strings with job classes afterwards; confirm before relying on the type.
def jobs
  @jobs
end

#mutex_pool ⇒ Object (readonly)

Returns the value of attribute mutex_pool.



10
11
12
# File 'lib/queueing_rabbit/worker.rb', line 10

# Reader for @mutex_pool, the ::MutexPool instance the constructor creates
# with the worker's concurrency as its argument.
def mutex_pool
  @mutex_pool
end

Instance Method Details

#invoke_job(job, payload, metadata) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/queueing_rabbit/worker.rb', line 92

# Performs a single job with the given payload and metadata.
#
# Dispatch order:
# 1. if +job+ itself responds to +perform+, it is called with the payload
#    and metadata;
# 2. otherwise, if +job+ is QueueingRabbit::AbstractJob or a subclass, an
#    instance is built from the payload and metadata and its +perform+ is
#    called;
# 3. otherwise an error is logged.
#
# Any StandardError raised along the way is reported through the
# :consumer_error event and logged; it is not re-raised.
#
# @param job [Object] the job to perform
# @param payload [Object] message payload forwarded to the job
# @param metadata [Object] message metadata forwarded to the job
def invoke_job(job, payload, metadata)
  info "performing job #{job}"

  if job.respond_to?(:perform)
    job.perform(payload, metadata)
  elsif job <= QueueingRabbit::AbstractJob
    job.new(payload, metadata).perform
  else
    error "don't know how to perform job #{job}"
  end
rescue => e
  QueueingRabbit.trigger_event(:consumer_error, e)
  error "unexpected error #{e.class} occured: #{e.message}"
  debug e
end

#pid ⇒ Object



61
62
63
# File 'lib/queueing_rabbit/worker.rb', line 61

# Returns the id of the current process as reported by Process.pid.
# This value is what use_pidfile writes to disk and what to_s prints.
def pid
  Process.pid
end

#pidfile_exists? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/queueing_rabbit/worker.rb', line 57

# Tells whether a pidfile has been configured and is present on disk.
#
# File.exist? is used because the File.exists? alias was deprecated and is
# no longer available as of Ruby 3.2.
#
# @return [Boolean] true only when @pidfile is set and that file exists
def pidfile_exists?
  @pidfile ? File.exist?(@pidfile) : false
end

#read_pidfile ⇒ Object



53
54
55
# File 'lib/queueing_rabbit/worker.rb', line 53

# Reads the pidfile and returns its contents converted with String#to_i.
#
# @return [Integer, nil] the stored value, or nil when pidfile_exists? is falsy
def read_pidfile
  return unless pidfile_exists?

  File.read(@pidfile).to_i
end

#remove_pidfile ⇒ Object



49
50
51
# File 'lib/queueing_rabbit/worker.rb', line 49

# Deletes the pidfile from disk; does nothing when pidfile_exists? is falsy.
#
# @return [Integer, nil] the result of File.delete, or nil when nothing was deleted
def remove_pidfile
  return unless pidfile_exists?

  File.delete(@pidfile)
end

#stop(connection = QueueingRabbit.connection, graceful = false) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/queueing_rabbit/worker.rb', line 69

# Schedules a shutdown of the worker on the connection's next tick.
#
# Inside the scheduled callback the worker is marked as not working, an
# optional graceful wait is performed, and the connection is always closed
# (the pidfile is removed from the block given to connection.close).
#
# @param connection [Object] must respond to next_tick and close; defaults
#   to QueueingRabbit.connection
# @param graceful [Boolean] when truthy, wait for the mutex pool lock (bounded
#   by QueueingRabbit.jobs_wait_timeout) and fire :consuming_done before closing
def stop(connection = QueueingRabbit.connection, graceful = false)
  connection.next_tick do
    begin
      # Cleared first, so working? is falsy even if the steps below fail.
      @working = false
      if graceful
        # Bounded wait for the pool lock.
        # NOTE(review): presumably this blocks until in-flight jobs have
        # released their mutexes; confirm against MutexPool#lock.
        Timeout.timeout(QueueingRabbit.jobs_wait_timeout) { @mutex_pool.lock }
        QueueingRabbit.trigger_event(:consuming_done)
        info "gracefully shutting down the worker #{self}"
      end
    rescue Timeout::Error
      # The wait above exceeded the limit; log it and fall through to ensure.
      error "a timeout (> #{QueueingRabbit.jobs_wait_timeout}s) when trying to gracefully shut down the worker " \
            "#{self}"
    rescue => e
      # Any other StandardError is logged, not re-raised.
      error "a #{e.class} error occurred when trying to shut down the worker #{self}"
      debug e
    ensure
      # Runs on every path: close the connection and remove the pidfile
      # from the block handed to close.
      connection.close do
        remove_pidfile
      end
    end
  end
end

#to_s ⇒ Object



65
66
67
# File 'lib/queueing_rabbit/worker.rb', line 65

# Describes the worker in one line: process id, comma-joined job list and
# concurrency. This is the text interpolated into the worker's log messages.
#
# @return [String]
def to_s
  job_list = jobs.join(',')
  "PID=#{pid}, JOBS=#{job_list} CONCURRENCY=#{@concurrency}"
end

#use_pidfile(filename) ⇒ Object



43
44
45
46
47
# File 'lib/queueing_rabbit/worker.rb', line 43

# Remembers +filename+ as the worker's pidfile, runs cleanup_pidfile, then
# (re)creates the file containing the current pid.
#
# @param filename [String] path of the pidfile to write
def use_pidfile(filename)
  @pidfile = filename
  cleanup_pidfile

  File.open(@pidfile, 'w') do |pidfile|
    pidfile << pid
  end
end

#work ⇒ Object



26
27
28
29
30
31
32
33
# File 'lib/queueing_rabbit/worker.rb', line 26

# Starts consuming, unless the worker is already working.
#
# Marks the worker as working, fires :worker_ready, hands every job to
# run_job together with QueueingRabbit.connection, and finally fires
# :consuming_started.
def work
  unless working?
    @working = true

    QueueingRabbit.trigger_event(:worker_ready)
    jobs.each do |job|
      run_job(QueueingRabbit.connection, job)
    end
    QueueingRabbit.trigger_event(:consuming_started)
  end
end

#work! ⇒ Object



35
36
37
38
39
40
41
# File 'lib/queueing_rabbit/worker.rb', line 35

# Entry point that runs the worker inside the worker loop.
#
# Does nothing when the worker is already working. Otherwise it installs
# the signal traps, logs a start-up message and passes a block calling
# #work to QueueingRabbit.begin_worker_loop.
def work!
  unless working?
    trap_signals
    info("starting a new queueing_rabbit worker #{self}")
    QueueingRabbit.begin_worker_loop do
      work
    end
  end
end

#working? ⇒ Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/queueing_rabbit/worker.rb', line 22

# Tells whether the worker is currently marked as working.
#
# @working is set to true by #work and back to false by #stop; before
# either has run it is unset, which is reported as false rather than nil so
# the predicate always yields a real Boolean.
#
# @return [Boolean]
def working?
  @working ? true : false
end