Class: Nsque::TestingWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/nsque/testing_worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ TestingWorker

use this worker in your test to process delayed jobs

Raises:



5
6
7
8
9
10
11
12
# File 'lib/nsque/testing_worker.rb', line 5

def initialize(options)
  raise ChannelRequiredError.new unless options.has_key?(:channel)
  raise ProducerCantBeNilError.new if options[:producer].nil?
  @options = options
  @producer = options[:producer]
  @consumer = Nsqrb::Consumer.new(@options)
  @consumer.connect!
end

Instance Method Details

#clear_allObject



35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/nsque/testing_worker.rb', line 35

def clear_all
  count = 0
  while @producer.messages_count > count
    @consumer.receive
    message = @consumer.messages.pop
    next unless message
    @consumer.confirm(message)
    count += 1
  end

  @producer.reset_counters
  count
end

#process_allObject



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/nsque/testing_worker.rb', line 14

def process_all
  count = 0
  while @producer.messages_count > count
    @consumer.receive
    message = @consumer.messages.pop
    next unless message
    hash = JSON.parse(message.content)
    begin
      klass = hash['class'].constantize
      klass.new.perform(hash['args'])
    rescue => e
      puts e.inspect
    end
    @consumer.confirm(message)
    count += 1
  end

  @producer.reset_counters
  count
end