Class: MQ

Inherits:
Object
  • Object
show all
Includes:
AMQP, EM::Deferrable
Defined in:
lib/mq.rb,
lib/mq.rb,
lib/mq.rb,
lib/mq.rb,
lib/mq/rpc.rb,
lib/mq/queue.rb,
lib/mq/exchange.rb

Overview

unique identifier

Defined Under Namespace

Classes: Error, Exchange, Queue, RPC

Constant Summary

Constants included from AMQP

AMQP::DIR, AMQP::FIELDS, AMQP::HEADER, AMQP::PORT, AMQP::RESPONSES, AMQP::VERSION, AMQP::VERSION_MAJOR, AMQP::VERSION_MINOR

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from AMQP

client, client=, connect, settings, start, stop

Constructor Details

#initialize(connection = nil) ⇒ MQ

Returns a new instance of MQ.



21
22
23
24
25
26
27
28
29
30
# File 'lib/mq.rb', line 21

def initialize connection = nil
  raise 'MQ can only be used from within EM.run{}' unless EM.reactor_running?

  @connection = connection || AMQP.start

  conn.callback{ |c|
    @channel = c.add_channel(self)
    send Protocol::Channel::Open.new
  }
end

Class Attribute Details

.loggingObject

Returns the value of attribute logging.



11
12
13
# File 'lib/mq.rb', line 11

def logging
  @logging
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



31
32
33
# File 'lib/mq.rb', line 31

def channel
  @channel
end

Class Method Details

.defaultObject



151
152
153
# File 'lib/mq.rb', line 151

def MQ.default
  Thread.current[:mq] ||= MQ.new
end

.idObject



162
163
164
# File 'lib/mq.rb', line 162

def MQ.id
  Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end

.method_missing(meth, *args, &blk) ⇒ Object



155
156
157
# File 'lib/mq.rb', line 155

def MQ.method_missing meth, *args, &blk
  MQ.default.__send__(meth, *args, &blk)
end

Instance Method Details

#closeObject



111
112
113
114
115
116
117
118
119
120
# File 'lib/mq.rb', line 111

def close
  if @deferred_status == :succeeded
    send Protocol::Channel::Close.new(:reply_code => 200,
                                      :reply_text => 'bye',
                                      :method_id => 0,
                                      :class_id => 0)
  else
    @closing = true
  end
end

#exchangesObject

keep track of proxy objects



124
125
126
# File 'lib/mq.rb', line 124

def exchanges
  @exchanges ||= {}
end

#process_frame(frame) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/mq.rb', line 33

def process_frame frame
  log :received, frame

  case frame
  when Frame::Header
    @header = frame.payload
    @body = ''

  when Frame::Body
    @body << frame.payload
    if @body.length >= @header.size
      @header.properties.update(@method.arguments)
      @consumer.receive @header, @body
      @body = ''
    end

  when Frame::Method
    case method = frame.payload
    when Protocol::Channel::OpenOk
      send Protocol::Access::Request.new(:realm => '/data',
                                         :read => true,
                                         :write => true,
                                         :active => true)

    when Protocol::Access::RequestOk
      @ticket = method.ticket
      callback{
        send Protocol::Channel::Close.new(:reply_code => 200,
                                          :reply_text => 'bye',
                                          :method_id => 0,
                                          :class_id => 0)
      } if @closing
      succeed

    when Protocol::Basic::Deliver
      @method = method
      @header = nil
      @body = ''
      @consumer = queues[ method.consumer_tag ]


    when Protocol::Channel::Close
      raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"

    when Protocol::Channel::CloseOk
      @closing = false
      conn.callback{ |c|
        c.channels.delete(@channel)
        c.close unless c.channels.keys.any?
      }
    end
  end
end

#queue(name, opts = {}) ⇒ Object



103
104
105
# File 'lib/mq.rb', line 103

def queue name, opts = {}
  queues[name] ||= Queue.new(self, name, opts)
end

#queuesObject



128
129
130
# File 'lib/mq.rb', line 128

def queues
  @queues ||= {}
end

#rpc(name, obj = nil) ⇒ Object



107
108
109
# File 'lib/mq.rb', line 107

def rpc name, obj = nil
  rpcs[name] ||= RPC.new(self, name, obj)
end

#rpcsObject



132
133
134
# File 'lib/mq.rb', line 132

def rpcs
  @rcps ||= {}
end

#send(data) ⇒ Object



87
88
89
90
91
92
93
# File 'lib/mq.rb', line 87

def send data
  data.ticket = @ticket if @ticket and data.respond_to? :ticket
  conn.callback{ |c|
    log :sending, data
    c.send data, :channel => @channel
  }
end