Module: AMQP::BasicClient

Defined in:
lib/right_amqp/amqp/client.rb

Instance Method Summary collapse

Instance Method Details

#process_frame(frame) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/right_amqp/amqp/client.rb', line 8

# Handle one frame received from the broker.
#
# A frame whose channel number maps to an entry in +channels+ is handed to that
# entry's own +process_frame+ and nothing else happens here. Any other frame is
# treated as connection-level traffic: the Start/Tune/Open handshake, connection
# close notifications, and heartbeats.
#
# @param frame [Object] incoming frame; this method reads its +channel+ and
#   +payload+ and matches it against Frame::Method and Frame::Heartbeat
# @return [Object, nil] the result of the connection status callback when it is
#   invoked for an OpenOk payload, otherwise nil
def process_frame(frame)
  # Any inbound frame counts as activity, whichever channel it is for
  @last_data_received = Time.now

  # Frames addressed to a known channel are entirely that channel's business
  channel = channels[frame.channel]
  if channel
    channel.process_frame(frame)
    return
  end

  case frame
  when Frame::Method
    payload = frame.payload
    case payload
    when Protocol::Connection::Start
      # Answer the broker's Start with our client properties and AMQPLAIN credentials
      client_properties = {:platform => 'Ruby/EventMachine',
                           :product => 'AMQP',
                           :information => 'http://github.com/tmm1/amqp',
                           :version => VERSION}
      credentials = {:LOGIN => @settings[:user],
                     :PASSWORD => @settings[:pass]}
      send(Protocol::Connection::StartOk.new(client_properties, 'AMQPLAIN', credentials, 'en_US'))

    when Protocol::Connection::Tune
      # Echo back the frame_max proposed by the broker rather than choosing our own:
      # RabbitMQ 2.4.1 expects 131,072 while RabbitMQ 3.4.1 is configured with 0
      # (unlimited), and a broker rejects larger requests and fails the connection
      @frame_max = payload.frame_max
      send(Protocol::Connection::TuneOk.new(:channel_max => 0,
                                            :frame_max => payload.frame_max,
                                            :heartbeat => @settings[:heartbeat] || 0))

      # Tuning is immediately followed by the request to open the virtual host
      send(Protocol::Connection::Open.new(:virtual_host => @settings[:vhost],
                                          :capabilities => '',
                                          :insist => @settings[:insist]))

    when Protocol::Connection::OpenOk
      logger.debug("[amqp] Received open completion from broker #{@settings[:identity]}")
      succeed(self)

    when Protocol::Connection::Close
      # The broker closed the connection; this is only reported on STDERR, not raised
      # (formerly: raise Error with the same message)
      reason = payload.reply_text
      origin = Protocol.classes[payload.class_id].methods[payload.method_id]
      STDERR.puts "#{reason} in #{origin}"

    when Protocol::Connection::CloseOk
      logger.debug("[amqp] Received close completion from broker #{@settings[:identity]}")
      if @on_disconnect
        @on_disconnect.call
      end
    end

  when Frame::Heartbeat
    logger.debug("[amqp] Received heartbeat from broker #{@settings[:identity]}")
    # Reuse the timestamp taken on entry so both markers agree exactly
    @last_heartbeat_received = @last_data_received
  end

  # Report :ready only once the handshake with the broker has completed.
  # The 'connected' status callback fires before the handshake is done, and if it
  # triggers a lot of activity it might keep EM from handling the incoming handshake
  # packet in a timely fashion, causing the broker to close the connection
  return unless @connection_status
  @connection_status.call(:ready) if frame.payload.is_a?(AMQP::Protocol::Connection::OpenOk)
end