Class: Tochtli::BaseController

Inherits:
Object
  • Object
show all
Extended by:
Uber::InheritableAttribute
Includes:
Hooks, Hooks::InstanceHooks
Defined in:
lib/tochtli/base_controller.rb

Defined Under Namespace

Classes: Dispatcher, KeyPattern, MessageRoute

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(rabbit_connection, cache, logger) ⇒ BaseController

Returns a new instance of BaseController.



146
147
148
149
150
# File 'lib/tochtli/base_controller.rb', line 146

# Builds a controller instance and stores its collaborators.
#
# @param rabbit_connection connection later used by #reply to publish replies
# @param cache cache object kept in @cache (not used by the methods visible here)
# @param logger logger exposed through the #logger reader
def initialize(rabbit_connection, cache, logger)
  @rabbit_connection, @cache, @logger = rabbit_connection, cache, logger
end

Instance Attribute Details

#delivery_info ⇒ Object (readonly)

Returns the value of attribute delivery_info.



15
16
17
# File 'lib/tochtli/base_controller.rb', line 15

# Reader for @delivery_info.
#
# #process_message fills it from env[:delivery_info] and resets it to nil
# when that call finishes, so it is only populated while a message is being
# processed.
def delivery_info
  @delivery_info
end

#env ⇒ Object (readonly)

Returns the value of attribute env.



15
16
17
# File 'lib/tochtli/base_controller.rb', line 15

# Reader for @env.
#
# #process_message stores the env it was called with here and resets it to
# nil when that call finishes.
def env
  @env
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/tochtli/base_controller.rb', line 15

# Reader for @logger, the logger handed to the constructor.
# #reply writes its debug line through this reader.
def logger
  @logger
end

#message ⇒ Object (readonly)

Returns the value of attribute message.



15
16
17
# File 'lib/tochtli/base_controller.rb', line 15

# Reader for @message.
#
# #process_message fills it from env[:message] and resets it to nil when
# that call finishes; #reply uses it to default the reply queue and
# correlation id.
def message
  @message
end

Class Method Details

.bind(*routing_keys) ⇒ Object



54
55
56
# File 'lib/tochtli/base_controller.rb', line 54

# Registers additional routing keys for this controller.
#
# The given keys are merged into the controller's own routing_keys
# collection (a Set for subclasses, see .inherited).
#
# @param routing_keys routing keys to add
# @return the value returned by merge on the routing key collection
def bind(*routing_keys)
  registered_keys = self.routing_keys
  registered_keys.merge(routing_keys)
end

.create_queue(rabbit_connection, queue_name = nil, routing_keys = nil) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/tochtli/base_controller.rb', line 125

# Declares the controller queue on a fresh channel and binds it to the exchange.
#
# @param rabbit_connection connection used to create the channel; also supplies
#   the exchange name when the controller does not define one
# @param queue_name queue to declare; falls back to the controller's queue_name
# @param routing_keys keys to bind; falls back to the controller's routing_keys
# @return the queue object returned by channel.queue
def create_queue(rabbit_connection, queue_name=nil, routing_keys=nil)
  queue_name   ||= self.queue_name
  routing_keys ||= self.routing_keys

  channel       = rabbit_connection.create_channel(self.work_pool_size)
  exchange_name = self.exchange_name || rabbit_connection.exchange_name
  # exchange_type names the channel method that declares the exchange.
  exchange      = channel.send(self.exchange_type, exchange_name, durable: self.exchange_durable)

  queue_options = {
    durable:     self.queue_durable,
    exclusive:   self.queue_exclusive,
    auto_delete: self.queue_auto_delete
  }
  queue = channel.queue(queue_name, **queue_options)

  routing_keys.each { |key| queue.bind(exchange, routing_key: key) }
  queue
end

.find_message_route(routing_key) ⇒ Object



120
121
122
123
# File 'lib/tochtli/base_controller.rb', line 120

# Looks up the first registered handler whose pattern matches the routing key.
#
# @param routing_key routing key of the incoming message
# @return the first matching handler, or nil when none matches
# @raise [RuntimeError] when no handlers have been registered at all
def find_message_route(routing_key)
  routes = self.message_handlers
  raise "Routing not set up" if routes.empty?

  routes.find do |route|
    route.pattern =~ routing_key
  end
end

.inherited(controller) ⇒ Object



47
48
49
50
51
52
# File 'lib/tochtli/base_controller.rb', line 47

# Class hook run whenever a controller subclass is defined.
#
# Gives the subclass its own empty routing key set and handler list, derives
# its default queue name from the class name, and registers it with
# ControllerManager.
#
# @param controller [Class] the newly defined subclass
def inherited(controller)
  # Keep the rest of the `inherited` chain (Class#inherited and any hooks
  # contributed by extended modules) working.
  super

  controller.routing_keys     = Set.new
  controller.message_handlers = []
  # NOTE(review): String#underscore is not Ruby core; presumably provided by
  # ActiveSupport - confirm against the file's requires.
  controller.queue_name       = controller.name.underscore.gsub('::', '/')
  ControllerManager.register(controller)
end

.off(routing_key) ⇒ Object



74
75
76
# File 'lib/tochtli/base_controller.rb', line 74

# Unregisters every handler that was registered for the given routing key.
#
# @param routing_key routing key whose handlers should be removed
# @return the handler list after removal
def off(routing_key)
  handlers = self.message_handlers
  handlers.delete_if do |handler|
    handler.routing_key == routing_key
  end
end

.on(message_class, method_name = nil, opts = {}, &block) ⇒ Object

Raises:

  • (ArgumentError)


58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/tochtli/base_controller.rb', line 58

# Registers a handler for a message class.
#
# The handler is either the name of an instance method or a block. The
# options hash may be passed in place of the method name when a block is used.
#
# @param message_class [Class] subclass of Tochtli::Message to handle
# @param method_name [Symbol, String, Hash, nil] handler method name, or the
#   options hash when the handler is given as a block
# @param opts [Hash] options; :routing_key overrides the message class routing key
# @param block handler used when no method name is given
# @return the controller's handler list with the new route appended
# @raise [ArgumentError] when neither a method name nor a block is given, or
#   when message_class is not a subclass of Tochtli::Message
# @raise [RuntimeError] when no routing key can be determined
def on(message_class, method_name=nil, opts={}, &block)
  # Support `on(MessageClass, routing_key: '...') { ... }`, where the options
  # hash arrives in the method_name position.
  if method_name.is_a?(Hash)
    opts        = method_name
    method_name = nil
  end

  handler = method_name || block
  raise ArgumentError, "Method name or block must be given" unless handler

  # Check for a Class first: `<` on nil or other non-class values would raise
  # NoMethodError instead of the documented ArgumentError.
  unless message_class.is_a?(Class) && message_class < Tochtli::Message
    raise ArgumentError, "Message class expected, got: #{message_class.inspect}"
  end

  routing_key = opts[:routing_key] || message_class.routing_key
  raise "Topic not set for message: #{message_class}" unless routing_key

  self.message_handlers << MessageRoute.new(message_class, handler, routing_key)
end

.restart(options = {}) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/tochtli/base_controller.rb', line 111

# Restarts the dispatcher of a started controller.
#
# Does nothing (and returns nil) when the controller is not started.
# Otherwise runs the :before_restart hook, restarts the dispatcher and runs
# the :after_restart hook; both hooks receive the queues the dispatcher
# reported before the restart.
#
# @param options options forwarded to the dispatcher's restart
# @return the result of the :after_restart hook, or nil when not started
def restart(options={})
  return unless started?

  active_queues = self.dispatcher.queues
  run_hook :before_restart, active_queues
  self.dispatcher.restart options
  run_hook :after_restart, active_queues
end

.set_up? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/tochtli/base_controller.rb', line 91

# Tells whether a dispatcher is currently assigned to this controller.
#
# @return [Boolean] true when dispatcher is truthy, false otherwise
def set_up?
  self.dispatcher ? true : false
end

.setup(rabbit_connection, cache = nil, logger = nil) ⇒ Object



79
80
81
82
83
# File 'lib/tochtli/base_controller.rb', line 79

# Creates the dispatcher for this controller.
#
# Runs the :before_setup hook, builds a Dispatcher and stores it in
# dispatcher, then runs the :after_setup hook. Both hooks receive the
# connection.
#
# @param rabbit_connection connection handed to the dispatcher
# @param cache cache handed to the dispatcher (may be nil)
# @param logger logger handed to the dispatcher; Tochtli.logger when omitted
# @return the result of the :after_setup hook
def setup(rabbit_connection, cache=nil, logger=nil)
  run_hook :before_setup, rabbit_connection

  dispatcher_logger = logger || Tochtli.logger
  self.dispatcher   = Dispatcher.new(self, rabbit_connection, cache, dispatcher_logger)

  run_hook :after_setup, rabbit_connection
end

.start(queue_name = nil, routing_keys = nil, initial_env = {}) ⇒ Object



85
86
87
88
89
# File 'lib/tochtli/base_controller.rb', line 85

# Starts the dispatcher for a queue.
#
# The :before_start and :after_start hooks receive the queue_name exactly as
# passed in (nil when omitted), while the dispatcher receives the
# controller's defaults for any omitted queue name or routing keys.
#
# @param queue_name queue to consume from; defaults to the controller's queue_name
# @param routing_keys keys to bind; defaults to the controller's routing_keys
# @param initial_env environment handed to the dispatcher and to both hooks
# @return the result of the :after_start hook
def start(queue_name=nil, routing_keys=nil, initial_env={})
  run_hook :before_start, queue_name, initial_env

  self.dispatcher.start(
    queue_name || self.queue_name,
    routing_keys || self.routing_keys,
    initial_env
  )

  run_hook :after_start, queue_name, initial_env
end

.started?(queue_name = nil) ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/tochtli/base_controller.rb', line 95

# Tells whether the controller's dispatcher has been started.
#
# @param queue_name optional queue name forwarded to the dispatcher's started?
# @return the dispatcher's answer, or the falsy dispatcher value itself when
#   no dispatcher is assigned
def started?(queue_name=nil)
  current = self.dispatcher
  current && current.started?(queue_name)
end

.stop(options = {}) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/tochtli/base_controller.rb', line 99

# Shuts the dispatcher down and forgets it.
#
# Does nothing (and returns nil) when the controller is not started.
# Otherwise runs the :before_stop hook, shuts the dispatcher down, clears
# dispatcher and runs the :after_stop hook; both hooks receive the queues
# the dispatcher reported before the shutdown.
#
# @param options options forwarded to the dispatcher's shutdown
# @return the result of the :after_stop hook, or nil when not started
def stop(options={})
  return unless started?

  active_queues = self.dispatcher.queues
  run_hook :before_stop, active_queues

  self.dispatcher.shutdown(options)
  self.dispatcher = nil

  run_hook :after_stop, active_queues
end

Instance Method Details

#process_message(env) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/tochtli/base_controller.rb', line 152

# Runs the action selected for an incoming message.
#
# Exposes env, env[:message] and env[:delivery_info] through the instance
# for the duration of the call, then invokes env[:action]: a Proc is
# evaluated in the context of this instance, anything else is treated as a
# method name and sent to the instance.
#
# @param env environment providing :action, :message and :delivery_info
# @return the value returned by the action
def process_message(env)
  @env           = env
  @action        = env[:action]
  @message       = env[:message]
  @delivery_info = env[:delivery_info]

  @action.is_a?(Proc) ? instance_eval(&@action) : send(@action)
ensure
  # Per-message state is dropped even when the action raises; @action is left as is.
  @env = @message = @delivery_info = nil
end

#rabbit_connection ⇒ Object



183
184
185
# File 'lib/tochtli/base_controller.rb', line 183

# Connection held by the controller class's dispatcher.
#
# Note that this reads the class-level dispatcher, not the @rabbit_connection
# stored by the constructor.
#
# @return the dispatcher's rabbit_connection, or nil when the class is not set up
def rabbit_connection
  controller_class = self.class
  return unless controller_class.set_up?

  controller_class.dispatcher.rabbit_connection
end

#reply(reply_message, reply_to = nil, message_id = nil) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/tochtli/base_controller.rb', line 168

# Publishes a reply message.
#
# When a message is currently being processed, the reply queue and the
# correlation id default to that message's properties.reply_to and id.
#
# @param reply_message message to publish
# @param reply_to name of the queue to reply to
# @param message_id id sent as the reply's correlation_id
# @return the result of the connection's publish call
# @raise [RuntimeError] when no reply queue can be determined
def reply(reply_message, reply_to=nil, message_id=nil)
  request = @message
  if request
    reply_to   ||= request.properties.reply_to
    message_id ||= request.id
  end
  raise "The 'reply_to' queue name is not specified" unless reply_to

  logger.debug "\tSending  reply on #{message_id} to #{reply_to}: #{reply_message.inspect}."

  @rabbit_connection.publish(reply_to, reply_message, correlation_id: message_id)
end