Class: RailsPipeline::IronmqPullingSubscriber
- Inherits:
-
Object
- Object
- RailsPipeline::IronmqPullingSubscriber
- Includes:
- Subscriber
- Defined in:
- lib/rails-pipeline/ironmq_pulling_subscriber.rb
Constant Summary
Constants included from Subscriber
Subscriber::Error, Subscriber::NoApiKeyError, Subscriber::WrongApiKeyError
Instance Attribute Summary collapse
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
Instance Method Summary collapse
- #activate_subscription ⇒ Object
- #active_subscription? ⇒ Boolean
- #deactivate_subscription ⇒ Object
-
#initialize(queue_name) ⇒ IronmqPullingSubscriber
constructor
A new instance of IronmqPullingSubscriber.
- #process_envelope(envelope, message, block) ⇒ Object
- #process_message(message, halt_on_error, block) ⇒ Object
-
#pull_message(wait_time) {|queue.get(:wait => wait_time)| ... } ⇒ Object
the wait time on this may need to be changed haven’t seen rate limit info on these calls but didnt look all that hard either.
-
#start_subscription(params = {wait_time: 2, halt_on_error: true}, &block) ⇒ Object
Valid Parameters at this time are wait_time - An integer indicating how long in seconds we should long poll on empty queues halt_on_error - A boolean indicating if we should stop our queue subscription if an error occurs.
Methods included from Subscriber
included, register, registered_handlers, target_class, target_handler
Constructor Details
#initialize(queue_name) ⇒ IronmqPullingSubscriber
Returns a new instance of IronmqPullingSubscriber.
10 11 12 13 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 10 def initialize(queue_name) @queue_name = queue_name @subscription_status = false end |
Instance Attribute Details
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
8 9 10 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 8 def queue_name @queue_name end |
Instance Method Details
#activate_subscription ⇒ Object
54 55 56 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 54 def activate_subscription @subscription_status = true end |
#active_subscription? ⇒ Boolean
50 51 52 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 50 def active_subscription? @subscription_status end |
#deactivate_subscription ⇒ Object
58 59 60 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 58 def deactivate_subscription @subscription_status = false end |
#process_envelope(envelope, message, block) ⇒ Object
62 63 64 65 66 67 68 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 62 def process_envelope(envelope, , block) callback_status = block.call(envelope) if callback_status .delete end end |
#process_message(message, halt_on_error, block) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 29 def (, halt_on_error, block) begin if .nil? || JSON.parse(.body).empty? deactivate_subscription else payload = parse_ironmq_payload(.body) envelope = generate_envelope(payload) process_envelope(envelope, , block) end rescue Exception => e if halt_on_error deactivate_subscription end RailsPipeline.logger.error "A message was unable to be processed as was not removed from the queue." RailsPipeline.logger.error "The message: #{.inspect}" raise e end end |
#pull_message(wait_time) {|queue.get(:wait => wait_time)| ... } ⇒ Object
the wait time on this may need to be changed haven’t seen rate limit info on these calls but didnt look all that hard either.
74 75 76 77 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 74 def (wait_time) queue = _iron.queue(queue_name) yield queue.get(:wait => wait_time) end |
#start_subscription(params = {wait_time: 2, halt_on_error: true}, &block) ⇒ Object
Valid Parameters at this time are wait_time - An integer indicating how long in seconds we should long poll on empty queues halt_on_error - A boolean indicating if we should stop our queue subscription if an error occurs
18 19 20 21 22 23 24 25 26 |
# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 18 def start_subscription(params={wait_time: 2, halt_on_error: true}, &block) activate_subscription while active_subscription? (params[:wait_time]) do || (, params[:halt_on_error], block) end end end |