Class: BugBunny::Rabbit
- Inherits:
-
Object
- Object
- BugBunny::Rabbit
- Includes:
- ActiveModel::Attributes, ActiveModel::Model
- Defined in:
- lib/bug_bunny/rabbit.rb
Constant Summary collapse
- DEFAULT_EXCHANGE_OPTIONS =
{ durable: false, auto_delete: false }.freeze
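- DEFAULT_QUEUE_OPTIONS =
{ exclusive: false, durable: false, auto_delete: true }.freeze
Note: the source listing also shows `DEFAULT_MAX_PRIORITY = 10` and `arguments: { 'x-max-priority' => DEFAULT_MAX_PRIORITY }`; the latter is commented out in the hash literal, and the former appears to be commented out as well.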
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#exchange ⇒ Object
Returns the value of attribute exchange.
-
#queue ⇒ Object
Returns the value of attribute queue.
Class Method Summary collapse
- .create_connection(host: nil, username: nil, password: nil, vhost: nil) ⇒ Object
- .run_consumer(connection:, exchange:, exchange_type:, queue_name:, routing_key:, queue_opts: {}) ⇒ Object
- .start_health_check(app, queue_name:, exchange_name:, exchange_type:) ⇒ Object
Instance Method Summary collapse
- #build_exchange(name: nil, type: 'direct', opts: {}) ⇒ Object
- #build_queue(name: '', opts: {}) ⇒ Object
-
#build_response(status:, body:) ⇒ Object
The 'success' and 'error' statuses are handled for compatibility with bug_bunny.
- #channel ⇒ Object
- #consume! ⇒ Object
- #default_publish_options ⇒ Object
- #parse_route(route) ⇒ Object
- #publish!(msg, opts) ⇒ Object
- #publish_and_consume!(msg, opts) ⇒ Object
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
37 38 39 |
# File 'lib/bug_bunny/rabbit.rb', line 37
#
# Returns the value of attribute connection.
#
# @return [Object] whatever is currently stored in @connection
def connection
  @connection
end
#exchange ⇒ Object
Returns the value of attribute exchange.
37 38 39 |
# File 'lib/bug_bunny/rabbit.rb', line 37
#
# Returns the value of attribute exchange.
#
# @return [Object] whatever is currently stored in @exchange
def exchange
  @exchange
end
#queue ⇒ Object
Returns the value of attribute queue.
37 38 39 |
# File 'lib/bug_bunny/rabbit.rb', line 37
#
# Returns the value of attribute queue.
#
# @return [Object] whatever is currently stored in @queue
def queue
  @queue
end
Class Method Details
.create_connection(host: nil, username: nil, password: nil, vhost: nil) ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/bug_bunny/rabbit.rb', line 289
#
# Builds a Bunny session and starts it.
#
# Each keyword argument, when given, takes precedence over the matching value
# in BugBunny.configuration. The remaining settings are read from the
# configuration, falling back to the literals below when the configured value
# is nil or false.
#
# @param host [String, nil] overrides BugBunny.configuration.host
# @param username [String, nil] overrides BugBunny.configuration.username
# @param password [String, nil] overrides BugBunny.configuration.password
# @param vhost [String, nil] overrides BugBunny.configuration.vhost
# @return [Object] the object returned by Bunny.new, after #start was called on it
# @raise [BugBunny::Error::Connection] when Timeout::Error or
#   Bunny::ConnectionError is raised while building/starting the session
def self.create_connection(host: nil, username: nil, password: nil, vhost: nil)
  config = BugBunny.configuration
  bunny = Bunny.new(
    host: host || config.host,
    username: username || config.username,
    password: password || config.password,
    vhost: vhost || config.vhost,
    logger: config.logger || Rails.logger,
    automatically_recover: config.automatically_recover || false,
    network_recovery_interval: config.network_recovery_interval || 5,
    connection_timeout: config.connection_timeout || 10,
    read_timeout: config.read_timeout || 90,
    write_timeout: config.write_timeout || 90,
    heartbeat: config.heartbeat || 30,
    continuation_timeout: config.continuation_timeout || 15_000
  )

  bunny.tap(&:start)
rescue Timeout::Error, Bunny::ConnectionError => e
  # Timeout::Error (TCP connection timeout) is rescued separately.
  # Per the original note, Bunny::ConnectionError covers TCPConnectionFailed,
  # AuthenticationFailure, AccessRefused, etc.
  raise BugBunny::Error::Connection, e.message
end
.run_consumer(connection:, exchange:, exchange_type:, queue_name:, routing_key:, queue_opts: {}) ⇒ Object
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/bug_bunny/rabbit.rb', line 249
#
# Declares the exchange and queue, binds them, starts consuming and then waits
# on the health-check task. Any StandardError is logged, the connection is
# closed, and the whole sequence is retried after sleeping
# RABBIT_NETWORK_RECOVERY seconds.
#
# @param connection [Object] connection handed to the new instance
# @param exchange [String] exchange name
# @param exchange_type [String, Symbol] exchange type passed to #build_exchange
# @param queue_name [String] queue name
# @param routing_key [String] routing key used for the binding
# @param queue_opts [Hash] extra queue options passed to #build_queue
# @return [void] does not return normally: every pass ends in raise + retry
def self.run_consumer(connection:, exchange:, exchange_type:, queue_name:, routing_key:, queue_opts: {})
  app = new(connection: connection)
  app.build_exchange(name: exchange, type: exchange_type)
  app.build_queue(name: queue_name, opts: queue_opts)
  app.queue.bind(app.exchange, routing_key: routing_key)

  # NOTE(review): consume! subscribes with block: true, so the health check
  # below appears to start only once consume! has returned - confirm that this
  # ordering is intended.
  app.consume!

  health_check_thread = start_health_check(
    app,
    queue_name: queue_name,
    exchange_name: exchange,
    exchange_type: exchange_type
  )
  health_check_thread.wait_for_termination

  # Reaching this point means the health-check task terminated; raising routes
  # execution through the rescue/retry path below.
  raise 'Health check error: Forcing reconnect.'
rescue StandardError => e
  # Added because when RabbitMQ and the consumer are started at the same time,
  # RabbitMQ takes a long while before accepting connections and the consumer
  # would fail over and over. This retry loop waits for it instead of having
  # to handle the wait in the docker entrypoint.
  Rails.logger.error("[RABBIT] Consumer error: #{e.message} (#{e.class})")
  Rails.logger.debug("[RABBIT] Consumer error: #{e.backtrace}")
  connection&.close
  sleep RABBIT_NETWORK_RECOVERY
  # NOTE(review): the retry is unbounded and reuses the connection object that
  # was just closed - verify the connection can be used again after #close.
  retry
end
.start_health_check(app, queue_name:, exchange_name:, exchange_type:) ⇒ Object
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/bug_bunny/rabbit.rb', line 270
#
# Creates and starts a Concurrent::TimerTask that runs every
# RABBIT_HEALT_CHECK seconds and passively declares the exchange and the queue
# on app.channel. When a check raises, the connection is closed and the task
# shuts itself down.
#
# @param app [BugBunny::Rabbit] instance whose channel/connection are checked
# @param queue_name [String] queue to check
# @param exchange_name [String] exchange to check
# @param exchange_type [String, Symbol] type used for the passive exchange declare
# @return [Concurrent::TimerTask] the task, already started via #execute
def self.start_health_check(app, queue_name:, exchange_name:, exchange_type:)
  task = Concurrent::TimerTask.new(execution_interval: RABBIT_HEALT_CHECK) do
    # Passive declares: check that neither the exchange nor the queue was
    # deleted from the RabbitMQ management view.
    app.channel.exchange_declare(exchange_name, exchange_type, passive: true)
    app.channel.queue_declare(queue_name, passive: true)
  rescue Bunny::NotFound
    # NOTE(review): this branch also runs when the exchange declare raises
    # Bunny::NotFound, although the message only mentions the queue.
    Rails.logger.error("Health check failed: Queue '#{queue_name}' no longer exists!")
    app.connection.close
    task.shutdown # Stop the task so it does not run again
  rescue StandardError => e
    Rails.logger.error("Health check error: #{e.message}. Forcing reconnect.")
    app.connection.close
    task.shutdown
  end

  task.execute
  task
end
Instance Method Details
#build_exchange(name: nil, type: 'direct', opts: {}) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/bug_bunny/rabbit.rb', line 53
#
# Declares the exchange used by this instance and memoizes it in @exchange.
# Once @exchange has been assigned, later calls return it unchanged and ignore
# their arguments.
#
# @param name [String, nil] exchange name; when blank the channel's default
#   exchange is used
# @param type [String, Symbol] one of topic, direct, fanout, headers
# @param opts [Hash] exchange options; nil values are dropped and the rest is
#   merged over DEFAULT_EXCHANGE_OPTIONS
# @return [Object] the exchange object returned by the channel
# @raise [ArgumentError] when type is not one of the supported exchange types
def build_exchange(name: nil, type: 'direct', opts: {})
  return @exchange if defined?(@exchange)

  options = DEFAULT_EXCHANGE_OPTIONS.merge(opts.compact)

  if name.blank?
    @exchange = channel.default_exchange
    return @exchange
  end

  Rails.logger.info("ExchangeName: #{name}, ExchangeType: #{type}, opts: #{opts}")

  @exchange = case type.to_sym
              when :topic
                channel.topic(name, options)
              when :direct
                channel.direct(name, options)
              when :fanout
                channel.fanout(name, options)
              when :headers
                channel.headers(name, options)
              else
                # Without this branch an unknown type memoized @exchange = nil,
                # and every later call returned nil.
                raise ArgumentError, "Unsupported exchange type: #{type.inspect}"
              end
end
#build_queue(name: '', opts: {}) ⇒ Object
90 91 92 93 94 95 |
# File 'lib/bug_bunny/rabbit.rb', line 90
#
# Declares a queue on the channel and stores it in @queue.
#
# @param name [#to_s] queue name; converted with #to_s (defaults to '')
# @param opts [Hash] queue options; nil values are dropped and the rest is
#   merged over DEFAULT_QUEUE_OPTIONS
# @return [Object] the queue object returned by channel.queue
def build_queue(name: '', opts: {})
  name = name.to_s
  options = DEFAULT_QUEUE_OPTIONS.merge(opts.compact)
  Rails.logger.info("QueueName: #{name}, opts: #{options}")
  @queue = channel.queue(name, options)
end
#build_response(status:, body:) ⇒ Object
The 'success' and 'error' statuses are handled for compatibility with bug_bunny.
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/bug_bunny/rabbit.rb', line 233
#
# Maps a response status to either a return value or a raised error.
# The 'success' and 'error' string statuses are handled for compatibility
# with bug_bunny.
#
# @param status [String, Integer] 'success', 'error' or a numeric status code
# @param body [Object] response body; returned as-is on success, passed to the
#   raised error otherwise (JSON-encoded for the numeric error statuses that
#   carry a body)
# @return [Object, nil] body for 'success', 200 and 201; nil for 204
# @raise [BugBunny::ResponseError::BadRequest] status 400
# @raise [BugBunny::ResponseError::NotFound] status 404
# @raise [BugBunny::ResponseError::NotAcceptable] status 406
# @raise [BugBunny::ResponseError::UnprocessableEntity] status 422
# @raise [BugBunny::ResponseError::InternalServerError] status 500 or 'error'
# @raise [BugBunny::ResponseError::Base] any other status
def build_response(status:, body:)
  case status
  when 'success' then body # Old compatibility
  when 'error' then raise BugBunny::ResponseError::InternalServerError, body # Old compatibility
  when 200, 201 then body
  when 204 then nil
  when 400 then raise BugBunny::ResponseError::BadRequest, body.to_json
  when 404 then raise BugBunny::ResponseError::NotFound
  when 406 then raise BugBunny::ResponseError::NotAcceptable
  when 422 then raise BugBunny::ResponseError::UnprocessableEntity, body.to_json
  when 500 then raise BugBunny::ResponseError::InternalServerError, body.to_json
  else
    raise BugBunny::ResponseError::Base, body.to_json
  end
end
#channel ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/bug_bunny/rabbit.rb', line 39
#
# Returns the current channel, creating a new one on the connection when there
# is none yet or the stored one is no longer open. A new channel gets
# confirm_select and a prefetch of RABBIT_CHANNEL_PREFETCH applied.
#
# @return [Object] the channel stored in @channel
def channel
  # NOTE(review): the mutex itself is created lazily with ||=, outside of any
  # lock - confirm that is acceptable for the callers' threading model.
  @channel_mutex ||= Mutex.new
  return @channel if @channel&.open?

  @channel_mutex.synchronize do
    # Re-check inside the lock: the channel may have been created while this
    # caller was waiting for the mutex.
    return @channel if @channel&.open?

    @channel = connection.create_channel
    @channel.confirm_select
    @channel.prefetch(RABBIT_CHANNEL_PREFETCH) # Limits concurrent messages per consumer
    @channel
  end
end
#consume! ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/bug_bunny/rabbit.rb', line 181
#
# Subscribes to the queue (manual ack, block: true) and dispatches every
# delivery to a controller resolved from properties.type.
#
# For each message: the route is parsed from properties.type, the constant
# "rabbit/controllers/<controller>" (camelized) is resolved and called with
# the headers and body, the result is published as JSON to
# properties.reply_to through the default exchange when a reply_to is present,
# and the delivery is acked. Any StandardError raised while handling a message
# is logged and the delivery is rejected without requeue.
#
# @return [Object] whatever queue.subscribe returns
def consume!
  queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
    Rails.logger.debug("DeliveryInfo: #{delivery_info}")
    Rails.logger.debug("Properties: #{properties}")
    Rails.logger.debug("Body: #{body}")

    raise StandardError, 'Undefined properties.type' if properties.type.blank?

    route = parse_route(properties.type)

    headers = {
      type: properties.type,
      controller: route[:controller],
      action: route[:action],
      id: route[:id],
      content_type: properties.content_type,
      content_encoding: properties.content_encoding,
      correlation_id: properties.correlation_id
    }

    # The controller name comes from the incoming message type.
    controller = "rabbit/controllers/#{route[:controller]}".camelize.constantize
    response_payload = controller.call(headers: headers, body: body)

    Rails.logger.debug("Response: #{response_payload}")

    if properties.reply_to.present?
      Rails.logger.info("Sending response to #{properties.reply_to}")

      # Publish the response straight to the reply queue: no named exchange is
      # needed, the default exchange routes by queue name.
      channel.default_exchange.publish(
        response_payload.to_json,
        routing_key: properties.reply_to,
        correlation_id: properties.correlation_id
      )
    end

    channel.ack(delivery_info.delivery_tag)
  rescue NoMethodError => e # the controller action does not exist
    Rails.logger.error(e)
    channel.reject(delivery_info.delivery_tag, false)
  rescue NameError => e # the controller does not exist
    Rails.logger.error(e)
    channel.reject(delivery_info.delivery_tag, false)
  rescue StandardError => e
    Rails.logger.error("Error processing message: #{e.message} (#{e.class})")
    # Reject the message and do NOT re-queue it immediately.
    channel.reject(delivery_info.delivery_tag, false)
  end
end
#default_publish_options ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/bug_bunny/rabbit.rb', line 77
#
# Returns the default options for publishing a message.
#
# The static part (persistent, app_id) is built once, frozen and memoized in
# @default_publish_opts; timestamp and correlation_id are generated again on
# every call.
#
# @return [Hash] a new hash with :persistent, :app_id, :timestamp and
#   :correlation_id
def default_publish_options
  @default_publish_opts ||= {
    persistent: false,
    app_id: Rails.application.class.module_parent_name
  }.freeze

  # Only the dynamic values are generated per call
  @default_publish_opts.merge(
    timestamp: Time.current.to_i,
    correlation_id: SecureRandom.uuid
  )
end
#parse_route(route) ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/bug_bunny/rabbit.rb', line 161
#
# Splits a route string on '/' into controller, action and id.
#
#   'secrets'            -> controller 'secrets', action 'index', id nil
#   'secrets/index'      -> controller 'secrets', action 'index', id nil
#   'secrets/123/update' -> controller 'secrets', action 'update', id '123'
#
# Any other number of segments keeps the first segment as the controller and
# falls back to action 'index' with a nil id.
#
# @param route [String] route taken from the message type
# @return [Hash] hash with the keys :controller, :action and :id
def parse_route(route)
  # Nested routes are not resolved for now
  segments = route.split('/')
  controller_name = segments[0]
  action_name = 'index'
  id = nil

  case segments.length
  when 2
    # Pattern: controller/action (e.g. 'secrets/index', 'swarm/info')
    action_name = segments[1]
  when 3
    # Pattern: controller/id/action (e.g. 'secrets/123/update', 'services/999/destroy')
    id = segments[1]
    action_name = segments[2]
  end

  { controller: controller_name, action: action_name, id: id }
end
#publish!(msg, opts) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/bug_bunny/rabbit.rb', line 97
#
# Publishes a message to the exchange.
#
# @param msg [Hash, #to_s] payload; hashes (including Hash subclasses) are
#   serialized with #to_json, anything else with #to_s
# @param opts [Hash] publish options; nil values are dropped and the rest is
#   merged over #default_publish_options
# @return [Object] whatever exchange.publish returns
# @raise [BugBunny::PublishError] when a Bunny::Exception is raised
def publish!(msg, opts)
  options = default_publish_options.merge(opts.compact)

  # is_a? rather than instance_of?: a Hash subclass must also be sent as JSON
  # instead of falling through to #to_s.
  msg = msg.is_a?(Hash) ? msg.to_json : msg.to_s

  Rails.logger.info("Message: #{msg}")
  Rails.logger.info("Options: #{options}")

  exchange.publish(msg, options)
  # channel.wait_for_confirms # This only confirms that the message reached the exchange
rescue Bunny::Exception => e
  Rails.logger.error(e)
  raise BugBunny::PublishError, e
end
#publish_and_consume!(msg, opts) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/bug_bunny/rabbit.rb', line 112
#
# Publishes a message and waits for the matching reply.
#
# A server-named exclusive reply queue is declared and set as reply_to. The
# message is published and the caller waits up to RABBIT_CONNECTION_TIMEOUT
# seconds for a reply whose correlation_id matches the one sent. Replies with
# a different correlation_id are rejected without requeue. The reply
# subscription is cancelled exactly once on every exit path.
#
# @param msg [Hash, #to_s] payload handed to #publish!
# @param opts [Hash] publish options; nil values are dropped and the rest is
#   merged over #default_publish_options
# @return [Object, nil] the result of #build_response for the reply
# @raise [BugBunny::ResponseError::Base] errors raised by #build_response
# @raise [BugBunny::ResponseError::RequestTimeout] a RuntimeError whose
#   message contains 'Timeout' (including the wait timing out)
# @raise [BugBunny::ResponseError::InternalServerError] any other RuntimeError
# @raise [StandardError] any other error is logged and re-raised unchanged
def publish_and_consume!(msg, opts)
  options = default_publish_options.merge(opts.compact)

  response_latch = Concurrent::CountDownLatch.new(1)
  response = nil

  reply_queue = channel.queue('', exclusive: true, durable: false, auto_delete: true)
  options[:reply_to] = reply_queue.name

  subscription = reply_queue.subscribe(manual_ack: true, block: false) do |delivery_info, properties, body|
    Rails.logger.debug("CONSUMER DeliveryInfo: #{delivery_info}")
    Rails.logger.debug("CONSUMER Properties: #{properties}")
    Rails.logger.debug("CONSUMER Body: #{body}")

    if properties.correlation_id == options[:correlation_id]
      response = ActiveSupport::JSON.decode(body).deep_symbolize_keys.with_indifferent_access
      channel.ack(delivery_info.delivery_tag)
      response_latch.count_down
    else
      Rails.logger.debug('Correlation_id not match')
      # The correlation_id does not match: reject the message so RabbitMQ
      # handles it.
      channel.reject(delivery_info.delivery_tag, false)
    end
  end

  Rails.logger.debug("PUBLISHER Message: #{msg}")
  Rails.logger.debug("PUBLISHER Options: #{options}")
  publish!(msg, options)

  unless response_latch.wait(RABBIT_CONNECTION_TIMEOUT)
    raise "Timeout: No response received within #{RABBIT_CONNECTION_TIMEOUT} seconds."
  end

  build_response(status: response[:status], body: response[:body])
rescue BugBunny::ResponseError::Base
  # Response errors pass through untouched (and unlogged).
  raise
rescue RuntimeError => e
  Rails.logger.error("[Rabbit] Error in publish_and_consume: #{e.class} - <#{e.message}>")
  raise(BugBunny::ResponseError::RequestTimeout, e.message) if e.message.include?('Timeout')

  raise BugBunny::ResponseError::InternalServerError, e.message
rescue StandardError => e
  Rails.logger.error("[Rabbit] Error in publish_and_consume: #{e.class} - <#{e.message}>")
  raise
ensure
  # Single cancellation point. subscription is still nil when the failure
  # happened before the subscribe call.
  subscription&.cancel
end