Class: Tochtli::BaseController::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/tochtli/base_controller.rb

Defined Under Namespace

Classes: ProcessCounter

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(controller_class, rabbit_connection, cache, logger) ⇒ Dispatcher

Returns a new instance of Dispatcher.



196
197
198
199
200
201
202
203
204
205
# File 'lib/tochtli/base_controller.rb', line 196

def initialize(controller_class, rabbit_connection, cache, logger)
  @controller_class  = controller_class
  @rabbit_connection = rabbit_connection
  @cache             = cache
  @logger            = logger
  @application       = Tochtli.application.to_app
  @queues            = {}
  @process_counter   = ProcessCounter.new
  @initial_env       = nil
end

Instance Attribute Details

#cacheObject (readonly)

Returns the value of attribute cache.



194
195
196
# File 'lib/tochtli/base_controller.rb', line 194

def cache
  @cache
end

#controller_classObject (readonly)

Returns the value of attribute controller_class.



194
195
196
# File 'lib/tochtli/base_controller.rb', line 194

def controller_class
  @controller_class
end

#loggerObject (readonly)

Returns the value of attribute logger.



194
195
196
# File 'lib/tochtli/base_controller.rb', line 194

def logger
  @logger
end

#rabbit_connectionObject (readonly)

Returns the value of attribute rabbit_connection.



194
195
196
# File 'lib/tochtli/base_controller.rb', line 194

def rabbit_connection
  @rabbit_connection
end

Instance Method Details

#process_message(delivery_info, properties, payload, initial_env) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/tochtli/base_controller.rb', line 221

def process_message(delivery_info, properties, payload, initial_env)
  register_process_start

  env = initial_env.merge(
      delivery_info:     delivery_info,
      properties:        properties,
      payload:           payload,
      controller_class:  controller_class,
      rabbit_connection: rabbit_connection,
      cache:             cache,
      logger:            logger
  )

  @application.call(env)

rescue Exception => ex
  logger.error "\nUNEXPECTED EXCEPTION: #{ex.class.name} (#{ex.message})"
  logger.error ex.backtrace.join("\n")
  false
ensure
  register_process_end
end

#queuesObject



280
281
282
# File 'lib/tochtli/base_controller.rb', line 280

def queues
  @queues.map { |_, qh| qh[:queue] }
end

#restart(options = {}) ⇒ Object



211
212
213
214
215
216
217
218
219
# File 'lib/tochtli/base_controller.rb', line 211

def restart(options={})
 queues = @queues.dup

 shutdown options

 queues.each do |queue_name, queue_opts|
  start queue_name, queue_opts[:initial_env]
 end
end

#shutdown(options = {}) ⇒ Object

Performs a graceful shutdown of dispatcher i.e. waits for all processes to end. If timeout is reached, forces the shutdown. Useful with dynamic reconfiguration of work pool size.



259
260
261
262
# File 'lib/tochtli/base_controller.rb', line 259

def shutdown(options={})
  wait_for_processes options.fetch(:timeout, 15)
  stop
end

#start(queue_name, routing_keys, initial_env = {}) ⇒ Object



207
208
209
# File 'lib/tochtli/base_controller.rb', line 207

def start(queue_name, routing_keys, initial_env={})
  subscribe_queue(queue_name, routing_keys, initial_env)
end

#started?(queue_name = nil) ⇒ Boolean

Returns:

  • (Boolean)


272
273
274
275
276
277
278
# File 'lib/tochtli/base_controller.rb', line 272

def started?(queue_name=nil)
 if queue_name
  @queues.has_key?(queue_name)
 else
  !@queues.empty?
 end
end

#stop(queues = nil) ⇒ Object



264
265
266
267
268
269
270
# File 'lib/tochtli/base_controller.rb', line 264

def stop(queues=nil)
  @queues.each_value { |queue_opts| queue_opts[:consumer].cancel }
rescue Bunny::ConnectionClosedError
  # ignore closed connection error
ensure
 @queues = {}
end

#subscribe_queue(queue_name, routing_keys, initial_env = {}) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/tochtli/base_controller.rb', line 244

def subscribe_queue(queue_name, routing_keys, initial_env={})
  queue    = controller_class.create_queue(@rabbit_connection, queue_name, routing_keys)
  consumer = queue.subscribe do |delivery_info, , payload|
    process_message delivery_info, , payload, initial_env
  end

  @queues[queue_name] = {
      queue:       queue,
      consumer:    consumer,
      initial_env: initial_env
  }
end