Module: AMQP::Client
- Includes:
- EM::Deferrable
- Defined in:
- lib/amqp/client.rb
Class Method Summary collapse
Instance Method Summary collapse
- #add_channel(mq) ⇒ Object
- #channels ⇒ Object
- #close(&on_disconnect) ⇒ Object
- #connected? ⇒ Boolean
- #connection_completed ⇒ Object
- #connection_status(&blk) ⇒ Object
- #init_heartbeat ⇒ Object
- #initialize(opts = {}) ⇒ Object
- #process_frame(frame) ⇒ Object
- #receive_data(data) ⇒ Object
- #reconnect(force = false) ⇒ Object
- #send(data, opts = {}) ⇒ Object
- #unbind ⇒ Object
Class Method Details
Instance Method Details
#add_channel(mq) ⇒ Object
122 123 124 125 126 127 |
# File 'lib/amqp/client.rb', line 122
#
# Registers +mq+ under the next free channel number and returns that number.
#
# The number chosen is one greater than the largest key currently present in
# #channels (or 1 when there are no keys). Lookup and insertion happen while
# holding a mutex that is created on first use.
#
# @param mq [Object] the channel object to store
# @return [Integer] the key under which +mq+ was stored
def add_channel(mq)
  @_channel_mutex ||= Mutex.new
  @_channel_mutex.synchronize do
    next_id = (channels.keys.max || 0) + 1
    channels[next_id] = mq
    next_id
  end
end
#channels ⇒ Object
129 130 131 |
# File 'lib/amqp/client.rb', line 129
#
# Returns the hash of registered channels, creating an empty one the first
# time it is requested. The same hash object is returned on later calls.
#
# @return [Hash] channel number => channel object
def channels
  @channels = {} unless @channels
  @channels
end
#close(&on_disconnect) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/amqp/client.rb', line 157
#
# Starts closing the connection.
#
# When a block is given, the client is flagged as closing and the block is
# installed as the disconnect handler; once that handler has run, the closing
# flag is cleared again.
#
# The shutdown itself is registered through +callback+: if any channels are
# registered, each one is asked to close; otherwise a Connection::Close frame
# (reply code 200, 'Goodbye') is sent.
#
# @yield optional handler invoked on disconnect
def close(&on_disconnect)
  if on_disconnect
    @closing = true
    @on_disconnect = proc do
      on_disconnect.call
      @closing = false
    end
  end

  callback do |conn|
    open_channels = conn.channels
    if open_channels.any?
      open_channels.each { |_id, mq| mq.close }
    else
      goodbye = Protocol::Connection::Close.new(:reply_code => 200,
                                                :reply_text => 'Goodbye',
                                                :class_id => 0,
                                                :method_id => 0)
      send goodbye
    end
  end
end
#connected? ⇒ Boolean
112 113 114 |
# File 'lib/amqp/client.rb', line 112
#
# Reports the current value of the connected flag. The flag is set to false
# by #initialize and #unbind and to true by #connection_completed.
#
# @return [Boolean] the stored connection flag
def connected?
  @connected
end
#connection_completed ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/amqp/client.rb', line 76
#
# Runs the client-side setup once the connection is up:
#
# 1. starts TLS when the :ssl setting is truthy;
# 2. unless a close is in progress, installs #disconnected as the disconnect
#    handler and clears the reconnecting flag;
# 3. marks the client connected and notifies the connection-status handler
#    with :connected, if one is registered;
# 4. resets the receive buffer and sends the protocol header followed by the
#    four version bytes;
# 5. coerces the :heartbeat setting to an integer and starts the heartbeat
#    timer when it is positive.
def connection_completed
  start_tls if @settings[:ssl]
  log 'connected'

  # @on_disconnect = proc{ raise Error, 'Disconnected from server' }
  unless @closing
    @on_disconnect = method(:disconnected)
    @reconnecting = false
  end

  @connected = true
  @connection_status.call(:connected) if @connection_status

  @buf = Buffer.new
  send_data HEADER
  send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')

  interval = @settings[:heartbeat]
  return unless interval

  # Store the normalized integer back so later readers see a number.
  seconds = interval.to_i
  @settings[:heartbeat] = seconds
  init_heartbeat if seconds > 0
end
#connection_status(&blk) ⇒ Object
207 208 209 |
# File 'lib/amqp/client.rb', line 207
#
# Installs (or, when called without a block, clears) the handler that is
# called with :connected from #connection_completed.
#
# @yield [status] the handler to store
# @return [Proc, nil] the stored handler
def connection_status(&blk)
  @connection_status = blk
end
#init_heartbeat ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/amqp/client.rb', line 97
#
# Resets the last-server-heartbeat timestamp to now and, unless a timer
# already exists, creates a periodic timer firing every @settings[:heartbeat]
# seconds.
#
# On each tick while connected: if the last server heartbeat is older than
# twice the heartbeat interval, a forced reconnect is triggered; otherwise a
# heartbeat frame is sent. Ticks while disconnected do nothing.
def init_heartbeat
  @last_server_heartbeat = Time.now

  @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
    next unless connected?

    oldest_allowed = Time.now - (@settings[:heartbeat] * 2)
    if @last_server_heartbeat < oldest_allowed
      log "Reconnecting due to missing server heartbeats"
      reconnect(true)
    else
      send AMQP::Frame::Heartbeat.new
    end
  end
end
#initialize(opts = {}) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/amqp/client.rb', line 64
#
# Stores the connection settings and prepares the client for a connection
# attempt.
#
# The object is extended with the module returned by AMQP.client. If no
# disconnect handler is set yet, a default one is installed that raises Error
# naming the host and port. A :timeout setting, when present, is applied via
# +timeout+. Unless a reconnect is in progress, an errback is registered that
# invokes the disconnect handler. The client starts out not connected.
#
# #reconnect calls this method again with the existing settings.
#
# @param opts [Hash] connection settings (:host, :port, :timeout, ... are read)
def initialize(opts = {})
  @settings = opts
  extend AMQP.client

  @on_disconnect ||= proc do
    raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}"
  end

  limit = @settings[:timeout]
  timeout limit if limit

  errback { @on_disconnect.call } unless @reconnecting

  @connected = false
end
#process_frame(frame) ⇒ Object
143 144 145 146 |
# File 'lib/amqp/client.rb', line 143
#
# Placeholder frame handler; does nothing and returns nil.
#
# This is a stub meant to be replaced by the module passed into initialize.
#
# @param frame [Object] a parsed frame (ignored here)
# @return [nil]
def process_frame(frame)
  # intentionally empty: see the note above
end
#receive_data(data) ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/amqp/client.rb', line 133
#
# Appends incoming +data+ to the receive buffer, then repeatedly parses
# frames out of the buffer, logging each one and handing it to
# #process_frame, until Frame.parse returns a falsy value.
#
# @param data [Object] the chunk to append to the buffer
# @return [nil]
def receive_data(data)
  # log 'receive_data', data
  @buf << data

  while (parsed = Frame.parse(@buf))
    log 'receive', parsed
    process_frame parsed
  end
end
#reconnect(force = false) ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/amqp/client.rb', line 180
#
# Re-establishes the connection using the stored settings.
#
# If a reconnect is already in progress and +force+ is false, nothing is done
# now; instead a forced retry is scheduled one second later. On the first
# attempt (not yet reconnecting) the client state is reset: the reconnecting
# flag is raised, the deferred status is cleared, #initialize is re-run with
# the current settings, and the channel table is emptied with every
# previously registered channel object being reset.
#
# @param force [Boolean] attempt immediately even if already reconnecting
def reconnect(force = false)
  if @reconnecting && !force
    # wait 1 second after first reconnect attempt, in between each subsequent attempt
    EM.add_timer(1) { reconnect(true) }
    return
  end

  unless @reconnecting
    @reconnecting = true
    @deferred_status = nil
    initialize(@settings)

    # Swap in a fresh table before resetting the old channel objects.
    previous = @channels
    @channels = {}
    previous.each { |_id, mq| mq.reset } if previous
  end

  log 'reconnecting'
  EM.reconnect @settings[:host], @settings[:port], self
end
#send(data, opts = {}) ⇒ Object
148 149 150 151 152 153 154 155 |
# File 'lib/amqp/client.rb', line 148
#
# Serializes +data+ as a frame on the requested channel and writes it to the
# connection.
#
# Anything that is not already a Frame is converted with
# +data.to_frame(channel)+. The frame's channel is then set to the requested
# channel, the frame is logged, and its string form is passed to +send_data+.
#
# The options hash is only read: the default channel is no longer written
# back into it, so the caller's hash is left untouched and a frozen hash is
# accepted.
#
# @param data [Frame, #to_frame] the frame, or an object convertible to one
# @param opts [Hash] options; :channel selects the channel (defaults to 0)
# @return [Object] whatever +send_data+ returns
def send(data, opts = {})
  channel = opts[:channel] || 0
  data = data.to_frame(channel) unless data.is_a? Frame
  data.channel = channel

  log 'send', data
  send_data data.to_s
end
#unbind ⇒ Object
116 117 118 119 120 |
# File 'lib/amqp/client.rb', line 116
#
# Records that the connection is gone: logs 'disconnected', clears the
# connected flag, and schedules the current disconnect handler via
# EM.next_tick. The handler is looked up when the scheduled block runs, not
# when #unbind is called.
def unbind
  log 'disconnected'
  @connected = false

  EM.next_tick do
    @on_disconnect.call
  end
end