Module: AMQP::Client

Includes:
EM::Deferrable
Defined in:
lib/amqp/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.connect(opts = {}) ⇒ Object



202
203
204
205
# File 'lib/amqp/client.rb', line 202

# Opens an EventMachine connection that uses this module as its handler.
#
# The caller's options are layered on top of +AMQP.settings+; the merged
# hash supplies the host and port and is also handed to the handler.
#
# @param opts [Hash] overrides for the default AMQP settings
# @return [Object] whatever +EM.connect+ returns
def self.connect(opts = {})
  settings = AMQP.settings.merge(opts)
  EM.connect(settings[:host], settings[:port], self, settings)
end

Instance Method Details

#add_channel(mq) ⇒ Object



122
123
124
125
126
127
# File 'lib/amqp/client.rb', line 122

# Registers a channel object under the next free channel number.
#
# The number is one more than the highest key currently in +channels+
# (or 1 when there are none). The lookup and the insert happen while
# holding a lazily created mutex.
#
# @param mq [Object] the channel object to register
# @return [Integer] the channel number assigned to +mq+
def add_channel(mq)
  @_channel_mutex ||= Mutex.new
  @_channel_mutex.synchronize do
    highest = channels.keys.max || 0
    channel_id = highest + 1
    channels[channel_id] = mq
    channel_id
  end
end

#channels ⇒ Object



129
130
131
# File 'lib/amqp/client.rb', line 129

# Returns the registry of open channels, creating an empty hash on
# first access.
#
# @return [Hash] the channel registry (keys are assigned by +add_channel+)
def channels
  @channels = {} unless @channels
  @channels
end

#close(&on_disconnect) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/amqp/client.rb', line 157

# Requests a shutdown of the connection.
#
# When a block is given, the client is flagged as closing and the
# disconnect handler is replaced by a wrapper that runs the block and
# then clears the closing flag.
#
# The shutdown itself is registered as a deferrable callback: if any
# channels are still registered, each one is asked to close; otherwise a
# Connection.Close frame (reply code 200, 'Goodbye') is sent.
#
# @yield invoked from the disconnect handler
# @return [Object] whatever +callback+ returns
def close(&on_disconnect)
  if on_disconnect
    @closing = true
    @on_disconnect = proc do
      on_disconnect.call
      @closing = false
    end
  end

  callback do |client|
    if client.channels.empty?
      # Nothing left to tear down, so say goodbye to the broker.
      send Protocol::Connection::Close.new(:reply_code => 200,
                                           :reply_text => 'Goodbye',
                                           :class_id => 0,
                                           :method_id => 0)
    else
      client.channels.each_value { |mq| mq.close }
    end
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/amqp/client.rb', line 112

# Reports whether the connection is currently established.
#
# The flag is raised in +connection_completed+ and lowered in +unbind+.
# The result is coerced so that a strict +true+/+false+ is returned even
# when the flag has not been assigned yet.
#
# @return [Boolean] +true+ while connected, +false+ otherwise
def connected?
  !!@connected
end

#connection_completed ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/amqp/client.rb', line 76

# Runs once the underlying connection has been established.
#
# Steps, in order: start TLS when the +:ssl+ setting is truthy, log the
# event, install the regular disconnect handler (skipped while a close
# is in progress), flag the client as connected and notify the status
# listener, reset the receive buffer, write the protocol header and
# version bytes, and finally start heartbeating when a positive
# +:heartbeat+ setting is present.
#
# @return [Object, nil] result of the heartbeat setup, or nil
def connection_completed
  start_tls if @settings[:ssl]
  log 'connected'

  unless @closing
    # A pending close keeps its own disconnect handler.
    @on_disconnect = method(:disconnected)
    @reconnecting = false
  end

  @connected = true
  @connection_status.call(:connected) if @connection_status

  @buf = Buffer.new
  send_data HEADER
  send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')

  heartbeat = @settings[:heartbeat]
  if heartbeat
    # Normalise the setting to an integer before the timer reads it.
    interval = heartbeat.to_i
    @settings[:heartbeat] = interval
    init_heartbeat if interval > 0
  end
end

#connection_status(&blk) ⇒ Object



207
208
209
# File 'lib/amqp/client.rb', line 207

# Stores a listener for connection state changes.
#
# +connection_completed+ calls the stored block with +:connected+.
# Calling this method without a block clears the listener.
#
# @yieldparam state [Symbol] the new connection state
# @return [Proc, nil] the stored block
def connection_status(&blk)
  @connection_status = blk
end

#init_heartbeat ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/amqp/client.rb', line 97

# Starts heartbeating with the server.
#
# Records the current time as the last server heartbeat and creates (at
# most once) a periodic timer that fires every +@settings[:heartbeat]+
# seconds. While connected, each tick either sends a heartbeat frame or,
# when the last server heartbeat is older than two intervals, forces a
# reconnect.
#
# @return [EM::PeriodicTimer] the heartbeat timer
def init_heartbeat
  @last_server_heartbeat = Time.now

  @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
    next unless connected?

    # Anything older than two intervals counts as a missed heartbeat.
    deadline = Time.now - (@settings[:heartbeat] * 2)
    if @last_server_heartbeat >= deadline
      send AMQP::Frame::Heartbeat.new
    else
      log "Reconnecting due to missing server heartbeats"
      reconnect(true)
    end
  end
end

#initialize(opts = {}) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
# File 'lib/amqp/client.rb', line 64

# Prepares a client for a connection attempt.
#
# Stores the options as the settings, extends the instance with the
# module returned by +AMQP.client+, installs a default disconnect
# handler that raises when no handler is set yet, applies the optional
# +:timeout+ setting and, unless a reconnect is in progress, routes
# deferrable failures to the disconnect handler.
#
# @param opts [Hash] connection settings (+:host+, +:port+, +:timeout+, ...)
def initialize(opts = {})
  @settings = opts
  extend AMQP.client

  @on_disconnect ||= proc do
    raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}"
  end

  connect_timeout = @settings[:timeout]
  timeout connect_timeout if connect_timeout
  errback { @on_disconnect.call } unless @reconnecting

  @connected = false
end

#process_frame(frame) ⇒ Object



143
144
145
146
# File 'lib/amqp/client.rb', line 143

# Handles one parsed frame.
#
# This default implementation deliberately does nothing; it is a stub
# meant to be replaced by the module passed into +initialize+.
#
# @param frame [Object] the parsed frame
# @return [nil]
def process_frame(frame)
  nil
end

#receive_data(data) ⇒ Object



133
134
135
136
137
138
139
140
141
# File 'lib/amqp/client.rb', line 133

# Consumes raw bytes received from the socket.
#
# The bytes are appended to the receive buffer; every frame that
# +Frame.parse+ can extract from the buffer is then logged and passed to
# +process_frame+, until the parser returns nothing.
#
# @param data [String] bytes read from the connection
# @return [nil]
def receive_data(data)
  @buf << data

  while (parsed = Frame.parse(@buf))
    log 'receive', parsed
    process_frame parsed
  end
end

#reconnect(force = false) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/amqp/client.rb', line 180

# Re-establishes the connection.
#
# If a reconnect is already in progress and +force+ is false, a forced
# retry is scheduled one second later and nothing else happens. On the
# first attempt the client state is reset: the deferrable status is
# cleared, +initialize+ is re-run with the current settings, and every
# previously registered channel is reset after the registry has been
# emptied. In all remaining cases the attempt is logged and
# +EM.reconnect+ is invoked.
#
# @param force [Boolean] retry immediately even while reconnecting
# @return [Object, nil] result of +EM.reconnect+, or nil when a retry
#   was only scheduled
def reconnect(force = false)
  if @reconnecting && !force
    # wait 1 second after first reconnect attempt, in between each subsequent attempt
    EM.add_timer(1) { reconnect(true) }
    return
  end

  unless @reconnecting
    @reconnecting = true

    @deferred_status = nil
    initialize(@settings)

    # Swap in a fresh registry before resetting the old channels.
    stale_channels, @channels = @channels, {}
    stale_channels.each_value { |mq| mq.reset } if stale_channels
  end

  log 'reconnecting'
  EM.reconnect @settings[:host], @settings[:port], self
end

#send(data, opts = {}) ⇒ Object



148
149
150
151
152
153
154
155
# File 'lib/amqp/client.rb', line 148

# Writes a frame (or anything convertible to one) to the connection.
#
# The channel comes from +opts[:channel]+ and defaults to 0. Objects
# that are not already a +Frame+ are converted with +to_frame+. The
# frame's channel is then set, the frame is logged, and its string form
# is written with +send_data+.
#
# The options hash is only read, never modified, so callers may pass a
# shared or frozen hash.
#
# @param data [Frame, #to_frame] the payload to send
# @param opts [Hash] options; only +:channel+ is consulted
# @return [Object] whatever +send_data+ returns
def send(data, opts = {})
  channel = opts[:channel] || 0
  data = data.to_frame(channel) unless data.is_a?(Frame)
  data.channel = channel

  log 'send', data
  send_data data.to_s
end

#unbind ⇒ Object



116
117
118
119
120
# File 'lib/amqp/client.rb', line 116

# Runs when the connection is closed.
#
# Logs the event, lowers the connected flag and schedules the current
# disconnect handler for the next reactor tick. The handler is looked up
# when the tick fires, not when it is scheduled.
#
# @return [Object] whatever +EM.next_tick+ returns
def unbind
  log 'disconnected'
  @connected = false
  EM.next_tick do
    @on_disconnect.call
  end
end