Class: MQRPC::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/mqrpc/agent.rb

Overview

TODO: document this class

Constant Summary collapse

MAXBUF =
20
MAXMESSAGEWAIT =
MAXBUF * 20

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Agent

Returns a new instance of Agent.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/mqrpc/agent.rb', line 56

def initialize(config)
  Thread::abort_on_exception = true
  @config = config
  @handler = self
  @id = UUID::generate
  @outbuffer = Hash.new { |h,k| h[k] = [] }
  @queues = Set.new
  @topics = Set.new
  @receive_queue = Queue.new
  @want_subscriptions = Queue.new

  # figure out how to really do this correctly, see also def self.pipeline
  if self.class.pipelines == nil
    self.class.pipelines = Hash.new
  end

  @slidingwindow = Hash.new do |h,k| 
    MQRPC::logger.debug "New sliding window for #{k}"
    h[k] = SizedThreadSafeHash.new(MAXMESSAGEWAIT) do |state|
      if self.class.pipelines[k]
        source = self.class.pipelines[k]
        MQRPC::logger.debug "Got sizedhash callback for #{k}: #{state}"
        case state
        when :blocked
          MQRPC::logger.info("Queue '#{k}' is full, unsubscribing from #{source}")
          exchange = @mq.topic(@config.mqexchange, :durable => true)
          mq_q = @mq.queue(source, :durable => true)
          mq_q.bind(exchange, :key => "*")
          mq_q.unsubscribe
          @queues.delete(source)
        when :ready
          MQRPC::logger.info("Queue '#{k}' is ready, resubscribing to #{source}")
          subscribe(source)
        end
      end
    end
  end

  @mq = nil
  @message_operations = Hash.new

  @startup_mutex = Mutex.new
  @startup_condvar = ConditionVariable.new
  @amqp_ready = false

  start_amqp

  # Wait for our AMQP thread to get going. Mainly, it needs to set
  # @mq, so we'll block until it's available.
  @startup_mutex.synchronize do
    MQRPC::logger.debug "Waiting for @mq ..."
    @startup_condvar.wait(@startup_mutex) if !@amqp_ready
    MQRPC::logger.debug "Got it, continuing with #{self.class} init..."
  end

  start_receiver
end

Class Attribute Details

.message_handlersObject

Returns the value of attribute message_handlers.



35
36
37
# File 'lib/mqrpc/agent.rb', line 35

def message_handlers
  @message_handlers
end

.pipelinesObject

Returns the value of attribute pipelines.



36
37
38
# File 'lib/mqrpc/agent.rb', line 36

def pipelines
  @pipelines
end

Class Method Details

.handle(messageclass, method) ⇒ Object

Subclasses use this to declare their support of any given message



41
42
43
44
45
46
# File 'lib/mqrpc/agent.rb', line 41

def self.handle(messageclass, method)
  if self.message_handlers == nil
    self.message_handlers = Hash.new
  end
  self.message_handlers[messageclass] = method
end

.pipeline(source, destination) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/mqrpc/agent.rb', line 48

def self.pipeline(source, destination)
  if self.pipelines == nil
    self.pipelines = Hash.new
  end

  self.pipelines[destination] = source
end

Instance Method Details

#can_receive?(message_class) ⇒ Boolean

run

Returns:

  • (Boolean)


228
229
230
# File 'lib/mqrpc/agent.rb', line 228

def can_receive?(message_class)
  return self.class.message_handlers.include?(message_class)
end

#closeObject



328
329
330
# File 'lib/mqrpc/agent.rb', line 328

def close
  EM.stop_event_loop
end

#flushout(destination) ⇒ Object

handle_new_subscriptions



277
278
279
280
281
282
283
# File 'lib/mqrpc/agent.rb', line 277

def flushout(destination)
  msgs = @outbuffer[destination]
  return if msgs.length == 0
  data = msgs.to_json
  @mq.queue(destination, :durable => true).publish(data, :persistent => true)
  msgs.clear
end

#handle_message(hdr, msg_body) ⇒ Object

def subscribe_topic



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/mqrpc/agent.rb', line 165

def handle_message(hdr, msg_body)
  queue = hdr.routing_key

  # If we just unsubscribed from a queue, we may still have some
  # messages buffered, so reject the message.
  # Currently RabbitMQ doesn't support message rejection, so let's
  # ack the message then push it back into the queue, unmodified.
  if !@queues.include?(queue)
    MQRPC::logger.warn("Got message on queue #{queue} that we are not "\
                       "subscribed to; rejecting")
    hdr.ack
    @mq.queue(queue, :durable => true).publish(msg_body, :persistent => true)
    return
  end

  # TODO(sissel): handle any exceptions we might get from here.
  obj = JSON::load(msg_body)
  if !obj.is_a?(Array)
    obj = [obj]
  end

  obj.each do |item|
    message = Message.new_from_data(item)
    slidingwindow = @slidingwindow[queue]
    if message.respond_to?(:from_queue)
      slidingwindow = @slidingwindow[message.from_queue]
    end
    MQRPC::logger.debug "Got message #{message.class}##{message.id} on queue #{queue}"
    #MQRPC::logger.debug "Received message: #{message.inspect}"
    if (message.respond_to?(:in_reply_to) and 
        slidingwindow.include?(message.in_reply_to))
      MQRPC::logger.debug "Got response to #{message.in_reply_to}"
      slidingwindow.delete(message.in_reply_to)
    end

    # Check if we have a specific operation looking for this
    # message.
    if (message.respond_to?(:in_reply_to) and
        @message_operations.has_key?(message.in_reply_to))
      operation = @message_operations[message.in_reply_to]
      operation.call(message)
    elsif can_receive?(message.class)
      func = self.class.message_handlers[message.class]
      self.send(func, message) do |response|
        reply_destination = message.reply_to
        response.from_queue = queue
        sendmsg(reply_destination, response)
      end

      # TODO(sissel): We should allow the message handler to defer acking
      # if they want For instance, if we want to index things, but only
      # want to ack things once we actually flush to disk.
    else
      $stderr.puts "#{@handler.class.name} does not support #{message.class}"
    end 
  end
  hdr.ack
end

#handle_new_subscriptionsObject



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/mqrpc/agent.rb', line 256

def handle_new_subscriptions
  todo = @want_queues - @queues
  todo.each do |queue|
    MQRPC::logger.info "Subscribing to queue #{queue}"
    mq_q = @mq.queue(queue, :durable => true)
    mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] }
    @queues << queue
  end # todo.each

  todo = @want_topics - @topics
  todo.each do |topic|
    MQRPC::logger.info "Subscribing to topic #{topic}"
    exchange = @mq.topic(@config.mqexchange)
    mq_q = @mq.queue("#{@id}-#{topic}",
                     :exclusive => true,
                     :auto_delete => true).bind(exchange, :key => topic)
    mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
    @topics << topic
  end # todo.each
end

#handle_subscriptionsObject



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/mqrpc/agent.rb', line 232

def handle_subscriptions
  while true do
    type, name = @want_subscriptions.pop
    case type
    when :queue
      next if @queues.include?(name)
      MQRPC::logger.info "Subscribing to queue #{name}"
      exchange = @mq.topic(@config.mqexchange, :durable => true)
      mq_q = @mq.queue(name, :durable => true)
      mq_q.bind(exchange, :key => "*")
      mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] }
      @queues << name
    when :topic
      MQRPC::logger.info "Subscribing to topic #{name}"
      exchange = @mq.topic(@config.mqexchange, :durable => true)
      mq_q = @mq.queue("#{@id}-#{name}",
                       :exclusive => true,
                       :auto_delete => true).bind(exchange, :key => name)
      mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
      @topics << name
    end
  end
end

#handler=(handler) ⇒ Object



324
325
326
# File 'lib/mqrpc/agent.rb', line 324

def handler=(handler)
  @handler = handler
end

#runObject

def handle_message



224
225
226
# File 'lib/mqrpc/agent.rb', line 224

def run
  @amqpthread.join
end

#sendmsg(destination, msg, &callback) ⇒ Object



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/mqrpc/agent.rb', line 295

def sendmsg(destination, msg, &callback)
  if (msg.is_a?(RequestMessage) and msg.id == nil)
    msg.generate_id!
  end
  msg.timestamp = Time.now.to_f
  msg.reply_to = @id

  if msg.is_a?(RequestMessage)
    MQRPC::logger.debug "Tracking #{msg.class.name}##{msg.id} to #{destination}"
    @slidingwindow[destination][msg.id] = true
  end

  if msg.delayable
    @outbuffer[destination] << msg
    if @outbuffer[destination].length > MAXBUF
      flushout(destination)
    end
  else
    MQRPC::logger.debug "Sending to #{destination}: #{msg.inspect}"
    @mq.queue(destination, :durable => true).publish([msg].to_json, :persistent => true)
  end

  if block_given?
    op = Operation.new(callback)
    @message_operations[msg.id] = op
    return op
  end
end

#sendmsg_topic(key, msg) ⇒ Object



285
286
287
288
289
290
291
292
293
# File 'lib/mqrpc/agent.rb', line 285

def sendmsg_topic(key, msg)
  if (msg.is_a?(RequestMessage) and msg.id == nil)
    msg.generate_id!
  end
  msg.timestamp = Time.now.to_f

  data = msg.to_json
  @mq.topic(@config.mqexchange).publish(data, :key => key)
end

#start_amqpObject

def initialize



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/mqrpc/agent.rb', line 114

def start_amqp
  @amqpthread = Thread.new do 
    # Create connection to AMQP, and in turn, the main EventMachine loop.
    amqp_config = {:host => @config.mqhost,
                   :port => @config.mqport,
                   :user => @config.mquser,
                   :pass => @config.mqpass,
                   :vhost => @config.mqvhost,
                  }
    AMQP.start(amqp_config) do
      @startup_mutex.synchronize do
        @mq = MQ.new
        # Notify the main calling thread (MessageSocket#initialize) that
        # we can continue
        @amqp_ready = true
        @startup_condvar.signal
      end

      MQRPC::logger.info "Subscribing to main queue #{@id}"
      subscribe(@id)
      
      # TODO(sissel): make this a deferred thread that reads from a Queue
      #EM.add_periodic_timer(5) { handle_new_subscriptions }
      EM.defer { handle_subscriptions }

      EM.add_periodic_timer(1) do
        # TODO(sissel): add locking
        @outbuffer.each_key { |dest| flushout(dest) }
        @outbuffer.clear
      end
    end # AMQP.start
  end
end

#start_receiverObject

def start_amqp



148
149
150
151
152
153
154
155
# File 'lib/mqrpc/agent.rb', line 148

def start_receiver
  Thread.new do 
    while true
      header, message = @receive_queue.pop
      handle_message(header, message)
    end
  end
end

#subscribe(name) ⇒ Object

def start_receiver



157
158
159
# File 'lib/mqrpc/agent.rb', line 157

def subscribe(name)
  @want_subscriptions << [:queue, name]
end

#subscribe_topic(name) ⇒ Object

def subscribe



161
162
163
# File 'lib/mqrpc/agent.rb', line 161

def subscribe_topic(name)
  @want_subscriptions << [:topic, name]
end