Class: Tochtli::BaseController::Dispatcher
- Inherits:
-
Object
- Object
- Tochtli::BaseController::Dispatcher
- Defined in:
- lib/tochtli/base_controller.rb
Defined Under Namespace
Classes: ProcessCounter
Instance Attribute Summary collapse
-
#cache ⇒ Object
readonly
Returns the value of attribute cache.
-
#controller_class ⇒ Object
readonly
Returns the value of attribute controller_class.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#rabbit_connection ⇒ Object
readonly
Returns the value of attribute rabbit_connection.
Instance Method Summary collapse
-
#initialize(controller_class, rabbit_connection, cache, logger) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
- #process_message(delivery_info, properties, payload, initial_env) ⇒ Object
- #queues ⇒ Object
- #restart(options = {}) ⇒ Object
-
#shutdown(options = {}) ⇒ Object
Performs a graceful shutdown of dispatcher i.e.
- #start(queue_name, routing_keys, initial_env = {}) ⇒ Object
- #started?(queue_name = nil) ⇒ Boolean
- #stop(queues = nil) ⇒ Object
- #subscribe_queue(queue_name, routing_keys, initial_env = {}) ⇒ Object
Constructor Details
#initialize(controller_class, rabbit_connection, cache, logger) ⇒ Dispatcher
Returns a new instance of Dispatcher.
196 197 198 199 200 201 202 203 204 205 |
# File 'lib/tochtli/base_controller.rb', line 196 def initialize(controller_class, rabbit_connection, cache, logger) @controller_class = controller_class @rabbit_connection = rabbit_connection @cache = cache @logger = logger @application = Tochtli.application.to_app @queues = {} @process_counter = ProcessCounter.new @initial_env = nil end |
Instance Attribute Details
#cache ⇒ Object (readonly)
Returns the value of attribute cache.
194 195 196 |
# File 'lib/tochtli/base_controller.rb', line 194 def cache @cache end |
#controller_class ⇒ Object (readonly)
Returns the value of attribute controller_class.
194 195 196 |
# File 'lib/tochtli/base_controller.rb', line 194 def controller_class @controller_class end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
194 195 196 |
# File 'lib/tochtli/base_controller.rb', line 194 def logger @logger end |
#rabbit_connection ⇒ Object (readonly)
Returns the value of attribute rabbit_connection.
194 195 196 |
# File 'lib/tochtli/base_controller.rb', line 194 def rabbit_connection @rabbit_connection end |
Instance Method Details
#process_message(delivery_info, properties, payload, initial_env) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/tochtli/base_controller.rb', line 221 def (delivery_info, properties, payload, initial_env) register_process_start env = initial_env.merge( delivery_info: delivery_info, properties: properties, payload: payload, controller_class: controller_class, rabbit_connection: rabbit_connection, cache: cache, logger: logger ) @application.call(env) rescue Exception => ex logger.error "\nUNEXPECTED EXCEPTION: #{ex.class.name} (#{ex.})" logger.error ex.backtrace.join("\n") false ensure register_process_end end |
#queues ⇒ Object
280 281 282 |
# File 'lib/tochtli/base_controller.rb', line 280 def queues @queues.map { |_, qh| qh[:queue] } end |
#restart(options = {}) ⇒ Object
211 212 213 214 215 216 217 218 219 |
# File 'lib/tochtli/base_controller.rb', line 211 def restart(={}) queues = @queues.dup shutdown queues.each do |queue_name, queue_opts| start queue_name, queue_opts[:initial_env] end end |
#shutdown(options = {}) ⇒ Object
Performs a graceful shutdown of dispatcher i.e. waits for all processes to end. If timeout is reached, forces the shutdown. Useful with dynamic reconfiguration of work pool size.
259 260 261 262 |
# File 'lib/tochtli/base_controller.rb', line 259 def shutdown(={}) wait_for_processes .fetch(:timeout, 15) stop end |
#start(queue_name, routing_keys, initial_env = {}) ⇒ Object
207 208 209 |
# File 'lib/tochtli/base_controller.rb', line 207 def start(queue_name, routing_keys, initial_env={}) subscribe_queue(queue_name, routing_keys, initial_env) end |
#started?(queue_name = nil) ⇒ Boolean
272 273 274 275 276 277 278 |
# File 'lib/tochtli/base_controller.rb', line 272 def started?(queue_name=nil) if queue_name @queues.has_key?(queue_name) else !@queues.empty? end end |
#stop(queues = nil) ⇒ Object
264 265 266 267 268 269 270 |
# File 'lib/tochtli/base_controller.rb', line 264 def stop(queues=nil) @queues.each_value { |queue_opts| queue_opts[:consumer].cancel } rescue Bunny::ConnectionClosedError # ignore closed connection error ensure @queues = {} end |
#subscribe_queue(queue_name, routing_keys, initial_env = {}) ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/tochtli/base_controller.rb', line 244 def subscribe_queue(queue_name, routing_keys, initial_env={}) queue = controller_class.create_queue(@rabbit_connection, queue_name, routing_keys) consumer = queue.subscribe do |delivery_info, , payload| delivery_info, , payload, initial_env end @queues[queue_name] = { queue: queue, consumer: consumer, initial_env: initial_env } end |