Class: MCollective::Util::NatsWrapper

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/util/natswrapper.rb

Overview

A wrapper class around the Pure Ruby NATS gem

MCollective has some non compatible expectations about how message flow works such as having a blocking receive and publish method it calls when it likes, while typical flow is to pass a block and then callbacks will be called.

This wrapper bridges the 2 worlds using ruby Queues to simulate the blocking receive expectation MCollective has thanks to its initial design around the Stomp gem.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeNatsWrapper

Returns a new instance of NatsWrapper.



18
19
20
21
22
23
24
# File 'lib/mcollective/util/natswrapper.rb', line 18

def initialize
  @received_queue = Queue.new
  @subscriptions = {}
  @subscription_mutex = Mutex.new
  @started = false
  @client = NATS::IO::Client.new
end

Instance Attribute Details

#received_queueObject (readonly)

Returns the value of attribute received_queue.



16
17
18
# File 'lib/mcollective/util/natswrapper.rb', line 16

def received_queue
  @received_queue
end

#subscriptionsObject (readonly)

Returns the value of attribute subscriptions.



16
17
18
# File 'lib/mcollective/util/natswrapper.rb', line 16

def subscriptions
  @subscriptions
end

Instance Method Details

#active_optionsHash

Connection options from the NATS gem

Returns:

  • (Hash)


75
76
77
78
79
# File 'lib/mcollective/util/natswrapper.rb', line 75

def active_options
  return {} unless has_client?

  @client.options
end

#backoff_sleepvoid

This method returns an undefined value.

Does a backoff sleep up to 2 seconds



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/mcollective/util/natswrapper.rb', line 91

def backoff_sleep
  @backoffcount ||= 1

  if @backoffcount >= 50
    sleep(2)
  else
    sleep(0.04 * @backoffcount)
  end

  @backoffcount += 1
end

#client_flavourString

Client library flavour

Returns:



61
62
63
# File 'lib/mcollective/util/natswrapper.rb', line 61

def client_flavour
  "nats-pure"
end

#client_versionString

Client library version

Returns:



68
69
70
# File 'lib/mcollective/util/natswrapper.rb', line 68

def client_version
  NATS::IO::VERSION
end

#connected?Boolean

Is NATS connected

Returns:

  • (Boolean)


84
85
86
# File 'lib/mcollective/util/natswrapper.rb', line 84

def connected?
  has_client? && @client.connected?
end

#connected_serverString?

Retrieves the current connected server

Returns:



43
44
45
46
47
# File 'lib/mcollective/util/natswrapper.rb', line 43

def connected_server
  return nil unless connected?

  @client.connected_server
end

#has_client?Boolean

Is there a NATS client created

Returns:

  • (Boolean)


36
37
38
# File 'lib/mcollective/util/natswrapper.rb', line 36

def has_client?
  !!@client
end

#log_nats_poolvoid

This method returns an undefined value.

Logs the NATS server pool for nats-pure

The current server pool is dynamic as the NATS servers can announce new cluster members as they join the pool, little helper for logging the pool on major events



110
111
112
113
114
115
116
117
118
# File 'lib/mcollective/util/natswrapper.rb', line 110

def log_nats_pool
  return unless has_client?

  servers = @client.server_pool.map do |server|
    server[:uri].to_s
  end

  Log.info("Current server pool: %s" % servers.join(", "))
end

#publish(destination, payload, reply = nil) ⇒ Object

Public a message

Parameters:

  • destination (String)

    the NATS destination

  • payload (String)

    the string to publish

  • reply (String) (defaults to: nil)

    a reply destination



194
195
196
197
198
199
200
201
202
203
204
# File 'lib/mcollective/util/natswrapper.rb', line 194

def publish(destination, payload, reply=nil)
  server_state = "%s %s" % [connected? ? "connected" : "disconnected", @client.connected_server]

  if reply
    Log.debug("Publishing to %s reply to %s via %s" % [destination, reply, server_state])
  else
    Log.debug("Publishing to %s via %s" % [destination, server_state])
  end

  @client.publish(destination, payload, reply)
end

#receiveString

Receives a message from the receive queue

This will block until a message is available

Returns:

  • (String)

    received message



185
186
187
# File 'lib/mcollective/util/natswrapper.rb', line 185

def receive
  @received_queue.pop
end

#start(options = {}) ⇒ Object

Starts the EM based NATS connection

Parameters:

  • options (Hash) (defaults to: {})

    Options as per MCollective::Util::NatsWrapper#NATS#NATS::IO#NATS::IO::Client#NATS::IO::Client#connect



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/mcollective/util/natswrapper.rb', line 123

def start(options={})
  # Client connects pretty much soon as it's initialized which is very early
  # and some applications like 'request_cert' just doesnt need/want a client
  # since for example there won't be SSL stuff yet, so if a application calls
  # disconnect very early on this should avoid that chicken and egg
  return if @force_Stop

  @client.on_reconnect do
    Log.warn("Reconnected after connection failure: %s" % connected_server)
    log_nats_pool
    @backoffcount = 1
  end

  @client.on_disconnect do |error|
    if error
      Log.warn("Disconnected from NATS: %s: %s" % [error.class, error.to_s])
    else
      Log.info("Disconnected from NATS for an unknown reason")
    end
  end

  @client.on_error do |error|
    Log.error("Error in NATS connection: %s: %s" % [error.class, error.to_s])
  end

  @client.on_close do
    Log.info("Connection to NATS server closed")
  end

  begin
    @client.connect(options)
  rescue ClientTimeoutError
    raise
  rescue
    Log.error("Error during initial NATS setup: %s: %s" % [$!.class, $!.message])
    Log.debug($!.backtrace.join("\n\t"))

    sleep 1

    Log.error("Retrying NATS initial setup")

    retry
  end

  sleep(0.01) until connected?

  @started = true

  nil
end

#started?Boolean

Has the NATS connection started

Returns:

  • (Boolean)


29
30
31
# File 'lib/mcollective/util/natswrapper.rb', line 29

def started?
  @started
end

#statsHash

Connection stats from the NATS gem

Returns:

  • (Hash)


52
53
54
55
56
# File 'lib/mcollective/util/natswrapper.rb', line 52

def stats
  return {} unless has_client?

  @client.stats
end

#stopObject

Stops the NATS connection



175
176
177
178
# File 'lib/mcollective/util/natswrapper.rb', line 175

def stop
  @force_stop = true
  @client.close
end

#stub_client(client) ⇒ Object

Test helper



237
238
239
# File 'lib/mcollective/util/natswrapper.rb', line 237

def stub_client(client)
  @client = client
end

#subscribe(source_name, options = {}) ⇒ Object

Subscribes to a message source

Parameters:

  • source_name (String)
  • options (Hash) (defaults to: {})

    options as accepted by NATS::IO::Client#subscribe



210
211
212
213
214
215
216
217
218
219
220
# File 'lib/mcollective/util/natswrapper.rb', line 210

def subscribe(source_name, options={})
  @subscription_mutex.synchronize do
    Log.debug("Subscribing to %s" % source_name)

    unless @subscriptions.include?(source_name)
      @subscriptions[source_name] = @client.subscribe(source_name, options) do |msg, _, _|
        @received_queue << msg
      end
    end
  end
end

#unsubscribe(source_name) ⇒ Object

Unsubscribes from a message source

Parameters:



225
226
227
228
229
230
231
232
# File 'lib/mcollective/util/natswrapper.rb', line 225

def unsubscribe(source_name)
  @subscription_mutex.synchronize do
    if @subscriptions.include?(source_name)
      @client.unsubscribe(@subscriptions[source_name])
      @subscriptions.delete(source_name)
    end
  end
end