Class: NatsWork::Connection
- Inherits:
-
Object
- Object
- NatsWork::Connection
- Defined in:
- lib/natswork/connection.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#max_reconnect_attempts ⇒ Object
readonly
Returns the value of attribute max_reconnect_attempts.
-
#password ⇒ Object
readonly
Returns the value of attribute password.
-
#reconnect_time_wait ⇒ Object
readonly
Returns the value of attribute reconnect_time_wait.
-
#servers ⇒ Object
readonly
Returns the value of attribute servers.
-
#user ⇒ Object
readonly
Returns the value of attribute user.
Instance Method Summary collapse
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
- #healthy? ⇒ Boolean
-
#initialize(options = {}) ⇒ Connection
constructor
A new instance of Connection.
- #jetstream ⇒ Object
- #on_disconnect(&block) ⇒ Object
- #on_error(&block) ⇒ Object
- #on_reconnect(&block) ⇒ Object
- #ping ⇒ Object
- #publish(subject, payload) ⇒ Object
- #request(subject, payload, opts = {}) ⇒ Object
- #stats ⇒ Object
- #subscribe(subject, opts = {}, &block) ⇒ Object
- #unsubscribe(sid) ⇒ Object
- #with_connection {|_self| ... } ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Connection
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/natswork/connection.rb', line 11
#
# Builds an unconnected wrapper around a fresh NATS::IO::Client. No network
# activity happens here.
#
# @param options [Hash] connection settings
# @option options [Array<String>] :servers (['nats://localhost:4222'])
# @option options [Integer] :max_reconnect_attempts (10)
# @option options [Numeric] :reconnect_time_wait (2)
# @option options [String] :user
# @option options [String] :password
# @option options [Object] :tls stored as-is
def initialize(options = {})
  @servers = options[:servers] || ['nats://localhost:4222']
  @max_reconnect_attempts = options[:max_reconnect_attempts] || 10
  @reconnect_time_wait = options[:reconnect_time_wait] || 2
  @user = options[:user]
  @password = options[:password]
  @tls = options[:tls]

  @client = NATS::IO::Client.new
  @connection = @client # Alias for backward compatibility
  @connected = false
  @jetstream_context = nil
  @mutex = Mutex.new

  @reconnect_callbacks = []
  @disconnect_callbacks = []
  @error_callbacks = []

  @last_ping_time = nil
  @last_error = nil
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
9 10 11 |
# File 'lib/natswork/connection.rb', line 9
#
# @return [Object] the value of @connection
def connection
  @connection
end
#max_reconnect_attempts ⇒ Object (readonly)
Returns the value of attribute max_reconnect_attempts.
9 10 11 |
# File 'lib/natswork/connection.rb', line 9
#
# @return [Object] the value of @max_reconnect_attempts
def max_reconnect_attempts
  @max_reconnect_attempts
end
#password ⇒ Object (readonly)
Returns the value of attribute password.
9 10 11 |
# File 'lib/natswork/connection.rb', line 9
#
# @return [Object] the value of @password
def password
  @password
end
#reconnect_time_wait ⇒ Object (readonly)
Returns the value of attribute reconnect_time_wait.
9 10 11 |
# File 'lib/natswork/connection.rb', line 9
#
# @return [Object] the value of @reconnect_time_wait
def reconnect_time_wait
  @reconnect_time_wait
end
#servers ⇒ Object (readonly)
Returns the value of attribute servers.
9 10 11 |
# File 'lib/natswork/connection.rb', line 9
#
# @return [Object] the value of @servers
def servers
  @servers
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
9 10 11 |
# File 'lib/natswork/connection.rb', line 9
#
# @return [Object] the value of @user
def user
  @user
end
Instance Method Details
#connect ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/natswork/connection.rb', line 30
#
# Connects the underlying client using the settings captured at construction.
# Returns immediately when already connected.
#
# @return [Boolean] true when already connected; otherwise the client's
#   connected? value after connecting
# @raise [ConnectionError] when the client raises during connect
def connect
  return true if @connected && @client.connected?

  options = {
    servers: @servers,
    max_reconnect_attempts: @max_reconnect_attempts,
    reconnect_time_wait: @reconnect_time_wait
  }
  # Optional settings are only passed along when they were supplied.
  options[:user] = @user if @user
  options[:password] = @password if @password
  options[:tls] = @tls if @tls

  setup_handlers

  begin
    @client.connect(options)
    @connected = true
    @client.connected?
  rescue NATS::IO::ConnectError => e
    raise ConnectionError, "Failed to connect to NATS: #{e.message}"
  rescue StandardError => e
    raise ConnectionError, "Connection error: #{e.message}"
  end
end
#connected? ⇒ Boolean
66 67 68 |
# File 'lib/natswork/connection.rb', line 66
#
# True only when this wrapper has marked itself connected AND the client
# itself reports a live connection.
#
# @return [Boolean]
def connected?
  @connected && @client.connected?
end
#disconnect ⇒ Object
56 57 58 59 60 61 62 63 64 |
# File 'lib/natswork/connection.rb', line 56
#
# Closes the client (if it still reports connected) and resets local state.
# Does nothing when this wrapper is not marked connected.
#
# @return [nil]
def disconnect
  return unless @connected

  @mutex.synchronize do
    @client.close if @client.connected?
    @connected = false
    # Drop the memoized JetStream context.
    @jetstream_context = nil
  end
end
#healthy? ⇒ Boolean
143 144 145 |
# File 'lib/natswork/connection.rb', line 143
#
# Health check: connected and a ping succeeds. The ping is skipped when not
# connected.
#
# @return [Boolean]
def healthy?
  connected? && ping
end
#jetstream ⇒ Object
127 128 129 130 131 |
# File 'lib/natswork/connection.rb', line 127
#
# Returns the client's JetStream context, memoized in @jetstream_context.
#
# @return [Object] whatever @client.jetstream returns
# @raise [ConnectionError] when not connected
def jetstream
  raise ConnectionError, 'Not connected to NATS' unless connected?

  @jetstream_context ||= @client.jetstream
end
#on_disconnect(&block) ⇒ Object
165 166 167 |
# File 'lib/natswork/connection.rb', line 165
#
# Registers a block in @disconnect_callbacks. Calling without a block is a
# no-op.
#
# @return [Array, nil] the callback list when a block was added, else nil
def on_disconnect(&block)
  @disconnect_callbacks << block if block_given?
end
#on_error(&block) ⇒ Object
169 170 171 |
# File 'lib/natswork/connection.rb', line 169
#
# Registers a block in @error_callbacks. Calling without a block is a no-op.
#
# @return [Array, nil] the callback list when a block was added, else nil
def on_error(&block)
  @error_callbacks << block if block_given?
end
#on_reconnect(&block) ⇒ Object
161 162 163 |
# File 'lib/natswork/connection.rb', line 161
#
# Registers a block in @reconnect_callbacks. Calling without a block is a
# no-op.
#
# @return [Array, nil] the callback list when a block was added, else nil
def on_reconnect(&block)
  @reconnect_callbacks << block if block_given?
end
#ping ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/natswork/connection.rb', line 147
#
# Liveness probe. On success records the time in @last_ping_time; on failure
# records the error message in @last_error.
#
# @return [Boolean] true when the flush completes, false when not connected
#   or when the flush raises
def ping
  return false unless connected?

  begin
    # Flush the client, passing 1 as the timeout argument; any StandardError
    # raised here counts as a failed ping.
    @client.flush(1)
    @last_ping_time = Time.now
    true
  rescue StandardError => e
    @last_error = e.message
    false
  end
end
#publish(subject, payload) ⇒ Object
70 71 72 73 74 75 |
# File 'lib/natswork/connection.rb', line 70
#
# Publishes a message. String payloads are sent unchanged; anything else is
# serialized with JSON.generate first.
#
# @param subject [String] subject to publish on
# @param payload [String, Object] message body
# @return [Object] whatever @client.publish returns
# @raise [ConnectionError] when not connected
def publish(subject, payload)
  raise ConnectionError, 'Not connected to NATS' unless connected?

  data = payload.is_a?(String) ? payload : JSON.generate(payload)
  @client.publish(subject, data)
end
#request(subject, payload, opts = {}) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/natswork/connection.rb', line 96
#
# Sends a request and returns the reply, JSON-decoded when possible.
#
# @param subject [String] subject to send the request on
# @param payload [String, Object] Strings are sent as-is; anything else is
#   serialized with JSON.generate
# @param opts [Hash]
# @option opts [Numeric] :timeout (5) forwarded to the client as timeout:
# @return [Object] parsed JSON (string keys), or the raw reply data when it is
#   not valid JSON
# @raise [ConnectionError] when not connected
# @raise [TimeoutError] when the client raises NATS::IO::Timeout
def request(subject, payload, opts = {})
  raise ConnectionError, 'Not connected to NATS' unless connected?

  timeout = opts[:timeout] || 5
  data = payload.is_a?(String) ? payload : JSON.generate(payload)

  begin
    response = @client.request(subject, data, timeout: timeout)
    JSON.parse(response.data)
  rescue NATS::IO::Timeout
    raise TimeoutError, "Request timed out after #{timeout} seconds"
  rescue JSON::ParserError
    # Reply was not JSON: hand back the raw data.
    response.data
  end
end
#stats ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/natswork/connection.rb', line 133
#
# Snapshot of the client's stats plus this wrapper's health fields.
#
# Returns a new hash: the extra keys are merged into a copy, so the hash
# returned by @client.stats is never modified.
#
# @return [Hash] client stats merged with :last_ping_time, :healthy and
#   :last_error; {} when there is no client
def stats
  return {} unless @client

  (@client.stats || {}).merge(
    last_ping_time: @last_ping_time,
    healthy: healthy?,
    last_error: @last_error
  )
end
#subscribe(subject, opts = {}, &block) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/natswork/connection.rb', line 77
#
# Subscribes to a subject. Each message is JSON-decoded with symbolized keys
# before being handed to the block; messages that are not valid JSON are
# passed through unchanged.
#
# @param subject [String] subject to subscribe to
# @param opts [Hash]
# @option opts [String] :queue queue group, forwarded to the client when set
# @yield [message, reply, subject, sid] the decoded message followed by the
#   remaining callback arguments as received from the client
# @return [Object] whatever @client.subscribe returns
# @raise [ConnectionError] when not connected
# @raise [ArgumentError] when no block is given
def subscribe(subject, opts = {}, &block)
  raise ConnectionError, 'Not connected to NATS' unless connected?
  # Fail now instead of with NoMethodError on nil when a message arrives.
  raise ArgumentError, 'subscribe requires a block' unless block

  # NOTE(review): kept at four parameters in case the client inspects the
  # callback's arity - confirm against the client in use.
  wrapped_callback = proc do |msg, reply, msg_subject, sid|
    parsed_msg = begin
      JSON.parse(msg, symbolize_names: true)
    rescue JSON::ParserError
      msg
    end
    block.call(parsed_msg, reply, msg_subject, sid)
  end

  if opts[:queue]
    @client.subscribe(subject, queue: opts[:queue], &wrapped_callback)
  else
    @client.subscribe(subject, &wrapped_callback)
  end
end
#unsubscribe(sid) ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/natswork/connection.rb', line 112
#
# Best-effort unsubscribe: does nothing when not connected and swallows any
# StandardError raised by the client.
#
# @param sid [Object] subscription identifier passed to the client
# @return [Object, nil] the client's result, or nil when skipped or failed
def unsubscribe(sid)
  return unless connected?

  begin
    # Dispatched via #send, which also reaches a non-public client method.
    @client.send(:unsubscribe, sid)
  rescue StandardError
    # Ignore errors for invalid SIDs
  end
end
#with_connection {|_self| ... } ⇒ Object
122 123 124 125 |
# File 'lib/natswork/connection.rb', line 122
#
# Ensures a connection exists, then yields this object to the block.
#
# @yieldparam _self [Connection] this object
# @return [Object] the block's return value
def with_connection
  connect unless connected?

  yield(self)
end