Class: Watership::Consumer
- Inherits: Object
- Hierarchy: Object → Watership::Consumer
- Defined in:
- lib/watership/consumer.rb
Class Method Summary collapse
- .sleep_forever ⇒ Object
Instance Method Summary collapse
- #ack_message(tag) ⇒ Object
- #bind(name, opts = {}) ⇒ Object
- #channel ⇒ Object
- #clear_active_record_connections ⇒ Object
- #connection ⇒ Object
- #consume ⇒ Object
- #create_queue ⇒ Object
- #enqueue(message) ⇒ Object
- #initialize(consumer, url, channel_options = {}, queue_options = {}) ⇒ Consumer (constructor) — A new instance of Consumer.
- #logger ⇒ Object
- #notify(exception, data) ⇒ Object
- #reject_message(tag, requeue = true) ⇒ Object
Constructor Details
#initialize(consumer, url, channel_options = {}, queue_options = {}) ⇒ Consumer
Returns a new instance of Consumer.
7 8 9 10 11 12 13 14 15 |
# File 'lib/watership/consumer.rb', line 7
#
# Builds a consumer for the given handler class and broker URL.
#
# @param consumer [Class] handler class; #create_queue reads its `QUEUE`
#   constant and #consume calls `consumer.new.call(data)` for every message
# @param url [String] connection URL handed to `Bunny.new` by #connection
# @param channel_options [Hash] may carry `:prefetch` and `:concurrency`
#   (both removed before use); the remainder is merged over `{ durable: true }`
#   and passed to `channel.queue` by #create_queue
# @param queue_options [Hash] merged over `{ block: false, manual_ack: true }`
#   and passed to `queue.subscribe` by #consume
# @return [Consumer]
#
# `:prefetch` falls back to the RABBIT_CONSUMER_PREFETCH environment variable
# (default 200); `:concurrency` falls back to 1; `:forever` falls back to true.
#
# NOTE(review): the extracted listing lost the receivers of the three `delete`
# calls. `:prefetch`/`:concurrency` are taken from `channel_options` because
# #channel consumes them; `:forever` is accepted from either hash because the
# original receiver cannot be recovered from this page — confirm against the
# real source.
def initialize(consumer, url, channel_options = {}, queue_options = {})
  # Work on copies so the deletes below never mutate (or choke on frozen)
  # hashes owned by the caller.
  channel_options = channel_options.dup
  queue_options = queue_options.dup

  @consumer = consumer
  @url = url
  @prefetch = channel_options.delete(:prefetch) || Integer(ENV.fetch("RABBIT_CONSUMER_PREFETCH", 200))
  @concurrency = channel_options.delete(:concurrency) || 1

  # `value || true` can never be false, so an explicit `forever: false` used to
  # be ignored. Only a missing option falls back to the default now.
  forever = [channel_options, queue_options].map { |opts| opts.delete(:forever) }.compact.first
  @forever = forever.nil? ? true : forever

  @channel_opts = { durable: true }.merge(channel_options)
  @queue_opts = { block: false, manual_ack: true }.merge(queue_options)
end
Class Method Details
.sleep_forever ⇒ Object
102 103 104 105 106 107 108 109 |
# File 'lib/watership/consumer.rb', line 102
#
# Blocks the caller by joining a thread that sleeps indefinitely.
#
# A TERM trap is installed that terminates the sleeping thread, which lets the
# `join` return. An Interrupt raised while waiting is swallowed.
#
# @return [Thread, nil] the joined thread, or nil when interrupted
def self.sleep_forever
  parked = Thread.new do
    sleep
  end

  Signal.trap("TERM") { parked.terminate }

  parked.join
rescue Interrupt
end
Instance Method Details
#ack_message(tag) ⇒ Object
57 58 59 60 |
# File 'lib/watership/consumer.rb', line 57
#
# Logs and acknowledges one delivery on the consumer's channel.
#
# @param tag [Object] delivery tag (#consume passes `delivery_info.delivery_tag`)
# @return [Object] whatever `channel.acknowledge` returns
def ack_message(tag)
  logger.info "Acking message"
  # Second argument is presumably Bunny's `multiple` flag; false would then
  # acknowledge only this tag — confirm against the Bunny API.
  channel.acknowledge(tag, false)
end
#bind(name, opts = {}) ⇒ Object
53 54 55 |
# File 'lib/watership/consumer.rb', line 53
#
# Binds the consumer's queue using the given name and options.
#
# @param name [Object] forwarded as the first argument of the queue's `bind`
# @param opts [Hash] forwarded unchanged to the queue's `bind`
# @return [Object] whatever the queue's `bind` returns
def bind(name, opts = {})
  queue = create_queue
  queue.bind(name, opts)
end
#channel ⇒ Object
75 76 77 78 79 80 81 |
# File 'lib/watership/consumer.rb', line 75
#
# Returns the memoized channel, creating it from #connection on first use.
#
# `@concurrency` is passed as the second argument of `create_channel`
# (presumably the consumer work pool size — confirm against Bunny) and
# `@prefetch` is applied to the new channel before it is cached.
#
# @return [Object] the channel
def channel
  return @channel if @channel

  fresh = connection.create_channel(nil, @concurrency)
  fresh.prefetch(@prefetch)
  @channel = fresh
end
#clear_active_record_connections ⇒ Object
96 97 98 99 100 |
# File 'lib/watership/consumer.rb', line 96
#
# Releases ActiveRecord connections after a message has been handled.
# Does nothing when ActiveRecord is not loaded.
#
# The connection handler is asked first because
# `ActiveRecord::Base.clear_active_connections!` is deprecated in newer Rails
# releases in favour of calling the handler directly; with only the old probe
# the `respond_to?` guard would silently skip the cleanup once that method is
# gone. The Base-level call is kept as a fallback for older versions.
#
# @return [Object, nil] result of the clear call, or nil when nothing applies
def clear_active_record_connections
  return unless defined?(::ActiveRecord::Base)

  base = ::ActiveRecord::Base
  if base.respond_to?(:connection_handler) &&
     base.connection_handler.respond_to?(:clear_active_connections!)
    base.connection_handler.clear_active_connections!
  elsif base.respond_to?(:clear_active_connections!)
    base.clear_active_connections!
  end
end
#connection ⇒ Object
71 72 73 |
# File 'lib/watership/consumer.rb', line 71
#
# Returns the memoized Bunny session for `@url`, starting it on first use.
# The session is cached only after `start` has returned.
#
# @return [Object] the started session
def connection
  return @connection if @connection

  session = Bunny.new(@url)
  session.start
  @connection = session
end
#consume ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/watership/consumer.rb', line 17
#
# Subscribes to the consumer's queue and processes every delivery.
#
# For each message the payload is parsed as JSON and passed to a new instance
# of the handler class via `call`. Outcomes:
#
# * handler succeeds              -> message is acked
# * handler raises, payload is a
#   JSON object                   -> error is logged and reported via #notify,
#                                    a copy with "retries" incremented is
#                                    re-published via #enqueue, original acked
# * payload is not valid JSON or
#   not a JSON object             -> error is logged and reported, message is
#                                    rejected WITHOUT requeue (it can never
#                                    succeed and cannot carry a retry counter)
# * Interrupt                     -> logged, message rejected with requeue
#
# ActiveRecord connections are cleared after every delivery. When `@forever`
# is truthy the method then parks in `sleep_forever`. The channel is always
# closed on the way out.
#
# NOTE(review): `exception.message` restores an identifier that was stripped
# from the extracted listing (`logger.error exception.`) — confirm.
#
# @return [Object] result of `channel.close`
def consume
  queue = create_queue
  queue.subscribe(@queue_opts.dup) do |delivery_info, _properties, payload|
    success = false
    requeue = true
    data = nil
    begin
      data = JSON.parse(payload)
      @consumer.new.call(data)
      success = true
    rescue StandardError => exception
      logger.error "Error thrown in subscribe block"
      logger.error exception.message
      # backtrace is nil for exceptions that were never raised with one
      logger.error Array(exception.backtrace).join("\n")
      if data.is_a?(Hash)
        retries = data["retries"] || 0
        notify(exception, { payload: data, retries: retries })
        # String key: a symbol key would sit next to the parsed "retries" key
        # and serialize as a duplicate JSON member.
        enqueue(data.merge("retries" => retries + 1))
        success = true
      else
        # Unparseable / non-object payload: reading data["retries"] here used
        # to raise inside this rescue and requeue the poison message forever.
        notify(exception, { payload: payload, retries: 0 })
        requeue = false
      end
    rescue Interrupt
      logger.error "Interrupt in subscribe block"
      logger.warn "Stopped gracefully."
    ensure
      if success
        ack_message(delivery_info.delivery_tag)
      else
        reject_message(delivery_info.delivery_tag, requeue)
      end
      clear_active_record_connections
    end
  end
  self.class.sleep_forever if @forever
ensure
  logger.info "Closing Channel"
  channel.close
end
#create_queue ⇒ Object
67 68 69 |
# File 'lib/watership/consumer.rb', line 67
#
# Looks up (or declares) the queue named by the handler class's `QUEUE`
# constant on the consumer's channel, using the stored channel options.
#
# @return [Object] whatever `channel.queue` returns
def create_queue
  active_channel = channel
  active_channel.queue(@consumer::QUEUE, @channel_opts)
end
#enqueue(message) ⇒ Object
83 84 85 |
# File 'lib/watership/consumer.rb', line 83
#
# Serializes a message to JSON and publishes it to the consumer's own queue.
# #consume uses this to re-publish a failed message with a bumped retry count.
#
# @param message [Object] anything `JSON.generate` can serialize
# @return [Object] whatever the queue's `publish` returns
def enqueue(message)
  create_queue.publish(JSON.generate(message))
end
#logger ⇒ Object
92 93 94 |
# File 'lib/watership/consumer.rb', line 92
#
# Returns the memoized logger: `Rails.logger` when Rails is loaded and has one,
# otherwise a Logger writing to STDOUT.
#
# The fallback also applies when the `Rails` constant exists but exposes no
# logger (or a nil one); previously that left the logger nil, so the first
# `logger.info` call failed.
#
# @return [Object] a logger responding to the usual severity methods
def logger
  @logger ||= begin
    rails_logger = Rails.logger if defined?(Rails) && Rails.respond_to?(:logger)
    rails_logger || Logger.new(STDOUT)
  end
end
#notify(exception, data) ⇒ Object
87 88 89 90 |
# File 'lib/watership/consumer.rb', line 87
#
# Reports an exception to whichever error trackers are loaded.
# Airbrake receives only the exception; Bugsnag additionally receives `data`
# under the `data:` key.
#
# @param exception [Exception] the error to report
# @param data [Object] extra context (#consume passes payload and retry count)
# @return [Object, nil] result of the Bugsnag call, or nil when Bugsnag is absent
def notify(exception, data)
  if defined?(Airbrake)
    Airbrake.notify(exception)
  end

  if defined?(Bugsnag)
    Bugsnag.notify(exception, data: data)
  end
end
#reject_message(tag, requeue = true) ⇒ Object
62 63 64 65 |
# File 'lib/watership/consumer.rb', line 62
#
# Logs and rejects one delivery on the consumer's channel.
#
# @param tag [Object] delivery tag (#consume passes `delivery_info.delivery_tag`)
# @param requeue [Boolean] forwarded to `channel.reject`; defaults to true
# @return [Object] whatever `channel.reject` returns
def reject_message(tag, requeue = true)
  logger.info "Rejecting message"
  channel.reject(tag, requeue)
end