Class: NatsWork::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Connection



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/natswork/connection.rb', line 11

def initialize(options = {})
  @servers = options[:servers] || ['nats://localhost:4222']
  @max_reconnect_attempts = options[:max_reconnect_attempts] || 10
  @reconnect_time_wait = options[:reconnect_time_wait] || 2
  @user = options[:user]
  @password = options[:password]
  @tls = options[:tls]
  @client = NATS::IO::Client.new
  @connection = @client # Alias for backward compatibility
  @connected = false
  @jetstream_context = nil
  @mutex = Mutex.new
  @reconnect_callbacks = []
  @disconnect_callbacks = []
  @error_callbacks = []
  @last_ping_time = nil
  @last_error = nil
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



9
10
11
# File 'lib/natswork/connection.rb', line 9

def connection
  @connection
end

#max_reconnect_attemptsObject (readonly)

Returns the value of attribute max_reconnect_attempts.



9
10
11
# File 'lib/natswork/connection.rb', line 9

def max_reconnect_attempts
  @max_reconnect_attempts
end

#passwordObject (readonly)

Returns the value of attribute password.



9
10
11
# File 'lib/natswork/connection.rb', line 9

def password
  @password
end

#reconnect_time_waitObject (readonly)

Returns the value of attribute reconnect_time_wait.



9
10
11
# File 'lib/natswork/connection.rb', line 9

def reconnect_time_wait
  @reconnect_time_wait
end

#serversObject (readonly)

Returns the value of attribute servers.



9
10
11
# File 'lib/natswork/connection.rb', line 9

def servers
  @servers
end

#userObject (readonly)

Returns the value of attribute user.



9
10
11
# File 'lib/natswork/connection.rb', line 9

def user
  @user
end

Instance Method Details

#connectObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/natswork/connection.rb', line 30

def connect
  return true if @connected && @client.connected?

  options = {
    servers: @servers,
    max_reconnect_attempts: @max_reconnect_attempts,
    reconnect_time_wait: @reconnect_time_wait
  }

  options[:user] = @user if @user
  options[:password] = @password if @password
  options[:tls] = @tls if @tls

  setup_handlers

  begin
    @client.connect(options)
    @connected = true
    @client.connected?
  rescue NATS::IO::ConnectError => e
    raise ConnectionError, "Failed to connect to NATS: #{e.message}"
  rescue StandardError => e
    raise ConnectionError, "Connection error: #{e.message}"
  end
end

#connected?Boolean



66
67
68
# File 'lib/natswork/connection.rb', line 66

def connected?
  @connected && @client.connected?
end

#disconnectObject



56
57
58
59
60
61
62
63
64
# File 'lib/natswork/connection.rb', line 56

def disconnect
  return unless @connected

  @mutex.synchronize do
    @client.close if @client.connected?
    @connected = false
    @jetstream_context = nil
  end
end

#healthy?Boolean



143
144
145
# File 'lib/natswork/connection.rb', line 143

def healthy?
  connected? && ping
end

#jetstreamObject

Raises:



127
128
129
130
131
# File 'lib/natswork/connection.rb', line 127

def jetstream
  raise ConnectionError, 'Not connected to NATS' unless connected?

  @jetstream_context ||= @client.jetstream
end

#on_disconnect(&block) ⇒ Object



165
166
167
# File 'lib/natswork/connection.rb', line 165

def on_disconnect(&block)
  @disconnect_callbacks << block if block_given?
end

#on_error(&block) ⇒ Object



169
170
171
# File 'lib/natswork/connection.rb', line 169

def on_error(&block)
  @error_callbacks << block if block_given?
end

#on_reconnect(&block) ⇒ Object



161
162
163
# File 'lib/natswork/connection.rb', line 161

def on_reconnect(&block)
  @reconnect_callbacks << block if block_given?
end

#pingObject



147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/natswork/connection.rb', line 147

def ping
  return false unless connected?

  begin
    # Send a ping by doing a simple request to a non-existent subject with short timeout
    @client.flush(1)
    @last_ping_time = Time.now
    true
  rescue StandardError => e
    @last_error = e.message
    false
  end
end

#publish(subject, payload) ⇒ Object

Raises:



70
71
72
73
74
75
# File 'lib/natswork/connection.rb', line 70

def publish(subject, payload)
  raise ConnectionError, 'Not connected to NATS' unless connected?

  data = payload.is_a?(String) ? payload : JSON.generate(payload)
  @client.publish(subject, data)
end

#request(subject, payload, opts = {}) ⇒ Object

Raises:



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/natswork/connection.rb', line 96

def request(subject, payload, opts = {})
  raise ConnectionError, 'Not connected to NATS' unless connected?

  timeout = opts[:timeout] || 5
  data = payload.is_a?(String) ? payload : JSON.generate(payload)

  begin
    response = @client.request(subject, data, timeout: timeout)
    JSON.parse(response.data)
  rescue NATS::IO::Timeout
    raise TimeoutError, "Request timed out after #{timeout} seconds"
  rescue JSON::ParserError
    response.data
  end
end

#statsObject



133
134
135
136
137
138
139
140
141
# File 'lib/natswork/connection.rb', line 133

def stats
  return {} unless @client

  stats = @client.stats || {}
  stats[:last_ping_time] = @last_ping_time
  stats[:healthy] = healthy?
  stats[:last_error] = @last_error
  stats
end

#subscribe(subject, opts = {}, &block) ⇒ Object

Raises:



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/natswork/connection.rb', line 77

def subscribe(subject, opts = {}, &block)
  raise ConnectionError, 'Not connected to NATS' unless connected?

  wrapped_callback = proc do |msg, reply, subject, sid|
    parsed_msg = begin
      JSON.parse(msg, symbolize_names: true)
    rescue JSON::ParserError
      msg
    end
    block.call(parsed_msg, reply, subject, sid)
  end

  if opts[:queue]
    @client.subscribe(subject, queue: opts[:queue], &wrapped_callback)
  else
    @client.subscribe(subject, &wrapped_callback)
  end
end

#unsubscribe(sid) ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/natswork/connection.rb', line 112

def unsubscribe(sid)
  return unless connected?

  begin
    @client.send(:unsubscribe, sid)
  rescue StandardError
    # Ignore errors for invalid SIDs
  end
end

#with_connection {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:



122
123
124
125
# File 'lib/natswork/connection.rb', line 122

def with_connection
  connect unless connected?
  yield(self)
end