Class: MQRPC::Agent
- Inherits:
-
Object
- Object
- MQRPC::Agent
- Defined in:
- lib/mqrpc/agent.rb
Overview
TODO: document this class
Constant Summary collapse
- MAXBUF =
20
- MAXMESSAGEWAIT =
MAXBUF * 20
Instance Method Summary collapse
- #close ⇒ Object
-
#flushout(destination) ⇒ Object
handle_new_subscriptions.
-
#handle_message(hdr, msg_body) ⇒ Object
def subscribe_topic.
- #handle_new_subscriptions ⇒ Object
-
#handle_subscriptions ⇒ Object
run.
- #handler=(handler) ⇒ Object
-
#initialize(config) ⇒ Agent
constructor
A new instance of Agent.
-
#run ⇒ Object
def handle_message.
- #sendmsg(destination, msg, &callback) ⇒ Object
- #sendmsg_topic(key, msg) ⇒ Object
-
#start_amqp ⇒ Object
def initialize.
-
#start_receiver ⇒ Object
def start_amqp.
-
#subscribe(name) ⇒ Object
def start_receiver.
-
#subscribe_topic(name) ⇒ Object
def subscribe.
Constructor Details
#initialize(config) ⇒ Agent
Returns a new instance of Agent.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/mqrpc/agent.rb', line 33 def initialize(config) Thread::abort_on_exception = true @config = config @handler = self @id = UUID::generate @outbuffer = Hash.new { |h,k| h[k] = [] } @queues = [] @topics = [] @receive_queue = Queue.new @want_subscriptions = Queue.new @slidingwindow = Hash.new do |h,k| MQRPC::logger.info "New sliding window for #{k}" h[k] = SizedThreadSafeHash.new(MAXMESSAGEWAIT) end @mq = nil @message_operations = Hash.new @startup_mutex = Mutex.new @startup_condvar = ConditionVariable.new @amqp_ready = false start_amqp # Wait for our AMQP thread to get going. Mainly, it needs to set # @mq, so we'll block until it's available. @startup_mutex.synchronize do MQRPC::logger.debug "Waiting for @mq ..." @startup_condvar.wait(@startup_mutex) if !@amqp_ready MQRPC::logger.debug "Got it, continuing with #{self.class} init..." end start_receiver end |
Instance Method Details
#close ⇒ Object
265 266 267 |
# File 'lib/mqrpc/agent.rb', line 265 def close EM.stop_event_loop end |
#flushout(destination) ⇒ Object
handle_new_subscriptions
214 215 216 217 218 219 220 |
# File 'lib/mqrpc/agent.rb', line 214 def flushout(destination) msgs = @outbuffer[destination] return if msgs.length == 0 data = msgs.to_json @mq.queue(destination, :durable => true).publish(data, :persistent => true) msgs.clear end |
#handle_message(hdr, msg_body) ⇒ Object
def subscribe_topic
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/mqrpc/agent.rb', line 121 def (hdr, msg_body) obj = JSON::load(msg_body) if !obj.is_a?(Array) obj = [obj] end queue = hdr.routing_key obj.each do |item| = Message.new_from_data(item) slidingwindow = @slidingwindow[queue] if .respond_to?(:from_queue) slidingwindow = @slidingwindow[.from_queue] end MQRPC::logger.debug "#{Thread.current} Got message #{.class}##{.id} on queue #{queue}" MQRPC::logger.debug "Received message: #{.inspect}" if (.respond_to?(:in_reply_to) and slidingwindow.include?(.in_reply_to)) MQRPC::logger.debug "Got response to #{.in_reply_to}" slidingwindow.delete(.in_reply_to) end name = .class.name.split(":")[-1] func = "#{name}Handler" # Check if we have a specific operation looking for this # message. if (.respond_to?(:in_reply_to) and @message_operations.has_key?(.in_reply_to)) operation = @message_operations[.in_reply_to] operation.call() elsif @handler.respond_to?(func) @handler.send(func, ) do |response| reply_destination = .reply_to response.from_queue = queue sendmsg(reply_destination, response) end # We should allow the message handler to defer acking if they want # For instance, if we want to index things, but only want to ack # things once we actually flush to disk. else $stderr.puts "#{@handler.class.name} does not support #{func}" end # if @handler.respond_to?(func) end hdr.ack end |
#handle_new_subscriptions ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/mqrpc/agent.rb', line 193 def handle_new_subscriptions todo = @want_queues - @queues todo.each do |queue| MQRPC::logger.info "Subscribing to queue #{queue}" mq_q = @mq.queue(queue, :durable => true) mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] } @queues << queue end # todo.each todo = @want_topics - @topics todo.each do |topic| MQRPC::logger.info "Subscribing to topic #{topic}" exchange = @mq.topic(@config.mqexchange) mq_q = @mq.queue("#{@id}-#{topic}", :exclusive => true, :auto_delete => true).bind(exchange, :key => topic) mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] } @topics << topic end # todo.each end |
#handle_subscriptions ⇒ Object
run
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/mqrpc/agent.rb', line 171 def handle_subscriptions while true do type, name = @want_subscriptions.pop case type when :queue next if @queues.include?(name) MQRPC::logger.info "Subscribing to queue #{name}" mq_q = @mq.queue(name, :durable => true) mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] } @queues << name when :topic MQRPC::logger.info "Subscribing to topic #{name}" exchange = @mq.topic(@config.mqexchange) mq_q = @mq.queue("#{@id}-#{name}", :exclusive => true, :auto_delete => true).bind(exchange, :key => name) mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] } @topics << name end end end |
#handler=(handler) ⇒ Object
261 262 263 |
# File 'lib/mqrpc/agent.rb', line 261 def handler=(handler) @handler = handler end |
#run ⇒ Object
def handle_message
167 168 169 |
# File 'lib/mqrpc/agent.rb', line 167 def run @amqpthread.join end |
#sendmsg(destination, msg, &callback) ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/mqrpc/agent.rb', line 232 def sendmsg(destination, msg, &callback) if (msg.is_a?(RequestMessage) and msg.id == nil) msg.generate_id! end msg. = Time.now.to_f msg.reply_to = @id if msg.is_a?(RequestMessage) MQRPC::logger.debug "Tracking #{msg.class.name}##{msg.id} to #{destination}" @slidingwindow[destination][msg.id] = true end if msg.buffer? @outbuffer[destination] << msg if @outbuffer[destination].length > MAXBUF flushout(destination) end else MQRPC::logger.debug "#{Thread.current} Sending to #{destination}: #{msg.inspect}" @mq.queue(destination, :durable => true).publish([msg].to_json, :persistent => true) end if block_given? op = Operation.new(callback) @message_operations[msg.id] = op return op end end |
#sendmsg_topic(key, msg) ⇒ Object
222 223 224 225 226 227 228 229 230 |
# File 'lib/mqrpc/agent.rb', line 222 def sendmsg_topic(key, msg) if (msg.is_a?(RequestMessage) and msg.id == nil) msg.generate_id! end msg. = Time.now.to_f data = msg.to_json @mq.topic(@config.mqexchange).publish(data, :key => key) end |
#start_amqp ⇒ Object
def initialize
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/mqrpc/agent.rb', line 68 def start_amqp @amqpthread = Thread.new do # Create connection to AMQP, and in turn, the main EventMachine loop. amqp_config = {:host => @config.mqhost, :port => @config.mqport, :user => @config.mquser, :pass => @config.mqpass, :vhost => @config.mqvhost, } AMQP.start(amqp_config) do @startup_mutex.synchronize do @mq = MQ.new # Notify the main calling thread (MessageSocket#initialize) that # we can continue @amqp_ready = true @startup_condvar.signal end MQRPC::logger.info "Subscribing to main queue #{@id}" mq_q = @mq.queue(@id, :auto_delete => true) mq_q.subscribe(:ack =>true) { |hdr, msg| @receive_queue << [hdr, msg] } #handle_new_subscriptions # TODO(sissel): make this a deferred thread that reads from a Queue #EM.add_periodic_timer(5) { handle_new_subscriptions } EM.defer { handle_subscriptions } EM.add_periodic_timer(1) do # TODO(sissel): add locking @outbuffer.each_key { |dest| flushout(dest) } @outbuffer.clear end end # AMQP.start end end |
#start_receiver ⇒ Object
def start_amqp
104 105 106 107 108 109 110 111 |
# File 'lib/mqrpc/agent.rb', line 104 def start_receiver Thread.new do while true header, = @receive_queue.pop (header, ) end end end |
#subscribe(name) ⇒ Object
def start_receiver
113 114 115 |
# File 'lib/mqrpc/agent.rb', line 113 def subscribe(name) @want_subscriptions << [:queue, name] end |
#subscribe_topic(name) ⇒ Object
def subscribe
117 118 119 |
# File 'lib/mqrpc/agent.rb', line 117 def subscribe_topic(name) @want_subscriptions << [:topic, name] end |