Class: Disc::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/disc.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/disc.rb', line 45

# Builds a worker. Each setting is taken from +options+ first, then from the
# environment, then from a built-in default.
#
# @param options [Hash] worker settings
# @option options [Object] :disque client used to fetch and acknowledge
#   jobs; defaults to Disc.disque
# @option options [String, Array<String>] :queues comma-separated queue
#   names, or an array of names; defaults to ENV['QUEUES'], then 'default'
# @option options [Integer, String] :count value later passed as +count:+
#   to disque.fetch; defaults to ENV['DISQUE_COUNT'], then 1
# @option options [Integer, String] :timeout value later passed as
#   +timeout:+ to disque.fetch; defaults to ENV['DISQUE_TIMEOUT'], then 2000
# @option options [Boolean] :run when truthy, #run is invoked before the
#   constructor returns
# @raise [ArgumentError, TypeError] if :count or :timeout cannot be
#   converted by Integer()
def initialize(options = {})
  # Block-form fetch keeps every default lazy: Disc.disque is only evaluated
  # when the caller did not supply a client of their own.
  @disque = options.fetch(:disque) { Disc.disque }

  # A comma-separated String is split into names; anything else (typically
  # an Array of names) is used as the list directly.
  queue_names = options.fetch(:queues) { ENV.fetch('QUEUES', 'default') }
  @queues = queue_names.respond_to?(:split) ? queue_names.split(',') : Array(queue_names)

  @count = Integer(options.fetch(:count) { ENV.fetch('DISQUE_COUNT', '1') })
  @timeout = Integer(options.fetch(:timeout) { ENV.fetch('DISQUE_TIMEOUT', '2000') })

  run if options[:run]
end

Instance Attribute Details

#count ⇒ Object (readonly)

Returns the value of attribute count.



36
37
38
# File 'lib/disc.rb', line 36

# Reader for the job count this worker was configured with.
#
# @return [Object] the current value of @count (nil if never assigned)
def count; @count; end

#disque ⇒ Object (readonly)

Returns the value of attribute disque.



36
37
38
# File 'lib/disc.rb', line 36

# Reader for the client object this worker was configured with.
#
# @return [Object] the current value of @disque (nil if never assigned)
def disque; @disque; end

#queues ⇒ Object (readonly)

Returns the value of attribute queues.



36
37
38
# File 'lib/disc.rb', line 36

# Reader for the queue names this worker was configured with.
#
# @return [Object] the current value of @queues (nil if never assigned)
def queues; @queues; end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.



36
37
38
# File 'lib/disc.rb', line 36

# Reader for the fetch timeout this worker was configured with.
#
# @return [Object] the current value of @timeout (nil if never assigned)
def timeout; @timeout; end

Class Method Details

.run ⇒ Object



41
42
43
# File 'lib/disc.rb', line 41

# Convenience entry point: instantiates the receiver with no arguments and
# starts the new instance.
#
# @return [Object] whatever the instance-level #run returns
def self.run
  worker = new
  worker.run
end

Instance Method Details

#run ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/disc.rb', line 68

# Fetches jobs from the configured queues and performs them in an endless
# loop.
#
# Each fetched entry is treated as a [queue, msgid, serialized_job] triple.
# The payload is unpacked with MessagePack, the class named under its
# 'class' key is instantiated, and #perform is called with the values under
# 'arguments'. The job is acknowledged (ACKJOB) only after #perform returns
# without raising. Any StandardError raised while handling a job is passed
# to Disc.on_error together with the job payload plus 'id' and 'queue'; the
# loop then moves on to the next job.
#
# @return [Object] only returns if the loop is broken out of; an exception
#   not covered by the per-job rescue propagates to the caller
def run
  $stdout.puts("Disc::Worker listening in #{queues}")
  loop do
    jobs = disque.fetch(from: queues, timeout: timeout, count: count)
    Array(jobs).each do |queue, msgid, serialized_job|
      job = nil
      begin
        job = MessagePack.unpack(serialized_job)
        # NOTE(review): the class name comes straight from the job payload,
        # so anything able to enqueue a job can pick any loadable constant.
        # Consider restricting this to known job classes.
        instance = Object.const_get(job['class']).new
        instance.perform(*job['arguments'])
        disque.call('ACKJOB', msgid)
      rescue => err
        # job is nil when unpacking itself failed and may be a non-Hash for
        # malformed payloads; fall back to an empty payload so the error
        # handler still runs instead of raising a second error from here.
        payload = job.is_a?(Hash) ? job : {}
        Disc.on_error(err, payload.merge('id' => msgid, 'queue' => queue))
      end
    end
  end
end