Class: MQRPC::Agent
- Inherits:
-
Object
- Object
- MQRPC::Agent
- Defined in:
- lib/mqrpc/agent.rb
Overview
TODO: document this class
Constant Summary collapse
- MAXBUF =
20
- MAXMESSAGEWAIT =
MAXBUF * 20
Class Attribute Summary collapse
-
.message_handlers ⇒ Object
Returns the value of attribute message_handlers.
-
.pipelines ⇒ Object
Returns the value of attribute pipelines.
Class Method Summary collapse
-
.handle(messageclass, method) ⇒ Object
Subclasses use this to declare their support of any given message.
- .pipeline(source, destination) ⇒ Object
Instance Method Summary collapse
-
#can_receive?(message_class) ⇒ Boolean
Returns whether a handler has been declared for the given message class.
- #close ⇒ Object
-
#flushout(destination) ⇒ Object
Publishes the messages buffered for destination as one JSON batch, then empties that buffer.
-
#handle_message(hdr, msg_body) ⇒ Object
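Processes one delivery: decodes the JSON body, hands each message to a pending operation or a declared handler, then acks.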
- #handle_new_subscriptions ⇒ Object
- #handle_subscriptions ⇒ Object
- #handler=(handler) ⇒ Object
-
#initialize(config) ⇒ Agent
constructor
A new instance of Agent.
-
#run ⇒ Object
Blocks until the AMQP thread exits.
- #sendmsg(destination, msg, &callback) ⇒ Object
- #sendmsg_topic(key, msg) ⇒ Object
-
#start_amqp ⇒ Object
Starts the AMQP connection (and with it the EventMachine loop) in a background thread.
-
#start_receiver ⇒ Object
Starts the thread that pops received deliveries and passes them to handle_message.
-
#subscribe(name) ⇒ Object
Requests a subscription to the named queue.
-
#subscribe_topic(name) ⇒ Object
Requests a subscription to the named topic.
Constructor Details
#initialize(config) ⇒ Agent
Returns a new instance of Agent.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/mqrpc/agent.rb', line 56

# Sets up the agent's state, starts the AMQP thread, blocks until that thread
# has created @mq, and then starts the receiver thread.
#
# config - object read here for mqexchange; start_amqp additionally reads
#          mqhost, mqport, mquser, mqpass and mqvhost from it.
def initialize(config)
  Thread::abort_on_exception = true
  @config = config
  @handler = self
  @id = UUID::generate
  # Per-destination buffer of delayable messages awaiting flushout.
  @outbuffer = Hash.new { |h,k| h[k] = [] }
  @queues = Set.new
  @topics = Set.new
  @receive_queue = Queue.new
  @want_subscriptions = Queue.new

  # figure out how to really do this correctly, see also def self.pipeline
  if self.class.pipelines == nil
    self.class.pipelines = Hash.new
  end

  # One tracking table per destination, created on first access. The block
  # given to SizedThreadSafeHash is called with a state (:blocked or :ready);
  # when the destination is the output end of a declared pipeline, the agent
  # unsubscribes from / resubscribes to the pipeline's source queue.
  @slidingwindow = Hash.new do |h,k|
    MQRPC::logger.debug "New sliding window for #{k}"
    h[k] = SizedThreadSafeHash.new(MAXMESSAGEWAIT) do |state|
      if self.class.pipelines[k]
        source = self.class.pipelines[k]
        MQRPC::logger.debug "Got sizedhash callback for #{k}: #{state}"
        case state
        when :blocked
          MQRPC::logger.info("Queue '#{k}' is full, unsubscribing from #{source}")
          exchange = @mq.topic(@config.mqexchange, :durable => true)
          mq_q = @mq.queue(source, :durable => true)
          mq_q.bind(exchange, :key => "*")
          mq_q.unsubscribe
          @queues.delete(source)
        when :ready
          MQRPC::logger.info("Queue '#{k}' is ready, resubscribing to #{source}")
          subscribe(source)
        end
      end
    end
  end

  @mq = nil
  @message_operations = Hash.new

  @startup_mutex = Mutex.new
  @startup_condvar = ConditionVariable.new
  @amqp_ready = false
  start_amqp

  # Wait for our AMQP thread to get going. Mainly, it needs to set
  # @mq, so we'll block until it's available.
  @startup_mutex.synchronize do
    MQRPC::logger.debug "Waiting for @mq ..."
    # Loop rather than a single `if`: the flag is re-checked after every
    # wake-up, so an early or spurious wake-up cannot let us continue
    # before @mq has been set.
    @startup_condvar.wait(@startup_mutex) until @amqp_ready
    MQRPC::logger.debug "Got it, continuing with #{self.class} init..."
  end

  start_receiver
end
Class Attribute Details
.message_handlers ⇒ Object
Returns the value of attribute message_handlers.
35 36 37 |
# File 'lib/mqrpc/agent.rb', line 35

# Returns the value of the message_handlers attribute (the table that
# Agent.handle fills in; nil until the first handler is declared).
def message_handlers
  @message_handlers
end
.pipelines ⇒ Object
Returns the value of attribute pipelines.
36 37 38 |
# File 'lib/mqrpc/agent.rb', line 36

# Returns the value of the pipelines attribute (the destination => source
# table that Agent.pipeline fills in).
def pipelines
  @pipelines
end
Class Method Details
.handle(messageclass, method) ⇒ Object
Subclasses use this to declare their support of any given message
41 42 43 44 45 46 |
# File 'lib/mqrpc/agent.rb', line 41

# Subclasses use this to declare their support of any given message.
#
# messageclass - the message class being declared.
# method       - name of the instance method to invoke for that class
#                (handle_message calls it via send).
#
# Returns method.
def self.handle(messageclass, method)
  # Create the table lazily on the first declaration.
  self.message_handlers ||= {}
  self.message_handlers[messageclass] = method
end
.pipeline(source, destination) ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/mqrpc/agent.rb', line 48

# Declares a pipeline from a source queue to a destination.
#
# The table is keyed by destination: when the sliding window for that
# destination fills up or drains, the constructor's callback looks up the
# source queue here to unsubscribe from / resubscribe to it.
#
# Returns source.
def self.pipeline(source, destination)
  # Create the table lazily on the first declaration.
  self.pipelines ||= {}
  self.pipelines[destination] = source
end
Instance Method Details
#can_receive?(message_class) ⇒ Boolean
Returns whether a handler has been declared for the given message class.
228 229 230 |
# File 'lib/mqrpc/agent.rb', line 228

# Reports whether this agent's class has declared a handler for
# message_class.
#
# Returns false when no handler has been declared at all; the handler table
# is only created by the first Agent.handle call, so it may still be nil.
def can_receive?(message_class)
  handlers = self.class.message_handlers
  return false if handlers.nil?
  return handlers.include?(message_class)
end
#close ⇒ Object
328 329 330 |
# File 'lib/mqrpc/agent.rb', line 328

# Stops the EventMachine event loop.
def close
  EM.stop_event_loop
end
#flushout(destination) ⇒ Object
Publishes the messages buffered for destination as one JSON batch, then empties that buffer.
277 278 279 280 281 282 283 |
# File 'lib/mqrpc/agent.rb', line 277

# Publishes everything buffered for destination as a single JSON array and
# then empties that buffer. Does nothing when the buffer is empty.
#
# destination - name of the durable queue to publish to.
def flushout(destination)
  msgs = @outbuffer[destination]
  return if msgs.length == 0
  data = msgs.to_json
  @mq.queue(destination, :durable => true).publish(data, :persistent => true)
  # Clear in place so @outbuffer keeps the same array object.
  msgs.clear
end
#handle_message(hdr, msg_body) ⇒ Object
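Processes one delivery: decodes the JSON body, hands each message to a pending operation or a declared handler, then acks.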
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/mqrpc/agent.rb', line 165

# Processes one delivery taken from the receive queue.
#
# hdr      - delivery header; read for routing_key and used to ack.
# msg_body - JSON text holding either one message or an array of messages.
#
# Each decoded message is handed to the operation registered for its
# in_reply_to id if there is one, otherwise to the handler method declared
# for its class. The delivery is acked after all messages are processed.
def handle_message(hdr, msg_body)
  queue = hdr.routing_key

  # If we just unsubscribed from a queue, we may still have some
  # messages buffered, so reject the message.
  # Currently RabbitMQ doesn't support message rejection, so let's
  # ack the message then push it back into the queue, unmodified.
  if !@queues.include?(queue)
    MQRPC::logger.warn("Got message on queue #{queue} that we are not "\
                       "subscribed to; rejecting")
    hdr.ack
    @mq.queue(queue, :durable => true).publish(msg_body, :persistent => true)
    return
  end

  # TODO(sissel): handle any exceptions we might get from here.
  # NOTE(review): JSON::load may be more permissive than JSON.parse on some
  # json versions; confirm queue contents are trusted.
  obj = JSON::load(msg_body)
  if !obj.is_a?(Array)
    obj = [obj]
  end

  obj.each do |item|
    message = Message.new_from_data(item)
    # A reply is tracked against the queue the request was sent to
    # (from_queue), which can differ from the queue it arrived on.
    slidingwindow = @slidingwindow[queue]
    if message.respond_to?(:from_queue)
      slidingwindow = @slidingwindow[message.from_queue]
    end
    MQRPC::logger.debug "Got message #{message.class}##{message.id} on queue #{queue}"
    #MQRPC::logger.debug "Received message: #{message.inspect}"
    if (message.respond_to?(:in_reply_to) and
        slidingwindow.include?(message.in_reply_to))
      MQRPC::logger.debug "Got response to #{message.in_reply_to}"
      slidingwindow.delete(message.in_reply_to)
    end

    # Check if we have a specific operation looking for this
    # message.
    if (message.respond_to?(:in_reply_to) and
        @message_operations.has_key?(message.in_reply_to))
      operation = @message_operations[message.in_reply_to]
      operation.call(message)
    elsif can_receive?(message.class)
      func = self.class.message_handlers[message.class]
      # The handler yields each response it wants sent back to the requester.
      self.send(func, message) do |response|
        reply_destination = message.reply_to
        response.from_queue = queue
        sendmsg(reply_destination, response)
      end

      # TODO(sissel): We should allow the message handler to defer acking
      # if they want For instance, if we want to index things, but only
      # want to ack things once we actually flush to disk.
    else
      $stderr.puts "#{@handler.class.name} does not support #{message.class}"
    end
  end
  hdr.ack
end
#handle_new_subscriptions ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/mqrpc/agent.rb', line 256

# Subscribes to every wanted queue and topic that is not subscribed yet.
#
# NOTE(review): this reads @want_queues and @want_topics, which the
# constructor shown on this page never assigns, and its only visible caller
# (a periodic timer in start_amqp) is commented out. It looks superseded by
# handle_subscriptions; confirm before relying on it.
def handle_new_subscriptions
  todo = @want_queues - @queues
  todo.each do |queue|
    MQRPC::logger.info "Subscribing to queue #{queue}"
    mq_q = @mq.queue(queue, :durable => true)
    # Deliveries are only queued here; the receiver thread processes them.
    mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] }
    @queues << queue
  end # todo.each

  todo = @want_topics - @topics
  todo.each do |topic|
    MQRPC::logger.info "Subscribing to topic #{topic}"
    exchange = @mq.topic(@config.mqexchange)
    # Each agent gets its own exclusive, auto-deleted queue per topic.
    mq_q = @mq.queue("#{@id}-#{topic}",
                     :exclusive => true,
                     :auto_delete => true).bind(exchange, :key => topic)
    mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
    @topics << topic
  end # todo.each
end
#handle_subscriptions ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/mqrpc/agent.rb', line 232

# Services subscription requests forever: pops [type, name] pairs pushed by
# subscribe / subscribe_topic and performs the AMQP subscription.
#
# Never returns; start_amqp runs it via EM.defer.
def handle_subscriptions
  while true do
    type, name = @want_subscriptions.pop
    case type
    when :queue
      next if @queues.include?(name)
      MQRPC::logger.info "Subscribing to queue #{name}"
      exchange = @mq.topic(@config.mqexchange, :durable => true)
      mq_q = @mq.queue(name, :durable => true)
      mq_q.bind(exchange, :key => "*")
      # Deliveries are only queued here; the receiver thread processes them.
      mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] }
      @queues << name
    when :topic
      # Skip topics we already hold, mirroring the queue branch, so a
      # repeated request does not subscribe the same per-agent queue twice.
      next if @topics.include?(name)
      MQRPC::logger.info "Subscribing to topic #{name}"
      exchange = @mq.topic(@config.mqexchange, :durable => true)
      # Each agent gets its own exclusive, auto-deleted queue per topic.
      mq_q = @mq.queue("#{@id}-#{name}",
                       :exclusive => true,
                       :auto_delete => true).bind(exchange, :key => name)
      mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
      @topics << name
    end
  end
end
#handler=(handler) ⇒ Object
324 325 326 |
# File 'lib/mqrpc/agent.rb', line 324

# Replaces the handler object, which defaults to the agent itself.
#
# In the code shown on this page @handler is only read to name the class in
# handle_message's "does not support" error line.
def handler=(handler)
  @handler = handler
end
#run ⇒ Object
Blocks until the AMQP thread exits.
224 225 226 |
# File 'lib/mqrpc/agent.rb', line 224

# Blocks the caller until the AMQP thread created by start_amqp exits.
def run
  @amqpthread.join
end
#sendmsg(destination, msg, &callback) ⇒ Object
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/mqrpc/agent.rb', line 295

# Sends msg to the queue named destination.
#
# destination - name of the durable queue to publish to.
# msg         - the message; a RequestMessage without an id gets one
#               generated and is tracked in the destination's sliding window.
# callback    - optional block; when given it is wrapped in an Operation,
#               registered under msg.id, and handle_message calls it with
#               the message that replies to msg.
#
# Delayable messages are buffered and flushed once more than MAXBUF are
# pending for the destination (start_amqp also flushes on a timer); all
# others are published immediately as a one-element JSON array.
#
# Returns the Operation when a block is given, nil otherwise.
def sendmsg(destination, msg, &callback)
  if (msg.is_a?(RequestMessage) and msg.id == nil)
    msg.generate_id!
  end
  # NOTE(review): the attribute name was lost from this listing;
  # `timestamp` is the presumed original. Confirm against the source file.
  msg.timestamp = Time.now.to_f
  msg.reply_to = @id

  if msg.is_a?(RequestMessage)
    MQRPC::logger.debug "Tracking #{msg.class.name}##{msg.id} to #{destination}"
    @slidingwindow[destination][msg.id] = true
  end

  if msg.delayable
    @outbuffer[destination] << msg
    if @outbuffer[destination].length > MAXBUF
      flushout(destination)
    end
  else
    MQRPC::logger.debug "Sending to #{destination}: #{msg.inspect}"
    @mq.queue(destination, :durable => true).publish([msg].to_json, :persistent => true)
  end

  if block_given?
    op = Operation.new(callback)
    @message_operations[msg.id] = op
    return op
  end
end
#sendmsg_topic(key, msg) ⇒ Object
285 286 287 288 289 290 291 292 293 |
# File 'lib/mqrpc/agent.rb', line 285

# Publishes msg as JSON to the configured topic exchange.
#
# key - routing key to publish under.
# msg - the message; a RequestMessage without an id gets one generated.
def sendmsg_topic(key, msg)
  if (msg.is_a?(RequestMessage) and msg.id == nil)
    msg.generate_id!
  end
  # NOTE(review): the attribute name was lost from this listing;
  # `timestamp` is the presumed original. Confirm against the source file.
  msg.timestamp = Time.now.to_f

  data = msg.to_json
  # Declare the exchange with the same :durable flag the subscription code
  # uses, so both sides agree on the exchange's properties.
  @mq.topic(@config.mqexchange, :durable => true).publish(data, :key => key)
end
#start_amqp ⇒ Object
Starts the AMQP connection (and with it the EventMachine loop) in a background thread.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/mqrpc/agent.rb', line 114

# Starts the AMQP thread. Once connected it creates @mq, signals the
# constructor that @mq is ready, subscribes to this agent's own queue
# (named after @id), starts the subscription worker, and flushes the
# outgoing buffers once a second.
#
# Stores the thread in @amqpthread (joined by #run).
def start_amqp
  @amqpthread = Thread.new do
    # Create connection to AMQP, and in turn, the main EventMachine loop.
    amqp_config = {:host => @config.mqhost,
                   :port => @config.mqport,
                   :user => @config.mquser,
                   :pass => @config.mqpass,
                   :vhost => @config.mqvhost,
                  }
    AMQP.start(amqp_config) do
      @startup_mutex.synchronize do
        @mq = MQ.new
        # Notify the main calling thread (MessageSocket#initialize) that
        # we can continue
        @amqp_ready = true
        @startup_condvar.signal
      end

      MQRPC::logger.info "Subscribing to main queue #{@id}"
      subscribe(@id)

      # TODO(sissel): make this a deferred thread that reads from a Queue
      #EM.add_periodic_timer(5) { handle_new_subscriptions }
      EM.defer { handle_subscriptions }

      EM.add_periodic_timer(1) do
        # TODO(sissel): add locking
        @outbuffer.each_key { |dest| flushout(dest) }
        @outbuffer.clear
      end
    end # AMQP.start
  end
end
#start_receiver ⇒ Object
Starts the thread that pops received deliveries and passes them to handle_message.
148 149 150 151 152 153 154 155 |
# File 'lib/mqrpc/agent.rb', line 148

# Starts the receiver thread, which pops [header, message] pairs off
# @receive_queue (filled by the subscription callbacks) and passes each to
# handle_message.
#
# Returns the Thread.
def start_receiver
  Thread.new do
    while true
      header, message = @receive_queue.pop
      handle_message(header, message)
    end
  end
end
#subscribe(name) ⇒ Object
Requests a subscription to the named queue.
157 158 159 |
# File 'lib/mqrpc/agent.rb', line 157

# Requests a subscription to the queue called name. The request is only
# enqueued here; handle_subscriptions performs the actual subscription.
def subscribe(name)
  @want_subscriptions << [:queue, name]
end
#subscribe_topic(name) ⇒ Object
Requests a subscription to the named topic.
161 162 163 |
# File 'lib/mqrpc/agent.rb', line 161

# Requests a subscription to the topic called name. The request is only
# enqueued here; handle_subscriptions performs the actual subscription.
def subscribe_topic(name)
  @want_subscriptions << [:topic, name]
end