Class: Philotic::Connection

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/philotic/connection.rb

Defined Under Namespace

Classes: TCPConnectionFailed

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Connection

Returns a new instance of Connection.



29
30
31
# File 'lib/philotic/connection.rb', line 29

# Builds a connection wrapper without opening anything: the only state set
# here is the retry counter, which starts at zero.
def initialize
  @connection_attempts = 0
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



24
25
26
# File 'lib/philotic/connection.rb', line 24

# Reader for @connection. The variable is assigned by attempt_connection and
# is nil until the first connection attempt has been made.
def connection
  @connection
end

#connection_attempts ⇒ Object (readonly)

Returns the value of attribute connection_attempts.



24
25
26
# File 'lib/philotic/connection.rb', line 24

# Reader for the retry counter. It is incremented at the start of every
# attempt_connection call and reset to zero after a successful start or once
# start_connection! gives up.
def connection_attempts
  @connection_attempts
end

#logger ⇒ Object

Returns the value of attribute logger.



25
26
27
# File 'lib/philotic/connection.rb', line 25

# Reader for @logger. The methods in this class call #info, #warn and #error
# on it with a block that builds the message.
def logger
  @logger
end

#publisher ⇒ Object



33
34
35
# File 'lib/philotic/connection.rb', line 33

# Returns the Philotic::Publisher bound to this connection, building it on
# first use and reusing the same instance afterwards.
def publisher
  return @publisher if @publisher

  @publisher = Philotic::Publisher.new(self)
end

#subscriber ⇒ Object



37
38
39
# File 'lib/philotic/connection.rb', line 37

# Returns the Philotic::Subscriber bound to this connection, building it on
# first use and reusing the same instance afterwards.
def subscriber
  return @subscriber if @subscriber

  @subscriber = Philotic::Subscriber.new(self)
end

Instance Method Details

#attempt_connection ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/philotic/connection.rb', line 74

# Makes a single attempt to open and start a Bunny session.
#
# The attempt counter is bumped first; from the second attempt onwards a
# warning with the sanitized URL and the attempt number is logged. On a
# successful start the counter is reset to zero. Nothing is rescued here, so
# a failure from Bunny propagates to the caller with the counter still
# incremented.
def attempt_connection
  @connection_attempts += 1

  if connection_attempts > 1
    logger.warn do
      "Connecting to RabbitMQ: #{config.sanitized_rabbit_url}. Attempt #{connection_attempts} of #{config.connection_attempts}"
    end
  end

  # Assign before starting so @connection refers to the latest session even
  # when #start raises.
  @connection = Bunny.new(config.rabbit_url, connection_settings)
  @connection.start
  @connection_attempts = 0
end

#bind_queue(queue, config) ⇒ Object



139
140
141
142
143
144
145
146
147
148
# File 'lib/philotic/connection.rb', line 139

# Binds +queue+ to its exchange once per entry in <tt>config[:bindings]</tt>.
#
# The exchange is resolved through exchange_from_config. Each bindings entry
# is passed to the queue as the +arguments+ of one binding and logged. A
# config without a :bindings key (or with a nil value) is treated as having
# no bindings instead of failing with NoMethodError on nil.
#
# @param queue  the queue to bind; must respond to #bind and #name
# @param config [Hash] queue configuration with symbol keys
# @return whatever the final logger.info call returns
def bind_queue(queue, config)
  queue_exchange = exchange_from_config(config)
  bindings       = config[:bindings] || []

  bindings.each do |arguments|
    queue.bind(queue_exchange, {arguments: arguments})
    logger.info { "Added binding to queue. queue: #{queue.name} binding: #{arguments}" }
  end

  logger.info { "Finished adding bindings to queue. queue: #{queue.name}" }
end

#channel ⇒ Object



102
103
104
# File 'lib/philotic/connection.rb', line 102

# Returns the channel for this connection, creating it through
# connection.create_channel on first use and caching it until #close clears
# the cached value.
def channel
  return @channel if @channel

  @channel = connection.create_channel
end

#close ⇒ Object



91
92
93
94
95
96
# File 'lib/philotic/connection.rb', line 91

# Closes the underlying connection when one is open and drops the cached
# channel and exchange so they are rebuilt on next use. A warning is logged
# on every call, whether or not a connection was open.
#
# @return [nil]
def close
  logger.warn { "closing connection to RabbitMQ: #{config.sanitized_rabbit_url}" }

  connection.close if connected?

  @channel = @exchange = nil
end

#config ⇒ Object



41
42
43
# File 'lib/philotic/connection.rb', line 41

# Returns the Philotic::Config for this connection, building it on first
# use (with this connection as its argument) and reusing it afterwards.
def config
  return @config if @config

  @config = Philotic::Config.new(self)
end

#connect! ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/philotic/connection.rb', line 45

# Ensures there is an open connection.
#
# Does nothing (and returns nil) when already connected. Otherwise it runs
# start_connection!, then re-checks the connection state: on success the
# exchange return handler is installed and true is returned; on failure an
# error is logged and false is returned.
#
# @return [true, false, nil]
def connect!
  return if connected?

  start_connection!

  unless connected?
    logger.error { "Failed to connect to RabbitMQ: #{config.sanitized_rabbit_url}" }
    return false
  end

  logger.info { "Connected to RabbitMQ: #{config.sanitized_rabbit_url}" }
  set_exchange_return_handler!
  true
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/philotic/connection.rb', line 98

# Tells whether a connection object exists and reports itself as connected.
# When no connection has been created yet the falsy connection value itself
# (nil) is returned rather than a strict false.
def connected?
  return connection unless connection

  connection.connected?
end

#connection_settings ⇒ Object



83
84
85
86
87
88
89
# File 'lib/philotic/connection.rb', line 83

# Collects the options hash handed to Bunny.new alongside the URL. Every
# value is read from config at call time, so a fresh hash is built per call.
#
# @return [Hash] with keys :automatically_recover,
#   :network_recovery_interval and :continuation_timeout
def connection_settings
  settings = {}
  settings[:automatically_recover]     = config.automatically_recover
  settings[:network_recovery_interval] = config.network_recovery_interval
  settings[:continuation_timeout]      = config.continuation_timeout
  settings
end

#exchange ⇒ Object



106
107
108
# File 'lib/philotic/connection.rb', line 106

# Returns the default exchange, declaring it on first use and caching it
# until #close clears the cached value. config.exchange_type names the
# channel method to invoke; the exchange is declared durable under
# config.exchange_name.
def exchange
  return @exchange if @exchange

  @exchange = channel.send(config.exchange_type, config.exchange_name, durable: true)
end

#exchange_from_config(config) ⇒ Object



150
151
152
# File 'lib/philotic/connection.rb', line 150

# Resolves the exchange a queue should be bound to.
#
# When the queue config names an :exchange, a durable exchange of the
# connection-wide type (self.config.exchange_type) is declared under that
# name; otherwise the default #exchange is returned. Note that the +config+
# parameter shadows the #config method, hence the explicit self.config.
#
# @param config [Hash] queue configuration with symbol keys
def exchange_from_config(config)
  return exchange unless config[:exchange]

  channel.send(self.config.exchange_type, config[:exchange], durable: true)
end

#initialize_named_queue!(queue_name, config) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/philotic/connection.rb', line 116

# Declares the named queue and its bindings.
#
# Requires self.config.initialize_named_queues to be truthy. After
# connecting, an existing queue is deleted only when
# self.config.delete_existing_queues is set; an existing queue that is not
# deleted is left untouched and a warning is logged. In every other case the
# queue is created from +config+ (keys symbolized first) and bound.
#
# @param queue_name the name of the queue to set up
# @param config     queue configuration; must respond to #deep_symbolize_keys
# @raise [RuntimeError] when named-queue initialization is disabled
def initialize_named_queue!(queue_name, config)
  raise RuntimeError, 'Philotic.config.initialize_named_queues must be true to run Philotic.initialize_named_queue!' unless self.config.initialize_named_queues

  connect!

  already_exists   = connection.queue_exists?(queue_name)
  replace_existing = already_exists && self.config.delete_existing_queues

  if replace_existing
    # passive: true looks the queue up without redeclaring it
    channel.queue(queue_name, passive: true).delete
    logger.info { "deleted old queue. queue: #{queue_name}" }
  end

  if already_exists && !replace_existing
    logger.warn { "Queue #{queue_name} not created; it already exists. self.config.delete_existing_queues must be true to override." }
  else
    symbolized = config.deep_symbolize_keys
    new_queue  = queue_from_config(queue_name, symbolized)
    bind_queue(new_queue, symbolized)
  end
end

#queue_from_config(queue_name, config) ⇒ Object



154
155
156
157
158
159
160
161
# File 'lib/philotic/connection.rb', line 154

# Declares a queue on the channel and returns it.
#
# The options are Philotic::DEFAULT_NAMED_QUEUE_OPTIONS with any entries
# from <tt>config[:options]</tt> layered on top; the defaults constant
# itself is never modified.
#
# @param queue_name the name to declare the queue under
# @param config     [Hash] queue configuration with symbol keys
# @return the queue object returned by channel.queue
def queue_from_config(queue_name, config)
  overrides     = config[:options] || {}
  queue_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS.merge(overrides)

  declared = channel.queue(queue_name, queue_options)
  logger.info { "Created queue. queue:#{queue_name}" }
  declared
end

#set_exchange_return_handler! ⇒ Object



110
111
112
113
114
# File 'lib/philotic/connection.rb', line 110

# Registers a return callback on the default exchange that forwards every
# returned message to config.message_return_handler.
#
# The handler is looked up on config each time a message is returned, and is
# called with the three values the exchange yields, unchanged and in order.
# The middle block parameter was missing from the listing (leaving an empty
# slot, which is a syntax error); it is restored here as +metadata+.
#
# @return whatever exchange.on_return returns
def set_exchange_return_handler!
  exchange.on_return do |basic_return, metadata, payload|
    config.message_return_handler.call(basic_return, metadata, payload)
  end
end

#start_connection! ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/philotic/connection.rb', line 60

# Opens the connection, retrying on TCP failures.
#
# attempt_connection is re-run for as long as the attempt counter stays
# below config.connection_attempts. Once the limit is reached the counter is
# reset and a TCPConnectionFailed carrying the attempt count and the
# sanitized URL is raised. Only ::Bunny::TCPConnectionFailed is rescued;
# other errors propagate immediately.
#
# @raise [TCPConnectionFailed] when every attempt failed
def start_connection!
  attempt_connection
rescue ::Bunny::TCPConnectionFailed
  # retry is bounded by the counter that attempt_connection increments
  retry if connection_attempts < config.connection_attempts

  attempts             = connection_attempts
  @connection_attempts = 0
  raise TCPConnectionFailed.new "Failed to connect to RabbitMQ server after #{attempts} attempts", config.sanitized_rabbit_url
end