Class: HonestPubsub::Subscriber
- Inherits:
-
Object
- Object
- HonestPubsub::Subscriber
- Defined in:
- lib/honest_pubsub/subscriber.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#listener ⇒ Object
readonly
Returns the value of attribute listener.
Instance Method Summary collapse
-
#initialize(routing_key, durable = true, topic = "honest") ⇒ Subscriber
constructor
A new instance of Subscriber.
-
#start(name, blocking = false) ⇒ Object
name - used to ensure that certain consumers are actually listening to an exchange pass in a lambda for this method to work.
- #teardown ⇒ Object
Constructor Details
#initialize(routing_key, durable = true, topic = "honest") ⇒ Subscriber
Returns a new instance of Subscriber.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/honest_pubsub/subscriber.rb', line 7 def initialize(routing_key, durable = true, topic="honest") @initial_key = routing_key @durable = durable @topic = topic if @initial_key.present? @routing_key = "#{@topic}.#{@initial_key}.#" else @routing_key = "#{@topic}.#" end @logger = ::HonestPubsub::Logger.new self end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
5 6 7 |
# File 'lib/honest_pubsub/subscriber.rb', line 5 def channel @channel end |
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
4 5 6 |
# File 'lib/honest_pubsub/subscriber.rb', line 4 def exchange @exchange end |
#listener ⇒ Object (readonly)
Returns the value of attribute listener.
3 4 5 |
# File 'lib/honest_pubsub/subscriber.rb', line 3 def listener @listener end |
Instance Method Details
#start(name, blocking = false) ⇒ Object
name - used to ensure that certain consumers are actually listening to an exchange pass in a lambda for this method to work. We might only want to expose the content instead of all 3 chunks.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/honest_pubsub/subscriber.rb', line 25 def start(name, blocking=false) @connection = Bunny.new(Configuration.configuration[:connection]) begin @connection.start rescue => e Airbrake.notify("RabbitMQ unreachable!", params: { message: e.}, environment_name: ENV['RAILS_ENV'] ) raise e end @channel = @connection.create_channel @exchange = @channel.topic(@topic, :durable=>@durable, :auto_delete=>false) # FIX!!! -thl # Need to ensure that the ids for a server will be reproducible in case a server # goes down and has to get restarted. if @initial_key.present? @queue = "#{@initial_key}.#{name}" else @queue = "#{name}" end queue_arguments = {} queue_arguments["x-dead-letter-exchange"] = Configuration.configuration[:dead_letter] if Configuration.configuration[:dead_letter].present? @listener = @channel.queue(@queue, :arguments=>queue_arguments ).bind(@exchange, :routing_key => @routing_key, :exclusive=>false) # Parameters for subscribe that might be useful: # :block=>true - Used for long running consumer applications. (backend servers?) @consumer = @listener.subscribe(:consumer_tag=>name, :block=>blocking) @consumer.on_delivery do |delivery_info, properties, contents| HonestPubsub.logger.debug( "Message delivery with contents: #{contents}") if delivery_info[:redelivered] Airbrake.notify("PubSub Message redelivery", params: {info: delivery_info, props: properties, contents: contents}, environment_name: ENV['RAILS_ENV'] ) end = ::HonestPubsub::Message.new.parse(contents) @logger.log_receive(delivery_info[:routing_key], ) yield delivery_info, properties, true end end |
#teardown ⇒ Object
64 65 66 67 |
# File 'lib/honest_pubsub/subscriber.rb', line 64 def teardown @consumer.cancel if @consumer.present? @connection.close if @connection.present? end |