Class: Disc::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/disc/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/disc/worker.rb', line 22

# Builds a worker. Each setting is taken from +options+ first, then from the
# environment, then from a built-in default.
#
# @param options [Hash] optional overrides
# @option options [Object] :disque connection used by #run to fetch and
#   acknowledge jobs; falls back to Disc.disque
# @option options [String, Array<String>] :queues queue names, either a
#   comma-separated String or an Array of names; falls back to ENV['QUEUES'],
#   then Disc.default_queue
# @option options [Integer, String] :count value passed as +count:+ when
#   fetching; falls back to ENV['DISQUE_COUNT'], then 1
# @option options [Integer, String] :timeout value passed as +timeout:+ when
#   fetching; falls back to ENV['DISQUE_TIMEOUT'], then 2000
# @option options [Boolean] :run when truthy, #run is called before the
#   constructor returns (so the constructor blocks until the loop ends)
# @raise [ArgumentError, TypeError] if count or timeout cannot be converted
#   by Integer()
def initialize(options = {})
  # Block-form fetch keeps the fallbacks lazy: Disc.disque and ENV are only
  # consulted when the caller did not supply the option.
  @disque = options.fetch(:disque) { Disc.disque }

  queue_spec = options.fetch(:queues) do
    ENV.fetch('QUEUES') { Disc.default_queue }
  end
  # Accept an Array of names as well as the comma-separated String form;
  # a plain String still goes through the same split(',') as before.
  @queues = Array(queue_spec).flat_map { |name| name.to_s.split(',') }

  @count = Integer(
    options.fetch(:count) { ENV.fetch('DISQUE_COUNT', '1') }
  )
  @timeout = Integer(
    options.fetch(:timeout) { ENV.fetch('DISQUE_TIMEOUT', '2000') }
  )

  run if options[:run]
end

Instance Attribute Details

#count ⇒ Object (readonly)

Returns the value of attribute count.



5
6
7
# File 'lib/disc/worker.rb', line 5

# Reader for @count, which the constructor sets via Integer() from the :count
# option or the DISQUE_COUNT environment variable (default '1'). #run passes
# it as the +count:+ argument when fetching jobs.
#
# @return [Integer]
def count
  @count
end

#disque ⇒ Object (readonly)

Returns the value of attribute disque.



5
6
7
# File 'lib/disc/worker.rb', line 5

# Reader for @disque, the connection object the constructor takes from the
# :disque option or from Disc.disque. #run uses it to fetch jobs, send ACKJOB
# and finally quit.
#
# @return [Object]
def disque
  @disque
end

#queues ⇒ Object (readonly)

Returns the value of attribute queues.



5
6
7
# File 'lib/disc/worker.rb', line 5

# Reader for @queues, the list of queue names the constructor derives from
# the :queues option, the QUEUES environment variable or Disc.default_queue.
# #run passes it as the +from:+ argument when fetching jobs.
#
# @return [Array<String>]
def queues
  @queues
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.



5
6
7
# File 'lib/disc/worker.rb', line 5

# Reader for @timeout, which the constructor sets via Integer() from the
# :timeout option or the DISQUE_TIMEOUT environment variable (default '2000').
# #run passes it as the +timeout:+ argument when fetching jobs.
#
# @return [Integer]
def timeout
  @timeout
end

Class Method Details

.current ⇒ Object



10
11
12
# File 'lib/disc/worker.rb', line 10

# Returns the shared instance for this class. The first call builds it with
# +new+ (no arguments) and caches it in the class-level @current; later calls
# hand back the cached object.
#
# @return [Object] the memoized instance
def self.current
  return @current if @current

  @current = new
end

.run ⇒ Object



14
15
16
# File 'lib/disc/worker.rb', line 14

# Class-level shortcut: looks up the shared instance via +current+ and calls
# its #run, returning whatever that call returns.
def self.run
  worker = current
  worker.run
end

.stop ⇒ Object



18
19
20
# File 'lib/disc/worker.rb', line 18

# Class-level shortcut: looks up the shared instance via +current+ and calls
# its #stop, returning whatever that call returns.
def self.stop
  worker = current
  worker.stop
end

Instance Method Details

#run ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/disc/worker.rb', line 49

# Fetches jobs from the configured queues and performs them in a loop until
# a stop has been requested via #stop.
#
# For each fetched (queue, msgid, payload) triple the payload is deserialized
# with Disc.deserialize, the class named in job['class'] is instantiated, and
# its #perform is called with job['arguments']. On success the job is
# acknowledged with ACKJOB. Any StandardError raised along the way is handed
# to Disc.on_error together with the job data (plus 'id' and 'queue'); the job
# is then left unacknowledged and the loop continues with the next one.
#
# The stop flag is only checked after a fetch/process round, so at least one
# round always runs. The connection is closed with +quit+ when the method
# exits, whether normally or through an exception.
def run
  $stdout.puts("Disc::Worker listening in #{queues}")
  loop do
    jobs = disque.fetch(from: queues, timeout: timeout, count: count)
    # Array() lets a nil/empty fetch result fall through as "no jobs".
    Array(jobs).each do |queue, msgid, serialized_job|
      job = nil
      begin
        job = Disc.deserialize(serialized_job)
        # NOTE(review): the class name comes straight from the job payload;
        # confirm that only trusted producers can enqueue jobs.
        instance = Object.const_get(job['class']).new
        instance.perform(*job['arguments'])
        disque.call('ACKJOB', msgid)
        $stdout.puts("Completed #{job['class']} id #{msgid}")
      rescue => err
        # When deserialization itself failed (job is nil) or produced
        # something other than a Hash, fall back to an empty Hash so the
        # error handler is still reached instead of raising from here and
        # terminating the worker loop.
        details = job.is_a?(Hash) ? job : {}
        Disc.on_error(err, details.merge('id' => msgid, 'queue' => queue))
      end
    end

    break if @stop
  end
ensure
  disque.quit
end

#stop ⇒ Object



45
46
47
# File 'lib/disc/worker.rb', line 45

# Requests that the #run loop exit. Only sets the @stop flag; #run checks it
# after finishing its current fetch/process round, so the loop does not end
# immediately.
#
# @return [true]
def stop
  @stop = true
end