Class: Pebbles::River::Worker

Inherits:
Object
Defined in:
lib/pebbles/river/worker.rb

Overview

Implements a queue worker.

Constructor Details

#initialize(handler, options = {}) ⇒ Worker

Initializes worker with a handler. Options:

  • `queue`: Same queue options as `River#queue`.

  • `on_exception`: If provided, called with `exception` as an argument when a message could not be handled due to an exception. (Connection errors are not reported, however.)

  • `logger`: Optional logger. Defaults to stderr. Pass nil to disable.

  • `managed_acking`: If true, ack/nack handling is automatic; every message is automatically acked unless the handler returns false or the handler raises an exception, in which case it’s nacked. If false, the handler must do the ack/nacking. Defaults to true.

  • `prefetch`: If specified, sets channel’s prefetch count.

The handler must implement `call(payload, extra)`, where `payload` is the message payload and `extra` contains message metadata as a hash. If the handler returns false, the message is considered rejected and will be nacked. Otherwise, the message will be acked.
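
As an illustration of the handler protocol described above, here is a minimal sketch. `EchoHandler` and `managed_outcome` are hypothetical names, not part of the library; the helper only restates the documented `managed_acking` rule and assumes that only a literal `false` return (or an exception) leads to a nack. The payload shape (a hash with an `'event'` key) is also an assumption made for the example.

```ruby
# Hypothetical handler: accepts hash payloads that carry an 'event' key.
class EchoHandler
  attr_reader :seen

  def initialize
    @seen = []
  end

  # payload: the message payload; extra: message metadata as a hash.
  def call(payload, extra)
    return false unless payload.is_a?(Hash) && payload.key?('event')
    @seen << payload['event']
    true
  end
end

# Hypothetical helper restating the documented managed_acking rule:
# a false return or a raised exception means nack, anything else means ack.
def managed_outcome(handler, payload, extra = {})
  handler.call(payload, extra) == false ? :nack : :ack
rescue StandardError
  :nack
end

handler = EchoHandler.new
accepted = managed_outcome(handler, { 'event' => 'create' })
rejected = managed_outcome(handler, 'garbage')
failed   = managed_outcome(->(_payload, _extra) { raise 'boom' }, {})
```

Any object that responds to `call` qualifies, so a lambda taking two arguments works as well as a class instance. With the real library, such a handler would be passed to `Worker.new` or `Worker.run` along with the options listed above.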



# File 'lib/pebbles/river/worker.rb', line 35

def initialize(handler, options = {})
  options.assert_valid_keys(
    :queue,
    :logger,
    :on_exception,
    :managed_acking,
    :prefetch)

  unless handler.respond_to?(:call)
    raise ArgumentError.new('Handler must implement #call protocol')
  end

  @queue_options = (options[:queue] || {}).freeze
  if options[:managed_acking] != nil
    @managed_acking = !!options.fetch(:managed_acking, true)
  else
    @managed_acking = true
  end
  @dead_lettered = !!@queue_options[:dead_letter_routing_key]
  @on_exception = options[:on_exception] || ->(*args) { }
  @handler = handler
  @river = River.new(options.slice(:prefetch))
  @next_event_time = Time.now
  @rate_limiter = RateLimiter.new(1.0, 10)
  @logger = options.fetch(:logger, Logger.new($stderr))
end
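
The way the constructor resolves `logger` and `managed_acking` can be restated in isolation. The sketch below uses a hypothetical `resolve_options` helper (not part of the library) to show why passing `logger: nil` disables logging: `Hash#fetch` only falls back to the default when the key is absent, whereas `managed_acking` falls back to true when the value is nil.

```ruby
require 'logger'

# Hypothetical helper mirroring the constructor's option handling.
def resolve_options(options = {})
  {
    # fetch returns the stored value, even nil, when the key is present.
    logger: options.fetch(:logger, Logger.new($stderr)),
    # nil (or a missing key) means "use the default", which is true.
    managed_acking: options[:managed_acking].nil? ? true : !!options[:managed_acking]
  }
end

defaults = resolve_options({})
silenced = resolve_options({ logger: nil })
manual   = resolve_options({ managed_acking: false })
```

Here `defaults[:logger]` is a `Logger` writing to stderr, `silenced[:logger]` is nil, and `manual[:managed_acking]` is false.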

Instance Attribute Details

#handler ⇒ Object (readonly)

Returns the value of attribute handler.



# File 'lib/pebbles/river/worker.rb', line 14

def handler
  @handler
end

#queue_options ⇒ Object (readonly)

Returns the value of attribute queue_options.



# File 'lib/pebbles/river/worker.rb', line 13

def queue_options
  @queue_options
end

#river ⇒ Object (readonly)

Returns the value of attribute river.



# File 'lib/pebbles/river/worker.rb', line 15

def river
  @river
end

Class Method Details

.run(handler, options = {}) ⇒ Object

Creates a worker with the given handler and options, then runs it. Accepts the same arguments as the constructor.



# File 'lib/pebbles/river/worker.rb', line 8

def run(handler, options = {})
  Worker.new(handler, options).run
end

Instance Method Details

#enabled? ⇒ Boolean

Returns whether the worker is enabled. The flag is set when `run` starts and cleared by `stop`.

Returns:

  • (Boolean)


# File 'lib/pebbles/river/worker.rb', line 104

def enabled?
  @enabled
end

#run ⇒ Object

Runs the handler. This calls `run_once` in a loop and processes the queue until `stop` is called.



# File 'lib/pebbles/river/worker.rb', line 91

def run
  @enabled = true
  while enabled? do
    run_once
  end
end

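#run_once ⇒ Object

Runs the handler once. Sleeps until the next scheduled event time if it lies in the future, then attempts to process the next message. If nothing was processed, the handler's `on_idle` is called (when the handler defines it) and the next attempt is scheduled one second later. If the worker should not run at the moment, the next attempt is scheduled five seconds later. Always returns nil.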



# File 'lib/pebbles/river/worker.rb', line 63

def run_once
  with_exceptions do
    now = Time.now

    if @next_event_time > now
      sleep(@next_event_time - now)
      now = Time.now
    end

    if should_run?
      if process_next
        @next_event_time = now
      else
        if @handler.respond_to?(:on_idle)
          with_exceptions do
            @handler.on_idle
          end
        end
        @next_event_time = now + 1
      end
    else
      @next_event_time = now + 5
    end
  end
  nil
end
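
The scheduling branches in `run_once` reduce to a small decision table. The sketch below restates them as a pure function; `next_delay` is a hypothetical name introduced for illustration, and its two arguments stand in for the results of `should_run?` and `process_next`, whose implementations are not shown on this page.

```ruby
# Hypothetical restatement of run_once's scheduling: returns the number of
# seconds added to "now" to obtain the next event time.
def next_delay(should_run:, processed:)
  return 5 unless should_run  # not allowed to run right now: retry in 5 seconds
  processed ? 0 : 1           # handled a message: continue at once; idle: wait 1 second
end

busy_delay    = next_delay(should_run: true,  processed: true)
idle_delay    = next_delay(should_run: true,  processed: false)
blocked_delay = next_delay(should_run: false, processed: false)
```

A busy queue is therefore drained without pauses, while an empty queue is polled about once per second.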

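#stop ⇒ Object

Stops any concurrent run. This clears the enabled flag, so a `run` loop in another thread exits once its current `run_once` call returns.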



# File 'lib/pebbles/river/worker.rb', line 99

def stop
  @enabled = false
end
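
How `run`, `enabled?` and `stop` interact can be tried out without a message broker. The sketch below uses a hypothetical `LoopSketch` class that copies only the flag handling from the listings above and replaces `run_once` with a counter. Because `run` sets the flag to true when it starts, calling `stop` before the loop has begun has no lasting effect, so the example waits for the first iteration before stopping.

```ruby
# Hypothetical stand-in for the worker's run/stop flag handling.
class LoopSketch
  attr_reader :ticks

  def initialize
    @enabled = false
    @ticks = 0
  end

  def enabled?
    @enabled
  end

  def run
    @enabled = true
    run_once while enabled?
  end

  # Stand-in for real message processing: count the iteration, pause briefly.
  def run_once
    @ticks += 1
    sleep(0.01)
    nil
  end

  def stop
    @enabled = false
  end
end

worker = LoopSketch.new
thread = Thread.new { worker.run }
sleep(0.01) until worker.ticks > 0  # wait until the loop is actually running
worker.stop
thread.join                         # returns once the current iteration ends
```

In a real process, `stop` would more likely be triggered from a signal handler or a supervising thread; that wiring is left out here.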