Module: AMQP::Client

Includes:
EM::Deferrable
Defined in:
lib/amqp/client.rb

Class Method Summary

Instance Method Summary

Class Method Details

.connect(opts = {}) ⇒ Object



131
132
133
134
135
136
137
# File 'lib/amqp/client.rb', line 131

# Opens a connection using AMQP.settings overlaid with the caller's options.
#
# Keys given in +opts+ take precedence over AMQP.settings. When the merged
# result has no :host or :port, '127.0.0.1' and PORT are filled in.
#
# @param opts [Hash] per-connection overrides
# @return [Object] whatever EM.connect returns
def self.connect(opts = {})
  merged = AMQP.settings.merge(opts)
  merged[:host] ||= '127.0.0.1'
  merged[:port] ||= PORT

  # self and the full option hash are handed through to EM.connect.
  EM.connect(merged[:host], merged[:port], self, merged)
end

Instance Method Details

#add_channel(mq) ⇒ Object



72
73
74
75
# File 'lib/amqp/client.rb', line 72

# Registers +mq+ under the next free channel number.
#
# The number is one more than the largest key currently in +channels+,
# or 1 when no channel has been registered yet.
#
# @param mq [Object] the channel object to store
# @return [Integer] the key under which +mq+ was stored
def add_channel(mq)
  next_id = (channels.keys.max || 0) + 1
  channels[next_id] = mq
  next_id
end

#channels(mq = nil) ⇒ Object



77
78
79
# File 'lib/amqp/client.rb', line 77

# Returns the hash of registered channels, creating it on first access.
#
# @param mq [Object, nil] accepted but not used by this method
# @return [Hash] the same hash object on every call
def channels(mq = nil)
  return @channels if @channels

  @channels = {}
end

#close(&on_disconnect) ⇒ Object

# Logs the outgoing payload, then defers to the inherited send_data.
#
# @param data [Object] payload passed unchanged to +super+
# @return [Object] whatever the inherited send_data returns
def send_data(data)
  log('send_data', data)
  super
end



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/amqp/client.rb', line 110

# Starts shutting the connection down.
#
# A block, when given, is remembered in @on_disconnect; this method does
# not call it. The shutdown work is registered through +callback+
# (presumably EM::Deferrable's, since the module includes it - confirm):
# with no channels registered a Connection::Close frame is sent,
# otherwise every registered channel is asked to close.
#
# @yield optional block stored as the disconnect hook
# @return [Object] whatever +callback+ returns
def close(&on_disconnect)
  @on_disconnect = on_disconnect unless on_disconnect.nil?

  callback do |client|
    if client.channels.keys.none?
      goodbye = Protocol::Connection::Close.new(:reply_code => 200,
                                                :reply_text => 'Goodbye',
                                                :class_id => 0,
                                                :method_id => 0)
      send goodbye
    else
      client.channels.each_value { |mq| mq.close }
    end
  end
end

#connection_completed ⇒ Object



65
66
67
68
69
70
# File 'lib/amqp/client.rb', line 65

# Runs once the connection is up: logs the event, creates a fresh receive
# buffer in @buf, then writes HEADER followed by four bytes built from
# 1, 1, VERSION_MAJOR and VERSION_MINOR.
#
# @return [Object] the result of the last send_data call
def connection_completed
  log('connected')
  @buf = Buffer.new

  send_data(HEADER)
  version_fields = [1, 1, VERSION_MAJOR, VERSION_MINOR]
  send_data(version_fields.pack('C4'))
end

#initialize(opts = {}) ⇒ Object



60
61
62
63
# File 'lib/amqp/client.rb', line 60

# Keeps the given options in @settings and mixes the module returned by
# AMQP.client into this particular instance.
#
# @param opts [Hash] connection settings, stored as-is
def initialize(opts = {})
  @settings = opts

  client_module = AMQP.client
  extend(client_module)
end

#process_frame(frame) ⇒ Object



91
92
93
94
# File 'lib/amqp/client.rb', line 91

# Placeholder frame handler that does nothing and returns nil.
#
# receive_data calls this for every parsed frame. The real implementation
# is expected to come from the module that initialize mixes in with
# `extend AMQP.client`.
#
# @param frame [Object] a parsed frame (unused here)
# @return [nil]
def process_frame(frame)
  # intentionally empty
end

#receive_data(data) ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/amqp/client.rb', line 81

# Appends incoming data to @buf and handles every frame that can be
# parsed out of it.
#
# Frame.parse is called repeatedly on the buffer; each frame it yields is
# logged and passed to process_frame. The loop stops as soon as
# Frame.parse returns nil or false.
#
# @param data [Object] chunk appended to the buffer with <<
# @return [nil]
def receive_data(data)
  @buf << data

  while (frame = Frame.parse(@buf))
    log('receive', frame)
    process_frame(frame)
  end
end

#send(data, opts = {}) ⇒ Object



96
97
98
99
100
101
102
103
# File 'lib/amqp/client.rb', line 96

# Serializes +data+ as a frame on the requested channel and writes it out.
#
# Anything that is not already a Frame is converted with
# +data.to_frame(channel)+. The frame's channel is then set, the frame is
# logged, and its string form is passed to send_data.
#
# The channel is only read from +opts+; the caller's hash is never written
# to, so a frozen or shared options hash is safe to pass.
#
# @param data [Frame, #to_frame] frame, or an object convertible to one
# @param opts [Hash] options; :channel selects the channel (0 when absent)
# @return [Object] whatever send_data returns
def send(data, opts = {})
  channel = opts[:channel] || 0
  data = data.to_frame(channel) unless data.is_a?(Frame)
  data.channel = channel

  log('send', data)
  send_data(data.to_s)
end

#unbind ⇒ Object



127
128
129
# File 'lib/amqp/client.rb', line 127

# Records that the connection has gone away by logging 'disconnected'.
#
# @return [Object] whatever +log+ returns
def unbind
  log('disconnected')
end