Module: NATS
- Defined in:
- lib/nats/version.rb,
lib/nats/nuid.rb,
lib/nats/client.rb
Overview
Copyright 2010-2018 The NATS Authors Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Defined Under Namespace
Classes: AuthError, ClientError, ConnectError, Error, MonotonicTime, NUID, ServerError
Constant Summary collapse
- DEFAULT_PORT =
4222- DEFAULT_URI =
"nats://localhost:#{DEFAULT_PORT}".freeze
- MAX_RECONNECT_ATTEMPTS =
10- RECONNECT_TIME_WAIT =
2- MAX_PENDING_SIZE =
32768- FAST_PRODUCER_THRESHOLD =
Maximum outbound size per client to trigger FP, 10MB
(10*1024*1024)
- DEFAULT_PING_INTERVAL =
Ping intervals
120- DEFAULT_PING_MAX =
2- MSG =
Protocol
/\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n/i- OK =
:nodoc:
/\A\+OK\s*\r\n/i- ERR =
:nodoc:
/\A-ERR\s+('.+')?\r\n/i- PING =
:nodoc:
/\APING\s*\r\n/i- PONG =
:nodoc:
/\APONG\s*\r\n/i- INFO =
:nodoc:
/\AINFO\s+([^\r\n]+)\r\n/i- UNKNOWN =
:nodoc:
/\A(.*)\r\n/- CR_LF =
Responses
("\r\n".freeze)
- CR_LF_SIZE =
:nodoc:
(CR_LF.bytesize)
- PING_REQUEST =
:nodoc:
("PING#{CR_LF}".freeze)
- PONG_RESPONSE =
:nodoc:
("PONG#{CR_LF}".freeze)
- SUB_OP =
:nodoc:
('SUB'.freeze)
- EMPTY_MSG =
:nodoc:
(''.freeze)
- SUB =
Used for future pedantic Mode
/^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/- SUB_NO_WC =
:nodoc:
/^([^\.\*>\s]+)(\.([^\.\*>\s]+))*$/- AWAITING_CONTROL_LINE =
Parser
1- AWAITING_MSG_PAYLOAD =
:nodoc:
2- VERSION =
NOTE: These are all announced to the server on CONNECT
"0.9.0".freeze
- LANG =
RUBY_ENGINE- PROTOCOL_VERSION =
1
Class Attribute Summary collapse
-
.client ⇒ Object
readonly
:nodoc:.
-
.close_cb ⇒ Object
readonly
:nodoc.
-
.disconnect_cb ⇒ Object
readonly
:nodoc.
-
.err_cb ⇒ Object
readonly
:nodoc:.
-
.err_cb_overridden ⇒ Object
readonly
:nodoc:.
-
.reactor_was_running ⇒ Object
(also: reactor_was_running?)
readonly
:nodoc:.
-
.reconnect_cb ⇒ Object
readonly
:nodoc.
Instance Attribute Summary collapse
-
#bytes_received ⇒ Object
readonly
Returns the value of attribute bytes_received.
-
#bytes_sent ⇒ Object
readonly
Returns the value of attribute bytes_sent.
-
#close_cb ⇒ Object
readonly
Returns the value of attribute close_cb.
-
#closing ⇒ Object
(also: #closing?)
readonly
:nodoc.
-
#connect_cb ⇒ Object
readonly
:nodoc:.
-
#connected ⇒ Object
(also: #connected?)
readonly
:nodoc:.
-
#disconnect_cb ⇒ Object
readonly
Returns the value of attribute disconnect_cb.
-
#err_cb ⇒ Object
readonly
:nodoc:.
-
#err_cb_overridden ⇒ Object
readonly
:nodoc:.
-
#msgs_received ⇒ Object
readonly
Returns the value of attribute msgs_received.
-
#msgs_sent ⇒ Object
readonly
Returns the value of attribute msgs_sent.
-
#options ⇒ Object
readonly
:nodoc.
-
#pings ⇒ Object
readonly
Returns the value of attribute pings.
-
#pongs_received ⇒ Object
readonly
:nodoc:.
-
#reconnecting ⇒ Object
(also: #reconnecting?)
readonly
:nodoc.
-
#server_info ⇒ Object
readonly
:nodoc.
-
#server_pool ⇒ Object
readonly
:nodoc.
Class Method Summary collapse
-
.clear_client ⇒ Object
:nodoc:.
-
.connect(opts = {}, &blk) ⇒ NATS
Create and return a connection to the server with the given options.
-
.connected? ⇒ Boolean
Connected state.
-
.connected_server ⇒ URI
Connected server.
-
.create_inbox ⇒ String
Returns a subject that can be used for “directed” communications.
-
.flush(*args, &blk) ⇒ Object
Flushes all messages and subscriptions in the default connection.
-
.on_close(&callback) ⇒ Object
Set the default on_close callback.
-
.on_disconnect(&callback) ⇒ Object
Set the default on_disconnect callback.
-
.on_error(&callback) ⇒ Object
Set the default on_error callback.
-
.on_reconnect(&callback) ⇒ Object
Set the default on_reconnect callback.
-
.options ⇒ Hash
Options.
-
.pending_data_size(*args) ⇒ Object
Return bytes outstanding for the default client connection.
-
.publish(*args, &blk) ⇒ Object
Publish a message using the default client connection.
-
.reconnecting? ⇒ Boolean
Reconnecting state.
-
.request(*args, &blk) ⇒ Object
Publish a message and wait for a response on the default client connection.
-
.server_info ⇒ Hash
Server information.
-
.server_running?(uri) ⇒ Boolean
:nodoc:.
-
.start(*args, &blk) ⇒ Object
Create a default client connection to the server.
-
.stop(&blk) ⇒ Object
Close the default client connection and optionally call the associated block.
-
.subscribe(*args, &blk) ⇒ Object
Subscribe using the default client connection.
-
.timeout(*args, &blk) ⇒ Object
Set a timeout for receiving messages for the subscription.
-
.unsubscribe(*args) ⇒ Object
Cancel a subscription on the default client connection.
-
.wait_for_server(uri, max_wait = 5) ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#attempt_reconnect ⇒ Object
:nodoc:.
- #auth_connection? ⇒ Boolean
-
#bind_primary ⇒ Object
:nodoc:.
-
#can_reuse_server?(server) ⇒ Boolean
:nodoc:.
- #cancel_ping_timer ⇒ Object
- #cancel_reconnect_timer ⇒ Object
- #client_using_secure_connection? ⇒ Boolean
-
#close ⇒ Object
Close the connection to the server.
-
#connect_command ⇒ Object
:nodoc:.
-
#connected_server ⇒ URI
Connected server.
-
#connection_completed ⇒ Object
:nodoc:.
- #disconnect_error_string ⇒ Object
-
#discovered_servers ⇒ Object
Retrieves the list of servers which have been discovered via server connect_urls announcements.
-
#flush(&blk) ⇒ Object
Flushes all messages and subscriptions for the connection.
-
#flush_pending ⇒ Object
:nodoc:.
- #had_error? ⇒ Boolean
- #initialize(options) ⇒ Object
-
#inspect ⇒ Object
:nodoc:.
- #multiple_servers_available? ⇒ Boolean
-
#on_close(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
-
#on_connect(&callback) ⇒ Object
Define a callback to be called when the client connection has been established.
-
#on_disconnect(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
-
#on_error(&callback) ⇒ Object
Define a callback to be called when errors occur on the client connection.
-
#on_msg(subject, sid, reply, msg) ⇒ Object
:nodoc:.
-
#on_reconnect(&callback) ⇒ Object
Define a callback to be called when a reconnect attempt is made.
-
#pending_data_size ⇒ Object
Return bytes outstanding waiting to be sent to server.
-
#process_connect ⇒ Object
:nodoc:.
-
#process_disconnect ⇒ Object
:nodoc:.
-
#process_info(info) ⇒ Object
:nodoc:.
- #process_pong ⇒ Object
-
#process_uri_options ⇒ Object
Parse out URIs, which can now be an array of server choices. The server pool will contain both explicit and implicit members.
-
#publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object
Publish a message to a given subject, with optional reply subject and completion block.
-
#queue_server_rt(&cb) ⇒ Object
:nodoc:.
-
#receive_data(data) ⇒ Object
:nodoc:.
-
#request(subject, data = nil, opts = {}, &cb) ⇒ Object
Send a request and have the response delivered to the supplied callback.
-
#schedule_primary_and_connect ⇒ Object
We have failed on an attempt at the primary (first) server, rotate and try again.
-
#schedule_reconnect ⇒ Object
:nodoc:.
-
#send_command(command, priority = false) ⇒ Object
:nodoc:.
-
#send_connect_command ⇒ Object
:nodoc:.
-
#send_ping ⇒ Object
:nodoc:.
- #server_using_secure_connection? ⇒ Boolean
- #should_delay_connect?(server) ⇒ Boolean
- #should_not_reconnect? ⇒ Boolean
- #ssl_handshake_completed ⇒ Object
- #ssl_verify_peer(cert) ⇒ Object
- #start_resp_mux_sub! ⇒ Object
-
#subscribe(subject, opts = {}, &callback) ⇒ Object
Subscribe to a subject with optional wildcards.
-
#subscription_count ⇒ Number
Return the active subscription count.
-
#timeout(sid, timeout, opts = {}, &callback) ⇒ Object
Setup a timeout for receiving messages for the subscription.
-
#unbind ⇒ Object
:nodoc:.
-
#unsubscribe(sid, opt_max = nil) ⇒ Object
Cancel a subscription.
-
#user_err_cb? ⇒ Boolean
:nodoc:.
Class Attribute Details
.client ⇒ Object (readonly)
:nodoc:
88 89 90 |
# File 'lib/nats/client.rb', line 88 def client @client end |
.close_cb ⇒ Object (readonly)
:nodoc
89 90 91 |
# File 'lib/nats/client.rb', line 89 def close_cb @close_cb end |
.disconnect_cb ⇒ Object (readonly)
:nodoc
89 90 91 |
# File 'lib/nats/client.rb', line 89 def disconnect_cb @disconnect_cb end |
.err_cb ⇒ Object (readonly)
:nodoc:
88 89 90 |
# File 'lib/nats/client.rb', line 88 def err_cb @err_cb end |
.err_cb_overridden ⇒ Object (readonly)
:nodoc:
88 89 90 |
# File 'lib/nats/client.rb', line 88 def err_cb_overridden @err_cb_overridden end |
.reactor_was_running ⇒ Object (readonly) Also known as: reactor_was_running?
:nodoc:
88 89 90 |
# File 'lib/nats/client.rb', line 88 def reactor_was_running @reactor_was_running end |
.reconnect_cb ⇒ Object (readonly)
:nodoc
89 90 91 |
# File 'lib/nats/client.rb', line 89 def reconnect_cb @reconnect_cb end |
Instance Attribute Details
#bytes_received ⇒ Object (readonly)
Returns the value of attribute bytes_received.
350 351 352 |
# File 'lib/nats/client.rb', line 350 def bytes_received @bytes_received end |
#bytes_sent ⇒ Object (readonly)
Returns the value of attribute bytes_sent.
350 351 352 |
# File 'lib/nats/client.rb', line 350 def bytes_sent @bytes_sent end |
#close_cb ⇒ Object (readonly)
Returns the value of attribute close_cb.
351 352 353 |
# File 'lib/nats/client.rb', line 351 def close_cb @close_cb end |
#closing ⇒ Object (readonly) Also known as: closing?
:nodoc
349 350 351 |
# File 'lib/nats/client.rb', line 349 def closing @closing end |
#connect_cb ⇒ Object (readonly)
:nodoc:
348 349 350 |
# File 'lib/nats/client.rb', line 348 def connect_cb @connect_cb end |
#connected ⇒ Object (readonly) Also known as: connected?
:nodoc:
348 349 350 |
# File 'lib/nats/client.rb', line 348 def connected @connected end |
#disconnect_cb ⇒ Object (readonly)
Returns the value of attribute disconnect_cb.
351 352 353 |
# File 'lib/nats/client.rb', line 351 def disconnect_cb @disconnect_cb end |
#err_cb ⇒ Object (readonly)
:nodoc:
348 349 350 |
# File 'lib/nats/client.rb', line 348 def err_cb @err_cb end |
#err_cb_overridden ⇒ Object (readonly)
:nodoc:
348 349 350 |
# File 'lib/nats/client.rb', line 348 def err_cb_overridden @err_cb_overridden end |
#msgs_received ⇒ Object (readonly)
Returns the value of attribute msgs_received.
350 351 352 |
# File 'lib/nats/client.rb', line 350 def msgs_received @msgs_received end |
#msgs_sent ⇒ Object (readonly)
Returns the value of attribute msgs_sent.
350 351 352 |
# File 'lib/nats/client.rb', line 350 def msgs_sent @msgs_sent end |
#options ⇒ Object (readonly)
:nodoc
349 350 351 |
# File 'lib/nats/client.rb', line 349 def options @options end |
#pings ⇒ Object (readonly)
Returns the value of attribute pings.
350 351 352 |
# File 'lib/nats/client.rb', line 350 def pings @pings end |
#pongs_received ⇒ Object (readonly)
:nodoc:
348 349 350 |
# File 'lib/nats/client.rb', line 348 def pongs_received @pongs_received end |
#reconnecting ⇒ Object (readonly) Also known as: reconnecting?
:nodoc
349 350 351 |
# File 'lib/nats/client.rb', line 349 def reconnecting @reconnecting end |
#server_info ⇒ Object (readonly)
:nodoc
349 350 351 |
# File 'lib/nats/client.rb', line 349 def server_info @server_info end |
#server_pool ⇒ Object (readonly)
:nodoc
349 350 351 |
# File 'lib/nats/client.rb', line 349 def server_pool @server_pool end |
Class Method Details
.clear_client ⇒ Object
:nodoc:
336 337 338 |
# File 'lib/nats/client.rb', line 336 def clear_client # :nodoc: @client = nil end |
.connect(opts = {}, &blk) ⇒ NATS
Create and return a connection to the server with the given options. The optional block will be called when the connection has been completed.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/nats/client.rb', line 110 def connect(opts={}, &blk) # Defaults opts[:verbose] = false if opts[:verbose].nil? opts[:pedantic] = false if opts[:pedantic].nil? opts[:reconnect] = true if opts[:reconnect].nil? opts[:ssl] = false if opts[:ssl].nil? opts[:max_reconnect_attempts] = MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil? opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil? opts[:ping_interval] = DEFAULT_PING_INTERVAL if opts[:ping_interval].nil? opts[:max_outstanding_pings] = DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil? # Override with ENV opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil? opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil? opts[:debug] = ENV['NATS_DEBUG'].downcase == 'true' unless ENV['NATS_DEBUG'].nil? opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil? opts[:fast_producer_error] = ENV['NATS_FAST_PRODUCER'].downcase == 'true' unless ENV['NATS_FAST_PRODUCER'].nil? opts[:ssl] = ENV['NATS_SSL'].downcase == 'true' unless ENV['NATS_SSL'].nil? opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil? opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil? opts[:name] ||= ENV['NATS_CONNECTION_NAME'] opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil? opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil? uri = opts[:uris] || opts[:servers] || opts[:uri] if opts[:tls] case when opts[:tls][:ca_file] # Ensure that the file exists before going further # in order to report configuration errors during # connect synchronously. 
if !File.readable?(opts[:tls][:ca_file]) raise(Error, "TLS Verification is enabled but ca_file %s is not readable" % opts[:tls][:ca_file]) end # Certificate is supplied so assume we mean verification by default, # but still allow disabling explicitly by setting to false. opts[:tls][:verify_peer] ||= true when (opts[:tls][:verify_peer] && !opts[:tls][:ca_file]) raise(Error, "TLS Verification is enabled but ca_file is not set") else # Otherwise, disable verifying peer by default, # thus never reaching EM#ssl_verify_peer opts[:tls][:verify_peer] = false end # Allow overriding directly but default to those which server supports. opts[:tls][:ssl_version] ||= %w(tlsv1 tlsv1_1 tlsv1_2) opts[:tls][:protocols] ||= %w(tlsv1 tlsv1_1 tlsv1_2) end # If they pass an array here just pass along to the real connection, and use first as the first attempt.. # Real connection will do proper walk throughs etc.. unless uri.nil? uris = uri.kind_of?(Array) ? uri : [uri] uris.shuffle! unless opts[:dont_randomize_servers] u = uris.first @uri = u.is_a?(URI) ? u.dup : URI.parse(u) end @err_cb = proc { |e| raise e } unless err_cb @close_cb = proc { } unless close_cb @disconnect_cb = proc { } unless disconnect_cb client = EM.connect(@uri.host, @uri.port, self, opts) client.on_connect(&blk) if blk return client end |
.connected? ⇒ Boolean
Returns Connected state.
221 222 223 224 |
# File 'lib/nats/client.rb', line 221 def connected? return false unless client client.connected? end |
.connected_server ⇒ URI
Returns Connected server.
215 216 217 218 |
# File 'lib/nats/client.rb', line 215 def connected_server return nil unless client client.connected_server end |
.create_inbox ⇒ String
Returns a subject that can be used for “directed” communications.
303 304 305 |
# File 'lib/nats/client.rb', line 303 def create_inbox "_INBOX.#{SecureRandom.hex(13)}" end |
.flush(*args, &blk) ⇒ Object
Flushes all messages and subscriptions in the default connection
309 310 311 |
# File 'lib/nats/client.rb', line 309 def flush(*args, &blk) (@client ||= connect).flush(*args, &blk) end |
.on_close(&callback) ⇒ Object
Set the default on_close callback.
266 267 268 269 |
# File 'lib/nats/client.rb', line 266 def on_close(&callback) @close_cb = callback @client.on_close(&callback) unless @client.nil? end |
.on_disconnect(&callback) ⇒ Object
Set the default on_disconnect callback.
259 260 261 262 |
# File 'lib/nats/client.rb', line 259 def on_disconnect(&callback) @disconnect_cb = callback @client.on_disconnect(&callback) unless @client.nil? end |
.on_error(&callback) ⇒ Object
Set the default on_error callback.
246 247 248 |
# File 'lib/nats/client.rb', line 246 def on_error(&callback) @err_cb, @err_cb_overridden = callback, true end |
.on_reconnect(&callback) ⇒ Object
Set the default on_reconnect callback.
252 253 254 255 |
# File 'lib/nats/client.rb', line 252 def on_reconnect(&callback) @reconnect_cb = callback @client.on_reconnect(&callback) unless @client.nil? end |
.options ⇒ Hash
Returns Options.
233 234 235 236 |
# File 'lib/nats/client.rb', line 233 def options return {} unless client client.options end |
.pending_data_size(*args) ⇒ Object
Return bytes outstanding for the default client connection.
315 316 317 |
# File 'lib/nats/client.rb', line 315 def pending_data_size(*args) (@client ||= connect).pending_data_size(*args) end |
.publish(*args, &blk) ⇒ Object
Publish a message using the default client connection.
273 274 275 |
# File 'lib/nats/client.rb', line 273 def publish(*args, &blk) (@client ||= connect).publish(*args, &blk) end |
.reconnecting? ⇒ Boolean
Returns Reconnecting state.
227 228 229 230 |
# File 'lib/nats/client.rb', line 227 def reconnecting? return false unless client client.reconnecting? end |
.request(*args, &blk) ⇒ Object
Publish a message and wait for a response on the default client connection.
297 298 299 |
# File 'lib/nats/client.rb', line 297 def request(*args, &blk) (@client ||= connect).request(*args, &blk) end |
.server_info ⇒ Hash
Returns Server information.
239 240 241 242 |
# File 'lib/nats/client.rb', line 239 def server_info return nil unless client client.server_info end |
.server_running?(uri) ⇒ Boolean
:nodoc:
327 328 329 330 331 332 333 334 |
# File 'lib/nats/client.rb', line 327 def server_running?(uri) # :nodoc: require 'socket' s = TCPSocket.new(uri.host, uri.port) s.close return true rescue return false end |
.start(*args, &blk) ⇒ Object
Create a default client connection to the server.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/nats/client.rb', line 185 def start(*args, &blk) @reactor_was_running = EM.reactor_running? unless (@reactor_was_running || blk) raise(Error, "EM needs to be running when NATS.start is called without a run block") end # Setup optimized select versions if EM.epoll? EM.epoll elsif EM.kqueue? EM.kqueue elsif EM.library_type == :java # No warning needed, we're using Java NIO else Kernel.warn('Neither epoll nor kqueue are supported, performance may be impacted') end EM.run { @client = connect(*args, &blk) } end |
.stop(&blk) ⇒ Object
Close the default client connection and optionally call the associated block.
205 206 207 208 209 210 211 212 |
# File 'lib/nats/client.rb', line 205 def stop(&blk) client.close if (client and (client.connected? || client.reconnecting?)) blk.call if blk @err_cb = nil @close_cb = nil @reconnect_cb = nil @disconnect_cb = nil end |
.subscribe(*args, &blk) ⇒ Object
Subscribe using the default client connection.
279 280 281 |
# File 'lib/nats/client.rb', line 279 def subscribe(*args, &blk) (@client ||= connect).subscribe(*args, &blk) end |
.timeout(*args, &blk) ⇒ Object
Set a timeout for receiving messages for the subscription.
291 292 293 |
# File 'lib/nats/client.rb', line 291 def timeout(*args, &blk) (@client ||= connect).timeout(*args, &blk) end |
.unsubscribe(*args) ⇒ Object
Cancel a subscription on the default client connection.
285 286 287 |
# File 'lib/nats/client.rb', line 285 def unsubscribe(*args) (@client ||= connect).unsubscribe(*args) end |
.wait_for_server(uri, max_wait = 5) ⇒ Object
:nodoc:
319 320 321 322 323 324 325 |
# File 'lib/nats/client.rb', line 319 def wait_for_server(uri, max_wait = 5) # :nodoc: start = Time.now while (Time.now - start < max_wait) # Wait max_wait seconds max break if server_running?(uri) sleep(0.1) end end |
Instance Method Details
#attempt_reconnect ⇒ Object
:nodoc:
1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 |
# File 'lib/nats/client.rb', line 1013 def attempt_reconnect #:nodoc: @reconnect_timer = nil current = server_pool.first # Snapshot time when trying to reconnect to server # in order to back off for subsequent attempts. current[:last_reconnect_attempt] = MonotonicTime.now current[:reconnect_attempts] ||= 0 current[:reconnect_attempts] += 1 send_connect_command begin EM.reconnect(@uri.host, @uri.port, self) rescue current[:error_received] = true @uri = nil @connected = false end end |
#auth_connection? ⇒ Boolean
630 631 632 |
# File 'lib/nats/client.rb', line 630 def auth_connection? !@uri.user.nil? end |
#bind_primary ⇒ Object
:nodoc:
1071 1072 1073 1074 1075 1076 1077 |
# File 'lib/nats/client.rb', line 1071 def bind_primary #:nodoc: first = server_pool.first @uri = first[:uri] @uri.user = options[:user] if options[:user] @uri.password = options[:pass] if options[:pass] first end |
#can_reuse_server?(server) ⇒ Boolean
:nodoc:
1007 1008 1009 1010 1011 |
# File 'lib/nats/client.rb', line 1007 def can_reuse_server?(server) #:nodoc: # If we will retry a number of times to reconnect to a server # unless we got an error from it already. reconnecting? && server[:reconnect_attempts] <= @options[:max_reconnect_attempts] && !server[:error_received] end |
#cancel_ping_timer ⇒ Object
843 844 845 846 847 848 |
# File 'lib/nats/client.rb', line 843 def cancel_ping_timer if @ping_timer EM.cancel_timer(@ping_timer) @ping_timer = nil end end |
#cancel_reconnect_timer ⇒ Object
976 977 978 979 980 981 |
# File 'lib/nats/client.rb', line 976 def cancel_reconnect_timer if @reconnect_timer EM.cancel_timer(@reconnect_timer) @reconnect_timer = nil end end |
#client_using_secure_connection? ⇒ Boolean
820 821 822 |
# File 'lib/nats/client.rb', line 820 def client_using_secure_connection? @tls || @ssl end |
#close ⇒ Object
Close the connection to the server.
613 614 615 616 617 618 619 |
# File 'lib/nats/client.rb', line 613 def close @closing = true cancel_ping_timer cancel_reconnect_timer close_connection_after_writing if connected? process_disconnect if reconnecting? end |
#connect_command ⇒ Object
:nodoc:
634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 |
# File 'lib/nats/client.rb', line 634 def connect_command #:nodoc: cs = { :verbose => @options[:verbose], :pedantic => @options[:pedantic], :lang => ::NATS::LANG, :version => ::NATS::VERSION, :protocol => ::NATS::PROTOCOL_VERSION } if auth_connection? cs[:user] = @uri.user cs[:pass] = @uri.password end cs[:name] = @options[:name] if @options[:name] cs[:ssl_required] = @ssl if @ssl cs[:tls_required] = true if @tls "CONNECT #{cs.to_json}#{CR_LF}" end |
#connected_server ⇒ URI
Returns Connected server.
1061 1062 1063 |
# File 'lib/nats/client.rb', line 1061 def connected_server connected? ? @uri : nil end |
#connection_completed ⇒ Object
:nodoc:
850 851 852 853 854 855 856 857 858 859 860 861 862 |
# File 'lib/nats/client.rb', line 850 def connection_completed #:nodoc: @parse_state = AWAITING_CONTROL_LINE # Delay sending CONNECT or any other command here until we are sure # that we have a valid established secure connection. return if (@ssl or @tls) # Mark that we established already TCP connection to the server. In case of TLS, # prepare commands which will be dispatched to server and delay flushing until # we have processed the INFO line sent by the server and done the handshake. @connected = true process_connect end |
#disconnect_error_string ⇒ Object
983 984 985 986 |
# File 'lib/nats/client.rb', line 983 def disconnect_error_string return "Client disconnected from server on #{@uri}" if @connected return "Could not connect to server on #{@uri}" end |
#discovered_servers ⇒ Object
Retrieves the list of servers which have been discovered via server connect_urls announcements
1067 1068 1069 |
# File 'lib/nats/client.rb', line 1067 def discovered_servers server_pool.select {|s| s[:discovered] } end |
#flush(&blk) ⇒ Object
Flushes all messages and subscriptions for the connection. All messages and subscriptions have been processed by the server when the optional callback is called.
578 579 580 |
# File 'lib/nats/client.rb', line 578 def flush(&blk) queue_server_rt(&blk) if blk end |
#flush_pending ⇒ Object
:nodoc:
696 697 698 699 700 |
# File 'lib/nats/client.rb', line 696 def flush_pending #:nodoc: return unless @pending send_data(@pending.join) @pending, @pending_size = nil, 0 end |
#had_error? ⇒ Boolean
968 969 970 |
# File 'lib/nats/client.rb', line 968 def had_error? server_pool.first && server_pool.first[:error_received] end |
#initialize(options) ⇒ Object
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'lib/nats/client.rb', line 357 def initialize(options) @options = options process_uri_options @buf = nil @ssid, @subs = 1, {} @err_cb = NATS.err_cb @close_cb = NATS.close_cb @reconnect_cb = NATS.reconnect_cb @disconnect_cb = NATS.disconnect_cb @reconnect_timer, @needed = nil, nil @connected, @closing, @reconnecting, @conn_cb_called = false, false, false, false @msgs_received = @msgs_sent = @bytes_received = @bytes_sent = @pings = 0 @pending_size = 0 @server_info = { } # Mark whether we should be connecting securely, try best effort # in being compatible with present ssl support. @ssl = false @tls = nil @tls = options[:tls] if options[:tls] @ssl = options[:ssl] if options[:ssl] or @tls # New style request/response implementation. @resp_sub = nil @resp_map = nil @resp_sub_prefix = nil @nuid = NATS::NUID.new send_connect_command end |
#inspect ⇒ Object
:nodoc:
1107 1108 1109 |
# File 'lib/nats/client.rb', line 1107 def inspect #:nodoc: "<nats client v#{NATS::VERSION}>" end |
#multiple_servers_available? ⇒ Boolean
964 965 966 |
# File 'lib/nats/client.rb', line 964 def multiple_servers_available? server_pool && server_pool.size > 1 end |
#on_close(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
608 609 610 |
# File 'lib/nats/client.rb', line 608 def on_close(&callback) @close_cb = callback end |
#on_connect(&callback) ⇒ Object
Define a callback to be called when the client connection has been established.
584 585 586 |
# File 'lib/nats/client.rb', line 584 def on_connect(&callback) @connect_cb = callback end |
#on_disconnect(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
602 603 604 |
# File 'lib/nats/client.rb', line 602 def on_disconnect(&callback) @disconnect_cb = callback end |
#on_error(&callback) ⇒ Object
Define a callback to be called when errors occur on the client connection.
590 591 592 |
# File 'lib/nats/client.rb', line 590 def on_error(&callback) @err_cb, @err_cb_overridden = callback, true end |
#on_msg(subject, sid, reply, msg) ⇒ Object
:nodoc:
663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 |
# File 'lib/nats/client.rb', line 663 def on_msg(subject, sid, reply, msg) #:nodoc: # Accounting - We should account for inbound even if they are not processed. @msgs_received += 1 @bytes_received += msg.bytesize if msg return unless sub = @subs[sid] # Check for auto_unsubscribe sub[:received] += 1 if sub[:max] # Client side support in case server did not receive unsubscribe return unsubscribe(sid) if (sub[:received] > sub[:max]) # cleanup here if we have hit the max.. @subs.delete(sid) if (sub[:received] == sub[:max]) end if cb = sub[:callback] case cb.arity when 0 then cb.call when 1 then cb.call(msg) when 2 then cb.call(msg, reply) else cb.call(msg, reply, subject) end end # Check for a timeout, and cancel if received >= expected if (sub[:timeout] && sub[:received] >= sub[:expected]) EM.cancel_timer(sub[:timeout]) sub[:timeout] = nil end end |
#on_reconnect(&callback) ⇒ Object
Define a callback to be called when a reconnect attempt is made.
596 597 598 |
# File 'lib/nats/client.rb', line 596 def on_reconnect(&callback) @reconnect_cb = callback end |
#pending_data_size ⇒ Object
Return bytes outstanding waiting to be sent to server.
622 623 624 |
# File 'lib/nats/client.rb', line 622 def pending_data_size get_outbound_data_size + @pending_size end |
#process_connect ⇒ Object
:nodoc:
869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 |
# File 'lib/nats/client.rb', line 869 def process_connect #:nodoc: # Reset reconnect attempts since TCP connection has been successful at this point. current = server_pool.first current[:was_connected] = true current[:reconnect_attempts] ||= 0 cancel_reconnect_timer if reconnecting? # Whip through any pending SUB commands since we replay # all subscriptions already done anyway. @pending.delete_if { |sub| sub[0..2] == SUB_OP } if @pending @subs.each_pair { |k, v| send_command("SUB #{v[:subject]} #{v[:queue]} #{k}#{CR_LF}") } unless user_err_cb? or reconnecting? @err_cb = proc { |e| raise e } end # We have validated the connection at this point so send CONNECT # and any other pending commands which we need to the server. flush_pending if (connect_cb and not @conn_cb_called) # We will round trip the server here to make sure all state from any pending commands # has been processed before calling the connect callback. queue_server_rt do connect_cb.call(self) @conn_cb_called = true end end # Notify via reconnect callback that we are again plugged again into the system. if reconnecting? @reconnecting = false @reconnect_cb.call(self) unless @reconnect_cb.nil? end # Initialize ping timer and processing @pings_outstanding = 0 @pongs_received = 0 @ping_timer = EM.add_periodic_timer(@options[:ping_interval]) do send_ping end end |
#process_disconnect ⇒ Object
:nodoc:
988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 |
# File 'lib/nats/client.rb', line 988 def process_disconnect #:nodoc: # Mute error callback when user has called NATS.close on purpose. if not closing? and @err_cb # Always call error callback for compatibility with previous behavior. err_cb.call(NATS::ConnectError.new(disconnect_error_string)) end close_cb.call if @close_cb true # Chaining ensure cancel_ping_timer cancel_reconnect_timer if (NATS.client == self) NATS.clear_client EM.stop if ((connected? || reconnecting?) and closing? and not NATS.reactor_was_running?) end @connected = @reconnecting = false end |
#process_info(info) ⇒ Object
:nodoc:
755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 |
# File 'lib/nats/client.rb', line 755 def process_info(info) #:nodoc: # Each JSON parser uses a different key/value pair to use symbol keys # instead of strings when parsing. Passing all three pairs assures each # parser gets what it needs. For the json gem :symbolize_names, for yajl # :symbolize_keys, and for oj :symbol_keys. @server_info = JSON.parse(info, :symbolize_keys => true, :symbolize_names => true, :symbol_keys => true) case when (server_using_secure_connection? and client_using_secure_connection?) # Allow parameterizing secure connection via EM#start_tls directly if present. start_tls(@tls || {}) when (server_using_secure_connection? and !client_using_secure_connection?) # Call unbind since there is a configuration mismatch between client/server # anyway and communication cannot happen in this state. err_cb.call(NATS::ClientError.new('TLS/SSL required by server')) close_connection_after_writing when (client_using_secure_connection? and !server_using_secure_connection?) err_cb.call(NATS::ClientError.new('TLS/SSL not supported by server')) close_connection_after_writing else # Otherwise, use a regular connection. end # Detect any announced server that we might not be aware of... connect_urls = @server_info[:connect_urls] if connect_urls srvs = [] connect_urls.each do |url| u = URI.parse("nats://#{url}") present = server_pool.detect do |srv| srv[:uri].host == u.host && srv[:uri].port == u.port end if not present # Let explicit user and pass options set the credentials. u.user = options[:user] if options[:user] u.password = options[:pass] if options[:pass] # Use creds from the current server if not set explicitly. if @uri u.user ||= @uri.user if @uri.user u.password ||= @uri.password if @uri.password end srvs << { :uri => u, :reconnect_attempts => 0, :discovered => true } end end srvs.shuffle! unless @options[:dont_randomize_servers] # Include in server pool but keep current one as the first one. 
server_pool.push(*srvs) end if @server_info[:auth_required] current = server_pool.first current[:auth_required] = true # Send pending connect followed by ping/pong to ensure we're authorized. queue_server_rt { current[:auth_ok] = true } flush_pending end @server_info end |
#process_pong ⇒ Object
924 925 926 927 |
# File 'lib/nats/client.rb', line 924

# Accounts for a PONG: bumps the received counter and lowers the number of
# pings still outstanding. Returns the new outstanding count.
def process_pong
  @pongs_received = @pongs_received + 1
  @pings_outstanding = @pings_outstanding - 1
end
#process_uri_options ⇒ Object
Parse out URIs, which can now be an array of server choices. The server pool will contain both explicit and implicit members.
1052 1053 1054 1055 1056 1057 1058 |
# File 'lib/nats/client.rb', line 1052

# Rebuilds @server_pool from the configured URIs and binds the primary server.
#
# The URIs are taken from the first of the :uris, :servers or :uri options
# that is set. A single value is treated as a one-element list. URI objects
# are duplicated so the pool never shares them with the caller; anything else
# is parsed with URI.parse.
#
# Returns whatever bind_primary returns.
def process_uri_options #:nodoc
  @server_pool = []
  uri = @options[:uris] || @options[:servers] || @options[:uri]
  uri = uri.kind_of?(Array) ? uri : [uri]
  uri.each { |u| server_pool << { :uri => u.is_a?(URI) ? u.dup : URI.parse(u) } }
  bind_primary
end
#publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object
Publish a message to a given subject, with optional reply subject and completion block
394 395 396 397 398 399 400 401 402 403 404 |
# File 'lib/nats/client.rb', line 394

# Publishes a message on the given subject.
#
# subject   - subject to publish on; nothing is sent when it is nil/false.
# msg       - payload, converted with #to_s before sending.
# opt_reply - optional reply subject placed in the PUB line.
# blk       - optional block queued through queue_server_rt after the PUB.
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
  return unless subject

  payload = msg.to_s

  # Accounting
  @msgs_sent += 1
  @bytes_sent += payload.bytesize if payload

  frame = "PUB #{subject} #{opt_reply} #{payload.bytesize}#{CR_LF}#{payload}#{CR_LF}"
  send_command(frame)
  queue_server_rt(&blk) if blk
end
#queue_server_rt(&cb) ⇒ Object
:nodoc:
657 658 659 660 661 |
# File 'lib/nats/client.rb', line 657

# Registers a callback on the @pongs queue (created on first use) and sends
# a PING. Does nothing when no block is given.
def queue_server_rt(&cb) #:nodoc:
  return if cb.nil?

  @pongs ||= []
  @pongs.push(cb)
  send_command(PING_REQUEST)
end
#receive_data(data) ⇒ Object
:nodoc:
702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 |
# File 'lib/nats/client.rb', line 702

# Incremental protocol parser fed with raw data from the connection.
#
# Incoming data is appended to @buf and consumed in a loop driven by
# @parse_state:
#
# * AWAITING_CONTROL_LINE - matches the buffer against the protocol regexes
#   (MSG, OK, ERR, PING, PONG, INFO, UNKNOWN) and dispatches accordingly.
#   Returns when no complete line is buffered yet.
# * AWAITING_MSG_PAYLOAD  - waits until @needed payload bytes plus the
#   trailing CR/LF are buffered, then hands the payload to on_msg.
#
# data - String chunk received from the server.
def receive_data(data) #:nodoc:
  @buf = @buf ? @buf << data : data

  while (@buf)
    case @parse_state
    when AWAITING_CONTROL_LINE
      case @buf
      when MSG
        @buf = $'
        @sub, @sid, @reply, @needed = $1, $2.to_i, $4, $5.to_i
        @parse_state = AWAITING_MSG_PAYLOAD
      when OK # No-op right now
        @buf = $'
      when ERR
        @buf = $'
        current = server_pool.first
        current[:error_received] = true
        if current[:auth_required] && !current[:auth_ok]
          err_cb.call(NATS::AuthError.new($1))
        else
          err_cb.call(NATS::ServerError.new($1))
        end
      when PING
        @pings += 1
        @buf = $'
        send_command(PONG_RESPONSE)
      when PONG
        @buf = $'
        # @pongs is created lazily and reset to nil elsewhere, so a PONG may
        # arrive while no callback queue exists; ignore it in that case.
        cb = @pongs && @pongs.shift
        cb.call if cb
      when INFO
        @buf = $'
        process_info($1)
      when UNKNOWN
        @buf = $'
        err_cb.call(NATS::ServerError.new("Unknown protocol: #{$1}"))
      else
        # If we are here we do not have a complete line yet that we understand.
        return
      end
      @buf = nil if (@buf && @buf.empty?)

    when AWAITING_MSG_PAYLOAD
      return unless (@needed && @buf.bytesize >= (@needed + CR_LF_SIZE))

      # The announced size is compared against bytesize above, so the payload
      # must be cut by bytes as well, not by characters.
      on_msg(@sub, @sid, @reply, @buf.byteslice(0, @needed))
      @buf = @buf.byteslice((@needed + CR_LF_SIZE), @buf.bytesize)
      @sub = @sid = @reply = @needed = nil
      @parse_state = AWAITING_CONTROL_LINE
      @buf = nil if (@buf && @buf.empty?)
    end
  end
end
#request(subject, data = nil, opts = {}, &cb) ⇒ Object
Send a request and have the response delivered to the supplied callback.
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 |
# File 'lib/nats/client.rb', line 469

# Sends a request on +subject+.
#
# With a block: subscribes to a fresh inbox, publishes the request with that
# inbox as reply subject and returns the subscription id. The block is called
# with as many of (msg, reply) as its arity asks for.
#
# Without a block: uses the shared response subscription and the current
# Fiber. The fiber is suspended until the response handler or the timeout
# timer resumes it.
#
# subject - subject to send the request to; returns nil when not given.
# data    - request payload.
# opts    - :max (expected responses, defaults to 1) and :timeout (seconds,
#           defaults to 0.5); both defaults are written back into opts.
#
# Returns the subscription id (block form), a single response, or, when
# :max is greater than 1, the gathered responses limited to :max entries.
def request(subject, data=nil, opts={}, &cb)
  return unless subject

  # In case of using async request then fallback to auto unsubscribe
  # based request/response and not break compatibility too much since
  # new request/response style can only be used with fibers.
  if cb
    reply_inbox = "_INBOX.#{@nuid.next}"
    sid = subscribe(reply_inbox, opts) do |msg, reply|
      case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      else cb.call(msg, reply)
      end
    end
    publish(subject, data, reply_inbox)

    return sid
  end

  # If this is the first request being made, then need to start
  # the responses mux handler that handles the responses.
  start_resp_mux_sub! unless @resp_sub_prefix

  # Generate unique token for the reply subject.
  token = @nuid.next
  reply_inbox = "#{@resp_sub_prefix}.#{token}"

  # Synchronous request/response requires using a Fiber
  # to be able to await the response.
  fiber = Fiber.current
  @resp_map[token][:fiber] = fiber

  # If awaiting more than a single response then use array
  # to include all that could be gathered before the deadline.
  expected = opts[:max] ||= 1
  @resp_map[token][:expected] = expected
  @resp_map[token][:msgs] = [] if expected > 1

  # Announce the request with the inbox using the token.
  publish(subject, data, reply_inbox)

  # If deadline expires, then discard the token and resume fiber
  opts[:timeout] ||= 0.5
  timer = EM.add_timer(opts[:timeout]) do
    if expected > 1
      fiber.resume @resp_map[token][:msgs]
    else
      fiber.resume
    end

    @resp_map.delete(token)
  end

  # Wait for the response and cancel timeout callback if received.
  response = Fiber.yield
  EM.cancel_timer(timer)

  # For multiple replies, slice and throw away responses that are not needed.
  expected > 1 ? response.slice(0, expected) : response
end
#schedule_primary_and_connect ⇒ Object
We have failed on an attempt at the primary (first) server, rotate and try again
1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 |
# File 'lib/nats/client.rb', line 1080

# Rotates the server pool after a failed attempt on the primary (first)
# server and tries the next one.
#
# The server that was being tried is removed from the front of the pool and
# appended again only if :max_reconnect_attempts is negative (infinite
# reconnects) or can_reuse_server? accepts it. When the pool ends up empty
# the client disconnects; otherwise the new primary is bound and either a
# delayed reconnect is scheduled or a reconnect is attempted right away.
def schedule_primary_and_connect #:nodoc:
  # Dump the one we were trying if it wasn't connected
  current = server_pool.shift

  # In case there was an error from the server we will take it out from rotation
  # unless we specify infinite reconnects via setting :max_reconnect_attempts to -1
  if current && (@options[:max_reconnect_attempts] < 0 || can_reuse_server?(current))
    server_pool << current
  end

  # If we are out of options, go ahead and disconnect then
  # handle closing connection to NATS.
  process_disconnect and return if server_pool.empty?

  # bind new one
  next_server = bind_primary

  # If the next one was connected and we are trying to reconnect
  # set up timer if we tried once already.
  if should_delay_connect?(next_server)
    schedule_reconnect
  else
    attempt_reconnect
    schedule_primary_and_connect if had_error?
  end
end
#schedule_reconnect ⇒ Object
:nodoc:
940 941 942 943 944 |
# File 'lib/nats/client.rb', line 940

# Flags the client as reconnecting (and no longer connected) and arms a
# timer that calls attempt_reconnect after :reconnect_time_wait seconds.
# The timer handle is kept in @reconnect_timer and returned.
def schedule_reconnect #:nodoc:
  @reconnecting, @connected = true, false

  wait = @options[:reconnect_time_wait]
  @reconnect_timer = EM.add_timer(wait) do
    attempt_reconnect
  end
end
#send_command(command, priority = false) ⇒ Object
:nodoc:
1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 |
# File 'lib/nats/client.rb', line 1033

# Queues a protocol command for sending.
#
# command  - String with the full protocol line(s).
# priority - when truthy the command goes to the front of the pending queue
#            instead of the back.
#
# A flush is scheduled on the next tick when the pending queue had to be
# created while connected, and performed immediately once the pending size
# exceeds MAX_PENDING_SIZE. With :fast_producer_error set, the error
# callback is invoked when the outstanding data passes
# FAST_PRODUCER_THRESHOLD.
#
# Always returns true.
def send_command(command, priority = false) #:nodoc:
  needs_flush = (connected? && @pending.nil?)

  @pending ||= []
  if priority
    @pending.unshift(command)
  else
    @pending << command
  end
  @pending_size += command.bytesize

  EM.next_tick { flush_pending } if needs_flush

  flush_pending if (connected? && @pending_size > MAX_PENDING_SIZE)

  if (@options[:fast_producer_error] && pending_data_size > FAST_PRODUCER_THRESHOLD)
    err_cb.call(NATS::ClientError.new("Fast Producer: #{pending_data_size} bytes outstanding"))
  end

  true
end
#send_connect_command ⇒ Object
:nodoc:
653 654 655 |
# File 'lib/nats/client.rb', line 653

# Queues the connect command with priority, i.e. ahead of anything that is
# already pending.
def send_connect_command #:nodoc:
  with_priority = true
  send_command(connect_command, with_priority)
end
#send_ping ⇒ Object
:nodoc:
912 913 914 915 916 917 918 919 920 921 922 |
# File 'lib/nats/client.rb', line 912

# Sends a PING unless the client is closing.
#
# Every call counts as one more outstanding ping. When that count exceeds
# :max_outstanding_pings the connection is closed instead; otherwise a round
# trip that runs process_pong is queued and pending data is flushed.
def send_ping #:nodoc:
  return if @closing

  @pings_outstanding += 1
  too_many = @pings_outstanding > @options[:max_outstanding_pings]

  if too_many
    close_connection
    #close
    nil
  else
    queue_server_rt { process_pong }
    flush_pending
  end
end
#server_using_secure_connection? ⇒ Boolean
824 825 826 |
# File 'lib/nats/client.rb', line 824

# Tells whether the server info announces :ssl_required or :tls_required.
# Returns the first truthy of the two values, else the :tls_required value.
def server_using_secure_connection?
  ssl_required = @server_info[:ssl_required]
  return ssl_required if ssl_required

  @server_info[:tls_required]
end
#should_delay_connect?(server) ⇒ Boolean
929 930 931 932 933 934 935 936 937 938 |
# File 'lib/nats/client.rb', line 929

# Decides whether connecting to +server+ should wait for the reconnect timer.
#
# * A server flagged :was_connected is delayed when its :reconnect_attempts
#   is zero or more.
# * Otherwise a server with a :last_reconnect_attempt is delayed while less
#   than :reconnect_time_wait has elapsed since that attempt.
# * Any other server is not delayed.
def should_delay_connect?(server)
  if server[:was_connected]
    server[:reconnect_attempts] >= 0
  elsif server[:last_reconnect_attempt]
    elapsed = MonotonicTime.now - server[:last_reconnect_attempt]
    elapsed < @options[:reconnect_time_wait]
  else
    false
  end
end
#should_not_reconnect? ⇒ Boolean
972 973 974 |
# File 'lib/nats/client.rb', line 972

# True when the :reconnect option is unset or falsy.
def should_not_reconnect?
  @options[:reconnect] ? false : true
end
#ssl_handshake_completed ⇒ Object
864 865 866 867 |
# File 'lib/nats/client.rb', line 864

# Marks the client as connected and then runs process_connect, returning its
# result. The name suggests an EventMachine TLS-handshake hook; verify
# against the EventMachine::Connection documentation.
def ssl_handshake_completed
  @connected = true

  process_connect
end
#ssl_verify_peer(cert) ⇒ Object
828 829 830 831 832 833 834 835 836 837 838 839 840 841 |
# File 'lib/nats/client.rb', line 828

# Verifies a certificate presented by the peer against the configured CA.
#
# The CA certificate is read from the file named by @options[:tls][:ca_file].
# The peer certificate is accepted when its issuer equals the CA subject and
# its signature verifies with the CA public key. On failure the error
# callback receives a NATS::ConnectError naming the CA file.
#
# cert - certificate data accepted by OpenSSL::X509::Certificate.new.
#
# Returns true when the certificate is accepted, false otherwise (also when
# a NATS::ConnectError is raised while checking).
def ssl_verify_peer(cert)
  ca_path = @options[:tls][:ca_file]
  ca = OpenSSL::X509::Certificate.new(File.read(ca_path))
  incoming = OpenSSL::X509::Certificate.new(cert)

  if incoming.issuer.to_s == ca.subject.to_s && incoming.verify(ca.public_key)
    true
  else
    # Report the same path the CA was loaded from.
    err_cb.call(NATS::ConnectError.new("TLS Verification failed checking issuer based on CA %s" % ca_path))
    false
  end
rescue NATS::ConnectError
  false
end
#start_resp_mux_sub! ⇒ Object
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 |
# File 'lib/nats/client.rb', line 537

# Sets up the shared response subscription used by fiber based requests.
#
# Picks a fresh inbox prefix, resets @resp_map (token => per-request state)
# and subscribes to "<prefix>.*". Each response is routed by the last
# subject token to the state recorded for it: single-response requests
# resume their fiber with the message right away; multi-response requests
# collect messages and resume the fiber with the array once the expected
# count is reached.
def start_resp_mux_sub!
  @resp_sub_prefix = "_INBOX.#{@nuid.next}"
  @resp_map = Hash.new { |map, key| map[key] = { } }

  # Single subscription that will be handling all the requests
  # using fibers to yield the responses.
  subscribe("#{@resp_sub_prefix}.*") do |msg, reply, subject|
    token = subject.split('.').last

    # Discard the response if requestor not interested already.
    next unless @resp_map.key?(token)

    # Take fiber that will be passed the response
    entry = @resp_map[token]
    fiber = entry[:fiber]
    expected = entry[:expected]

    if expected == 1
      fiber.resume msg
      @resp_map.delete(token)
      next
    end

    collected = entry[:msgs]
    if collected.size < expected
      collected << msg

      # Wait to gather more messages or timeout.
      next if collected.size < expected

      fiber.resume(collected)
    end

    @resp_map.delete(token)
  end
end
#subscribe(subject, opts = {}, &callback) ⇒ Object
Subscribe to a subject with optional wildcards. Messages will be delivered to the supplied callback. Callback can take any number of the supplied arguments as defined by the list: msg, reply, sub. Returns subscription id which can be passed to #unsubscribe.
414 415 416 417 418 419 420 421 422 423 424 |
# File 'lib/nats/client.rb', line 414

# Subscribes to +subject+ and registers +callback+ for its messages.
#
# subject  - subject to subscribe to; returns nil when not given.
# opts     - :queue (queue group placed in the SUB line) and :max (message
#            limit, forwarded to unsubscribe for auto-unsubscribe).
# callback - block stored with the subscription.
#
# Returns the new subscription id.
def subscribe(subject, opts={}, &callback)
  return unless subject

  @ssid += 1
  sid = @ssid
  queue = opts[:queue]
  max = opts[:max]

  sub = { :subject => subject, :callback => callback, :received => 0 }
  @subs[sid] = sub
  sub[:queue] = queue if queue
  sub[:max] = max if max

  send_command("SUB #{subject} #{queue} #{sid}#{CR_LF}")

  # Setup server support for auto-unsubscribe
  unsubscribe(sid, max) if max

  sid
end
#subscription_count ⇒ Number
Return the active subscription count.
439 440 441 |
# File 'lib/nats/client.rb', line 439

# Number of entries currently held in the subscription table.
def subscription_count
  @subs.length
end
#timeout(sid, timeout, opts = {}, &callback) ⇒ Object
Setup a timeout for receiving messages for the subscription.
447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 |
# File 'lib/nats/client.rb', line 447

# Arms a timeout for the subscription identified by +sid+.
#
# sid      - subscription id; nothing happens when it is unknown.
# timeout  - delay handed to EM.add_timer.
# opts     - :auto_unsubscribe (defaults to true) and :expected (defaults
#            to 1); a key that is present wins even when its value is falsy.
# callback - optional block called with the sid when the timer fires.
#
# Any timer already stored on the subscription is cancelled first. When the
# new timer fires the subscription is unsubscribed (if auto-unsubscribe is
# on) and the callback is invoked. Returns the expected count.
def timeout(sid, timeout, opts={}, &callback)
  # Setup a timeout if requested
  sub = @subs[sid]
  return unless sub

  auto_unsubscribe = opts.fetch(:auto_unsubscribe, true)
  expected = opts.fetch(:expected, 1)

  previous = sub[:timeout]
  EM.cancel_timer(previous) if previous

  sub[:timeout] = EM.add_timer(timeout) do
    unsubscribe(sid) if auto_unsubscribe
    callback.call(sid) if callback
  end
  sub[:expected] = expected
end
#unbind ⇒ Object
:nodoc:
946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 |
# File 'lib/nats/client.rb', line 946

# Reacts to a connection going away.
#
# Notifies the disconnect callback when one is set, the client was connected
# and is not closing. If the client is closing or must not reconnect it goes
# through process_disconnect; otherwise connection state, pending commands,
# pong callbacks and the parse buffer are reset, the ping timer is cancelled
# and the next server in the pool is scheduled.
def unbind #:nodoc:
  # Allow notifying from which server we were disconnected,
  # but only when we didn't trigger disconnecting ourselves.
  if @disconnect_cb && connected? && !closing?
    @disconnect_cb.call(NATS::ConnectError.new(disconnect_error_string))
  end

  # If we are closing or shouldn't reconnect, go ahead and disconnect.
  if closing? || should_not_reconnect?
    return if process_disconnect
  end

  @reconnecting = true if connected?
  @connected = false
  @pongs = nil
  @pending = nil
  @buf = nil
  cancel_ping_timer

  schedule_primary_and_connect
end
#unsubscribe(sid, opt_max = nil) ⇒ Object
Cancel a subscription.
429 430 431 432 433 434 435 |
# File 'lib/nats/client.rb', line 429

# Cancels a subscription, optionally only after +opt_max+ messages.
#
# sid     - subscription id.
# opt_max - optional message limit appended to the UNSUB line.
#
# The UNSUB command is always sent. The local entry is dropped unless a
# limit is set and fewer messages than that limit have been received.
def unsubscribe(sid, opt_max=nil)
  max_suffix = opt_max.nil? ? nil : " #{opt_max}"
  send_command("UNSUB #{sid}#{max_suffix}#{CR_LF}")

  sub = @subs[sid]
  return unless sub

  sub[:max] = opt_max
  still_expecting = sub[:max] && (sub[:received] < sub[:max])
  @subs.delete(sid) unless still_expecting
end
#user_err_cb? ⇒ Boolean
:nodoc:
626 627 628 |
# File 'lib/nats/client.rb', line 626

# Truthy when err_cb_overridden is set on this receiver or on NATS itself.
def user_err_cb? # :nodoc:
  overridden = err_cb_overridden
  return overridden if overridden

  NATS.err_cb_overridden
end