8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/right_amqp/amqp/client.rb', line 8
# Handle a single frame received on this connection.
#
# The time of receipt is stored in @last_data_received. A frame whose channel
# has an entry in +channels+ is handed to that entry and nothing else happens.
# Any other frame is treated as connection-level traffic: protocol method
# frames drive the Start/Tune/Open handshake and the close sequence, and
# heartbeat frames refresh @last_heartbeat_received.
#
# frame:: the received frame; must respond to +channel+ and +payload+
#
# Returns nil after delegating to a channel. Otherwise returns the result of
# the @connection_status callback when it is invoked, or a falsy value when
# it is not.
def process_frame(frame)
  @last_data_received = Time.now

  # A registered channel takes full ownership of frames addressed to it.
  channel_handler = channels[frame.channel]
  if channel_handler
    channel_handler.process_frame(frame)
    return
  end

  case frame
  when Frame::Method
    payload = frame.payload
    case payload
    when Protocol::Connection::Start
      # Reply with client properties and AMQPLAIN credentials from @settings.
      client_properties = {:platform => 'Ruby/EventMachine',
                           :product => 'AMQP',
                           :information => 'http://github.com/tmm1/amqp',
                           :version => VERSION}
      credentials = {:LOGIN => @settings[:user],
                     :PASSWORD => @settings[:pass]}
      send Protocol::Connection::StartOk.new(client_properties, 'AMQPLAIN', credentials, 'en_US')
    when Protocol::Connection::Tune
      # Echo the proposed frame size back, then immediately open the vhost.
      @frame_max = payload.frame_max
      heartbeat_interval = @settings[:heartbeat] || 0
      send Protocol::Connection::TuneOk.new(:channel_max => 0,
                                            :frame_max => payload.frame_max,
                                            :heartbeat => heartbeat_interval)
      send Protocol::Connection::Open.new(:virtual_host => @settings[:vhost],
                                          :capabilities => '',
                                          :insist => @settings[:insist])
    when Protocol::Connection::OpenOk
      logger.debug("[amqp] Received open completion from broker #{@settings[:identity]}")
      succeed(self)
    when Protocol::Connection::Close
      # Only reports the close reason on STDERR; no reply is sent from here.
      # NOTE(review): the lookup presumes Protocol.classes has an entry for
      # payload.class_id - confirm behaviour when the broker sends id 0.
      STDERR.puts "#{payload.reply_text} in #{Protocol.classes[payload.class_id].methods[payload.method_id]}"
    when Protocol::Connection::CloseOk
      logger.debug("[amqp] Received close completion from broker #{@settings[:identity]}")
      @on_disconnect.call if @on_disconnect
    end
  when Frame::Heartbeat
    logger.debug("[amqp] Received heartbeat from broker #{@settings[:identity]}")
    @last_heartbeat_received = @last_data_received
  end

  # Checked for every connection-level frame, independent of the branches
  # above: an OpenOk payload reports :ready through the optional callback.
  open_confirmed = @connection_status && frame.payload.is_a?(AMQP::Protocol::Connection::OpenOk)
  @connection_status.call(:ready) if open_confirmed
end
|