Class: Pwwka::ChannelConnector
- Inherits:
- Object (Pwwka::ChannelConnector < Object)
- Extended by:
- Logging
- Includes:
- Logging
- Defined in:
- lib/pwwka/channel_connector.rb
Constant Summary
Constants included from Logging
Instance Attribute Summary
- #channel ⇒ Object (readonly): Returns the value of attribute channel.
- #configuration ⇒ Object (readonly): Returns the value of attribute configuration.
- #connection ⇒ Object (readonly): Returns the value of attribute connection.
Instance Method Summary
- #connection_close ⇒ Object
- #delayed_exchange ⇒ Object
- #delayed_queue ⇒ Object (also: #create_delayed_queue)
- #initialize(prefetch: nil, connection_name: nil) ⇒ ChannelConnector (constructor): The channel connector opens the connection to the message bus as soon as it is created, so it should only be instantiated by a method that has a strategy for closing the connection.
- #raise_if_delayed_not_allowed ⇒ Object
- #topic_exchange ⇒ Object
Methods included from Logging
Constructor Details
#initialize(prefetch: nil, connection_name: nil) ⇒ ChannelConnector
The channel connector opens the connection to the message bus as soon as it is created, so it should only be instantiated by a method that has a strategy for closing the connection.
# File 'lib/pwwka/channel_connector.rb', line 13
def initialize(prefetch: nil, connection_name: nil)
  @configuration = Pwwka.configuration
   = {automatically_recover: false}.merge(configuration.)
   = {client_properties: {connection_name: connection_name}}.merge() if connection_name

  begin
    @connection = Bunny.new(configuration.rabbit_mq_host, )
    @connection.start
  rescue => e
    logf "ERROR Connecting to RabbitMQ: #{e}", at: :error
    @connection.close if @connection
    raise e
  end

  begin
    @channel = @connection.create_channel
    @channel.on_error do |ch, method|
      logf "ERROR On RabbitMQ channel: #{method.inspect}"
    end
  rescue => e
    logf "ERROR Opening RabbitMQ channel: #{e}", at: :error
    @connection.close if @connection
    raise e
  end

  if prefetch
    @channel.prefetch(prefetch.to_i)
  end
end
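Because the constructor opens the connection immediately, one possible closing strategy is a block helper that always calls `connection_close` when the block finishes. The sketch below is an assumption rather than part of Pwwka: `with_channel_connector` and `StubConnector` are hypothetical names, and the stub stands in for a real `Pwwka::ChannelConnector` so the example runs without RabbitMQ.

```ruby
# Hypothetical helper (not part of Pwwka): builds a connector, yields it,
# and closes it even if the block raises.
def with_channel_connector(factory)
  connector = factory.call
  begin
    yield connector
  ensure
    connector.connection_close
  end
end

# Stand-in for Pwwka::ChannelConnector so the sketch runs without a broker.
# With the real gem the factory might instead be:
#   -> { Pwwka::ChannelConnector.new(connection_name: "my-app") }
class StubConnector
  attr_reader :closed

  def initialize
    @closed = false
  end

  def connection_close
    @closed = true
  end
end

stub = StubConnector.new
result = with_channel_connector(-> { stub }) { |_connector| :published }
```

The helper returns the value of the block, and the `ensure` clause runs whether the block returns normally or raises, so the connection is not left open by an unexpected error.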
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
# File 'lib/pwwka/channel_connector.rb', line 8
def channel
  @channel
end
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
# File 'lib/pwwka/channel_connector.rb', line 7
def configuration
  @configuration
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/pwwka/channel_connector.rb', line 6
def connection
  @connection
end
Instance Method Details
#connection_close ⇒ Object
# File 'lib/pwwka/channel_connector.rb', line 84
def connection_close
  begin
    channel.close
  rescue => e
    logf "ERROR Closing RabbitMQ channel: #{e}", at: :error
    raise e
  end

  begin
    connection.close
  rescue => e
    logf "ERROR Closing connection to RabbitMQ: #{e}", at: :error
    raise e
  end
end
#delayed_exchange ⇒ Object
# File 'lib/pwwka/channel_connector.rb', line 48
def delayed_exchange
  raise_if_delayed_not_allowed
  @delayed_exchange ||= channel.fanout(configuration.delayed_exchange_name, durable: true)
end
#delayed_queue ⇒ Object Also known as: create_delayed_queue
# File 'lib/pwwka/channel_connector.rb', line 53
def delayed_queue
  # This works by hacking the dead letter exchange concept with a timeout.
  # We set up a delayed exchange that has a delayed queue. This queue, configured below,
  # sets its dead letter exchange to be the main exchange (topic_exchange above).
  #
  # This means that when a message sent to the delayed queue is either nack'ed with no retry OR
  # its TTL expires, it will be sent to the configured dead letter exchange, which is the main topic_exchange.
  #
  # Since nothing is actually consuming messages on the delayed queue, the only way messages can be removed and
  # sent back to the main exchange is if their TTL expires. As you can see in Pwwka::Transmitter#send_delayed_message!
  # we set an expiration on the message and send it to the delayed exchange. This means that the delay time is the TTL,
  # so the message sits in the delayed queue until its TTL/delay expires, and then it's sent on to the
  # main exchange for everyone to consume, thus creating a delay.
  raise_if_delayed_not_allowed
  @delayed_queue ||= begin
    queue = channel.queue("pwwka_delayed_#{Pwwka.environment}", durable: true,
      arguments: {
        'x-dead-letter-exchange' => configuration.topic_exchange_name,
      })
    queue.bind(delayed_exchange)
    queue
  end
end
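The comment in `delayed_queue` describes a delay built from a per-message TTL plus a dead letter exchange. The toy model below illustrates that idea in plain Ruby. It is not RabbitMQ or Bunny code, and `ToyDelayedQueue`, `publish`, and `expire!` are hypothetical names invented for this sketch. A plain array stands in for the main topic exchange.

```ruby
# Toy in-memory model of the TTL + dead-letter trick (an illustration only).
class ToyDelayedQueue
  Message = Struct.new(:payload, :expires_at)

  def initialize(dead_letter_target)
    @dead_letter_target = dead_letter_target # stands in for the main topic exchange
    @messages = []
  end

  # Publishing with a delay only records when the message's TTL runs out.
  def publish(payload, delay_ms:, now_ms:)
    @messages << Message.new(payload, now_ms + delay_ms)
  end

  # Nothing consumes this queue, so TTL expiry is the only way out:
  # expired messages are handed to the dead-letter target.
  def expire!(now_ms:)
    expired, @messages = @messages.partition { |m| m.expires_at <= now_ms }
    expired.each { |m| @dead_letter_target << m.payload }
  end

  def size
    @messages.size
  end
end

main_exchange = []
queue = ToyDelayedQueue.new(main_exchange)
queue.publish("retry-me", delay_ms: 5_000, now_ms: 0)

queue.expire!(now_ms: 4_999)
delivered_early = main_exchange.dup # still empty: the TTL has not elapsed

queue.expire!(now_ms: 5_000)        # TTL elapsed: message reaches the main exchange
```

The model passes the clock in explicitly (`now_ms`) so the behavior is deterministic. A real broker tracks expiry itself and may apply additional rules that this sketch does not capture.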
#raise_if_delayed_not_allowed ⇒ Object
# File 'lib/pwwka/channel_connector.rb', line 78
def raise_if_delayed_not_allowed
  unless configuration.allow_delayed?
    raise ConfigurationError, "Delayed messages are not allowed. Update your configuration to allow them."
  end
end
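Both `delayed_exchange` and `delayed_queue` call this guard first, so a caller may want to handle the error when delayed messages are disabled. The sketch below shows one way to do that under stated assumptions: `ConfigurationError`, `StubConfiguration`, `GuardedConnector`, and `delayed_exchange_or_nil` are stand-ins defined here so the example runs without Pwwka, and only the guard body mirrors the listing above.

```ruby
# Stand-ins for illustration only; the real classes live in the Pwwka gem.
class ConfigurationError < StandardError; end

StubConfiguration = Struct.new(:allow_delayed) do
  def allow_delayed?
    allow_delayed
  end
end

class GuardedConnector
  def initialize(configuration)
    @configuration = configuration
  end

  # Mirrors the guard-first shape of the delayed_* methods.
  def delayed_exchange
    raise_if_delayed_not_allowed
    :delayed_exchange # placeholder for the real exchange object
  end

  private

  def raise_if_delayed_not_allowed
    unless @configuration.allow_delayed?
      raise ConfigurationError, "Delayed messages are not allowed. Update your configuration to allow them."
    end
  end
end

# Hypothetical caller-side helper: fall back to nil when delays are disabled.
def delayed_exchange_or_nil(connector)
  connector.delayed_exchange
rescue ConfigurationError
  nil
end

disabled = delayed_exchange_or_nil(GuardedConnector.new(StubConfiguration.new(false)))
enabled  = delayed_exchange_or_nil(GuardedConnector.new(StubConfiguration.new(true)))
```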
#topic_exchange ⇒ Object
# File 'lib/pwwka/channel_connector.rb', line 44
def topic_exchange
  @topic_exchange ||= channel.topic(configuration.topic_exchange_name, durable: true)
end