Class: Pebbles::River::Worker
- Inherits: Object
- Defined in: lib/pebbles/river/worker.rb
Overview
Implements a queue worker.
Instance Attribute Summary
- #handler ⇒ Object (readonly): Returns the value of attribute handler.
- #queue_options ⇒ Object (readonly): Returns the value of attribute queue_options.
- #river ⇒ Object (readonly): Returns the value of attribute river.
Instance Method Summary
- #enabled? ⇒ Boolean: Returns whether the run loop is enabled.
- #initialize(handler, options = {}) ⇒ Worker (constructor): Initializes worker with a handler.
- #run ⇒ Object: Runs the handler.
- #run_once ⇒ Object: Runs the handler once.
- #stop ⇒ Object: Stops any concurrent run.
Constructor Details
#initialize(handler, options = {}) ⇒ Worker
Initializes worker with a handler. Options:
- `queue`: Same queue options as `River#queue`.
- `on_exception`: If provided, called with `exception` as an argument when a message could not be handled due to an exception. (Connection errors are not reported, however.)
- `logger`: Optional logger. Defaults to stderr. Pass nil to disable.
- `managed_acking`: If true, ack/nack handling is automatic; every message is automatically acked unless the handler returns false or the handler raises an exception, in which case it's nacked. If false, the handler must do the ack/nacking. Defaults to true.
- `prefetch`: If specified, sets channel's prefetch count.
The handler must implement `call(payload, extra)`, where `payload` is the message payload and `extra` is a hash of message metadata. If the handler returns false, the message is considered rejected and will be nacked. Otherwise, the message will be acked.
# File 'lib/pebbles/river/worker.rb', line 35
def initialize(handler, options = {})
  options.assert_valid_keys(
    :queue,
    :logger,
    :on_exception,
    :managed_acking,
    :prefetch)

  unless handler.respond_to?(:call)
    raise ArgumentError.new('Handler must implement #call protocool')
  end

  @queue_options = (options[:queue] || {}).freeze
  if options[:managed_acking] != nil
    @managed_acking = !!options.fetch(:managed_acking, true)
  else
    @managed_acking = true
  end
  @dead_lettered = !!@queue_options[:dead_letter_routing_key]
  @on_exception = options[:on_exception] || ->(*args) { }
  @handler = handler
  @river = River.new(options.slice(:prefetch))
  @next_event_time = Time.now
  @rate_limiter = RateLimiter.new(1.0, 10)
  @logger = options.fetch(:logger, Logger.new($stderr))
end
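The handler protocol described above can be illustrated with a minimal sketch. `LoggingHandler`, the `'event'` payload key, and `queue_options` are hypothetical names chosen for illustration; the worker wiring is left commented out because it assumes the pebbles-river gem is loaded and a broker is reachable.

```ruby
# Hypothetical handler: any object responding to #call(payload, extra) qualifies.
class LoggingHandler
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Under managed acking (the default), returning false nacks the message;
  # any other return value acks it. Raising an exception also nacks.
  def call(payload, extra)
    return false unless payload.is_a?(Hash) && payload.key?('event')
    @seen << payload['event']
    true
  end
end

handler = LoggingHandler.new

# Wiring it up (assumption: gem loaded, broker running, and queue_options
# holding whatever River#queue accepts):
# worker = Pebbles::River::Worker.new(handler,
#   queue: queue_options,
#   on_exception: ->(exception) { warn exception.message },
#   prefetch: 10)
# worker.run
```

Returning an explicit `true` is not required; the constructor documentation only distinguishes `false` (and exceptions) from everything else.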
Instance Attribute Details
#handler ⇒ Object (readonly)
Returns the value of attribute handler.
# File 'lib/pebbles/river/worker.rb', line 14
def handler
  @handler
end
#queue_options ⇒ Object (readonly)
Returns the value of attribute queue_options.
# File 'lib/pebbles/river/worker.rb', line 13
def queue_options
  @queue_options
end
#river ⇒ Object (readonly)
Returns the value of attribute river.
# File 'lib/pebbles/river/worker.rb', line 15
def river
  @river
end
Instance Method Details
#enabled? ⇒ Boolean
Returns whether the run loop is enabled. The flag is set by #run and cleared by #stop.
# File 'lib/pebbles/river/worker.rb', line 104
def enabled?
  @enabled
end
#run ⇒ Object
Runs the handler. This will process the queue indefinitely.
# File 'lib/pebbles/river/worker.rb', line 91
def run
  @enabled = true
  while enabled? do
    run_once
  end
end
#run_once ⇒ Object
Runs the handler once.
# File 'lib/pebbles/river/worker.rb', line 63
def run_once
  with_exceptions do
    now = Time.now

    if @next_event_time > now
      sleep(@next_event_time - now)
      now = Time.now
    end

    if should_run?
      if process_next
        @next_event_time = now
      else
        if @handler.respond_to?(:on_idle)
          with_exceptions do
            @handler.on_idle
          end
        end
        @next_event_time = now + 1
      end
    else
      @next_event_time = now + 5
    end
  end
  nil
end
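The scheduling in `#run_once` amounts to a small delay policy, which the sketch below restates in isolation. `next_delay` and `IdleAwareHandler` are hypothetical names, not part of `Pebbles::River::Worker`; the sketch only mirrors the branches visible in the listing above and makes no claim about what the private `should_run?` and `process_next` methods check.

```ruby
# Hypothetical helper restating the delay chosen by #run_once, in seconds.
def next_delay(should_run:, processed:)
  return 5 unless should_run  # should_run? returned false: wait 5 seconds
  return 0 if processed       # a message was processed: poll again immediately
  1                           # nothing to process: on_idle runs, then wait 1 second
end

# Hypothetical handler that defines the optional idle hook.
class IdleAwareHandler
  attr_reader :idle_count

  def initialize
    @idle_count = 0
  end

  def call(payload, extra)
    true
  end

  # Invoked by the worker when no message was processed, if the handler defines it.
  def on_idle
    @idle_count += 1
  end
end
```

The idle hook is optional: the worker checks `respond_to?(:on_idle)` before calling it, so handlers that only implement `call` need no changes.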
#stop ⇒ Object
Stops any concurrent run.
# File 'lib/pebbles/river/worker.rb', line 99
def stop
  @enabled = false
end
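Because `#run` loops until `enabled?` turns false, `#stop` is meant to be called from another thread (or a signal handler) while `#run` is in progress. The sketch below shows that interaction with `LoopingStandIn`, a hypothetical stand-in that copies only the `run`/`stop`/`enabled?` shape of the worker; its `run_once` just counts iterations instead of consuming a queue.

```ruby
# Hypothetical stand-in mirroring the worker's run/stop/enabled? methods.
class LoopingStandIn
  attr_reader :iterations

  def initialize
    @enabled = false
    @iterations = 0
  end

  def enabled?
    @enabled
  end

  def run
    @enabled = true
    while enabled?
      run_once
    end
  end

  # Placeholder for real queue processing.
  def run_once
    @iterations += 1
    sleep 0.01
    nil
  end

  def stop
    @enabled = false
  end
end

worker = LoopingStandIn.new
thread = Thread.new { worker.run }

sleep 0.01 until worker.iterations > 0  # wait until the loop is really running
worker.stop                             # takes effect once the current run_once returns
thread.join
```

With the real worker, `#stop` likewise only takes effect between iterations, so by the listing above a call can take up to the current sleep interval (1 or 5 seconds) plus any in-flight message handling before `#run` returns.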