Class: Disc::Worker
- Inherits:
-
Object
- Object
- Disc::Worker
- Defined in:
- lib/disc.rb
Instance Attribute Summary collapse
-
#count ⇒ Object
readonly
Returns the value of attribute count.
-
#disque ⇒ Object
readonly
Returns the value of attribute disque.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/disc.rb', line 45
#
# Builds a worker from explicit options, falling back to environment
# variables and then to hard-coded defaults.
#
# @param options [Hash] optional overrides
# @option options [Object] :disque client used by #run to fetch and ack
#   jobs; Disc.disque is consulted only when this key is absent
# @option options [String] :queues comma-separated queue names
#   (default: ENV['QUEUES'], then 'default')
# @option options [Integer, String] :count passed to the client as +count:+
#   (default: ENV['DISQUE_COUNT'], then '1')
# @option options [Integer, String] :timeout passed to the client as
#   +timeout:+ (default: ENV['DISQUE_TIMEOUT'], then '2000')
# @option options [Boolean] :run when truthy, #run is called before the
#   constructor returns
# @raise [ArgumentError, TypeError] when :count or :timeout cannot be
#   converted by Integer()
def initialize(options = {})
  # Block form: the shared client is only looked up when none was injected.
  @disque = options.fetch(:disque) { Disc.disque }
  @queues = options.fetch(:queues, ENV.fetch('QUEUES', 'default')).split(',')
  @count = Integer(options.fetch(:count, ENV.fetch('DISQUE_COUNT', '1')))
  @timeout = Integer(options.fetch(:timeout, ENV.fetch('DISQUE_TIMEOUT', '2000')))

  run if options[:run]
end
Instance Attribute Details
#count ⇒ Object (readonly)
Returns the value of attribute count.
36 37 38 |
# File 'lib/disc.rb', line 36
#
# Read-only accessor for +@count+.
#
# @return [Object] the current value of +@count+
def count
  @count
end
#disque ⇒ Object (readonly)
Returns the value of attribute disque.
36 37 38 |
# File 'lib/disc.rb', line 36
#
# Read-only accessor for +@disque+.
#
# @return [Object] the current value of +@disque+
def disque
  @disque
end
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
36 37 38 |
# File 'lib/disc.rb', line 36
#
# Read-only accessor for +@queues+.
#
# @return [Object] the current value of +@queues+
def queues
  @queues
end
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
36 37 38 |
# File 'lib/disc.rb', line 36
#
# Read-only accessor for +@timeout+.
#
# @return [Object] the current value of +@timeout+
def timeout
  @timeout
end
Class Method Details
.run ⇒ Object
41 42 43 |
# File 'lib/disc.rb', line 41
#
# Convenience entry point: instantiates the receiver with no arguments and
# immediately calls the instance-level #run on it.
#
# @return [Object] whatever the instance-level #run returns
def self.run
  new.run
end
Instance Method Details
#run ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/disc.rb', line 68
#
# Announces the queues on $stdout, then loops: fetch a batch of jobs from
# the Disque client, run each one, and acknowledge it on success.
#
# Every fetched entry is destructured as (queue, msgid, serialized_job).
# The payload is decoded with MessagePack; its 'class' entry names the
# constant to instantiate and its 'arguments' entry is splatted into
# #perform. ACKJOB is sent only after #perform returns without raising.
#
# Any StandardError raised while handling a job is handed to Disc.on_error
# together with the job hash extended with 'id' and 'queue'. When the
# payload could not be decoded (or did not decode to a Hash) an empty Hash
# is used instead, so the error handler still runs and the loop keeps going
# rather than dying on a NoMethodError raised inside the rescue clause.
#
# NOTE(review): the class name comes straight from the queue payload and is
# resolved with Object.const_get — confirm that only trusted producers can
# enqueue jobs.
#
# @return [Object] only returns if the loop is broken out of
def run
  $stdout.puts("Disc::Worker listening in #{queues}")

  loop do
    jobs = disque.fetch(from: queues, timeout: timeout, count: count)

    Array(jobs).each do |queue, msgid, serialized_job|
      job = nil # stays nil when decoding itself is what fails

      begin
        job = MessagePack.unpack(serialized_job)
        instance = Object.const_get(job['class']).new
        instance.perform(*job['arguments'])
        disque.call('ACKJOB', msgid)
      rescue => err
        details = job.is_a?(Hash) ? job : {}
        Disc.on_error(err, details.update('id' => msgid, 'queue' => queue))
      end
    end
  end
end