Class: MCollective::Connector::Nats

Inherits:
Base
  • Object
show all
Defined in:
lib/mcollective/connector/nats.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

inherited

Constructor Details

#initializeNats

rubocop:disable Lint/MissingSuper



10
11
12
13
14
15
16
# File 'lib/mcollective/connector/nats.rb', line 10

# Sets up the connector state and announces the client library in use.
#
# Stores the Config singleton, starts with an empty subscription list and
# creates a Util::NatsWrapper as the connection. Does not call super.
def initialize # rubocop:disable Lint/MissingSuper
  @config = Config.instance
  @subscriptions = []
  @connection = Util::NatsWrapper.new

  versions = [NATS::IO::VERSION, NATS::IO::PROTOCOL]
  Log.info(format("Choria NATS.io connector using pure ruby nats/io/client %s with protocol version %s", *versions))
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



8
9
10
# File 'lib/mcollective/connector/nats.rb', line 8

# Reader for the connection object held in @connection.
#
# @return [Object] whatever is stored in @connection (the constructor
#   assigns a Util::NatsWrapper)
def connection
  @connection
end

Instance Method Details

#active_optionsHash

Connection options from the NATS gem

Returns:

  • (Hash)


56
57
58
# File 'lib/mcollective/connector/nats.rb', line 56

# Connection options as reported by the connection object.
#
# @return [Hash] result of the connection's +active_options+
def active_options
  nats = connection
  nats.active_options
end

#choriaObject



445
446
447
# File 'lib/mcollective/connector/nats.rb', line 445

# Lazily builds and caches the Util::Choria helper.
#
# The helper is constructed once with +false+ as its only argument and the
# same instance is returned on every later call.
#
# @return [Util::Choria]
def choria
  return @_choria if @_choria

  @_choria = Util::Choria.new(false)
end

#client_flavourString

Client library flavour

Returns:



49
50
51
# File 'lib/mcollective/connector/nats.rb', line 49

# Client library flavour as reported by the connection object.
#
# @return [String] result of the connection's +client_flavour+
def client_flavour
  nats = connection
  nats.client_flavour
end

#client_versionString

Client library version

Returns:



42
43
44
# File 'lib/mcollective/connector/nats.rb', line 42

# Client library version as reported by the connection object.
#
# @return [String] result of the connection's +client_version+
def client_version
  nats = connection
  nats.client_version
end

#configure_ngs(parameters) ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/mcollective/connector/nats.rb', line 104

# Adds TLS settings for an NGS connection to the connection parameters.
#
# A fresh OpenSSL context pinned to TLS 1.2 is stored under +:tls+; the
# context from +choria.ssl_context+ is not used here.
#
# @param parameters [Hash] connection parameters, modified in place
# @return [Hash] the +{:context => ...}+ hash stored under +:tls+
# @raise [StandardError] when +choria.nkeys?+ is false
def configure_ngs(parameters)
  Log.debug("Disabling specific TLS during connection to NGS")

  raise("nkeys rubygem is required for connections with credentials") unless choria.nkeys?

  context = OpenSSL::SSL::SSLContext.new.tap do |ctx|
    ctx.ssl_version = :TLSv1_2 # rubocop:disable Naming/VariableNumber
  end

  parameters[:tls] = {:context => context}
end

#configure_tls(parameters) ⇒ Object



99
100
101
102
# File 'lib/mcollective/connector/nats.rb', line 99

# Adds the Choria SSL context to the connection parameters, then runs the
# Choria SSL setup check.
#
# @param parameters [Hash] connection parameters, modified in place
# @return [Object] whatever +choria.check_ssl_setup+ returns
def configure_tls(parameters)
  context = choria.ssl_context
  parameters[:tls] = {:context => context}

  choria.check_ssl_setup
end

#connectvoid

This method returns an undefined value.

Attempts to connect to the middleware, noop when already connected

Raises:

  • (StandardError)

    when SSL files are not readable



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/mcollective/connector/nats.rb', line 64

# Starts the middleware connection; a no-op when it is already started.
#
# Builds the start parameters (unlimited reconnects, one second between
# attempts, node identity as client name), optionally adds a credentials
# file, picks a TLS setup and adds the server list when it is not empty.
#
# @return [nil]
def connect
  if connection&.started?
    Log.debug("Already connection, not re-initializing connection")
    return
  end

  parameters = {
    :max_reconnect_attempts => -1,
    :reconnect_time_wait => 1,
    :dont_randomize_servers => !choria.randomize_middleware_servers?,
    :name => @config.identity
  }

  if choria.credential_file?
    parameters[:user_credentials] = choria.credential_file
  end

  # The global switch wins over both NGS and regular TLS configuration.
  if $choria_unsafe_disable_nats_tls # rubocop:disable Style/GlobalVars
    Log.warn("Disabling TLS in NATS connector, this is not a production supported setup")
  elsif choria.ngs?
    configure_ngs(parameters)
  else
    configure_tls(parameters)
  end

  candidates = server_list

  unless candidates.empty?
    Log.debug(format("Connecting to servers: %s", candidates.join(", ")))
    parameters[:servers] = candidates
  end

  connection.start(parameters)

  nil
end

#connected?Boolean

Determines if the NATS connection is active

Returns:

  • (Boolean)


21
22
23
# File 'lib/mcollective/connector/nats.rb', line 21

# Whether the connection object reports itself as connected.
#
# @return [Boolean] result of the connection's +connected?+
def connected?
  nats = connection
  nats.connected?
end

#connected_serverString?

Current connected server

Returns:



28
29
30
# File 'lib/mcollective/connector/nats.rb', line 28

# The server the connection object reports being connected to.
#
# @return [String, nil] result of the connection's +connected_server+
def connected_server
  nats = connection
  nats.connected_server
end

#current_pidFixnum

Note:

mainly used for testing

Retrieves the current process pid

Returns:

  • (Fixnum)


192
193
194
# File 'lib/mcollective/connector/nats.rb', line 192

# Pid of the running process; a separate method so it can be replaced in
# tests.
#
# @return [Integer]
def current_pid
  Process.pid
end

#decorate_servers_with_users(servers) ⇒ Array<URI>

Add user and pass to a series of URIs

Parameters:

  • servers (Array<URI>)

    list of URIs to decorate

Returns:



408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/mcollective/connector/nats.rb', line 408

# Applies a username and password to each server URI.
#
# Credentials come from the +nats.user+ / +nats.pass+ plugin options, with
# the MCOLLECTIVE_NATS_USERNAME / MCOLLECTIVE_NATS_PASSWORD environment
# variables as defaults. When +choria.anon_tls?+ is true the user becomes
# the security plugin's request signer token and the password is cleared.
#
# @param servers [Array<URI>] URIs to decorate, modified in place
# @return [Array<URI>] the same +servers+ collection
def decorate_servers_with_users(servers)
  user = get_option("nats.user", environment["MCOLLECTIVE_NATS_USERNAME"])
  pass = get_option("nats.pass", environment["MCOLLECTIVE_NATS_PASSWORD"])

  if choria.anon_tls?
    user = PluginManager["security_plugin"].request_signer.token
    pass = nil
  end

  # Nothing to apply when neither credential is set.
  return servers unless user || pass

  servers.each do |server|
    server.user = user
    server.password = pass
  end

  servers
end

#disconnectObject

Disconnects from NATS



116
117
118
# File 'lib/mcollective/connector/nats.rb', line 116

# Stops the connection.
#
# @return [Object] whatever the connection's +stop+ returns
def disconnect
  nats = connection
  nats.stop
end

#environmentObject

Retrieves the environment, mainly used for testing



428
429
430
# File 'lib/mcollective/connector/nats.rb', line 428

# Returns Ruby's ENV object. Exposed as a method so that environment
# lookups go through a single point that tests can replace.
#
# @return [Object] the ENV object
def environment
  ENV
end

#get_option(opt, default = :_unset) ⇒ Object

Gets a config option

Parameters:

  • opt (String)

    config option to look up

  • default (Object) (defaults to: :_unset)

    default to return when not found

Returns:

  • (Object)

    the found data or default

Raises:

  • (StandardError)

    when no default is given and option is not found



438
439
440
441
442
443
# File 'lib/mcollective/connector/nats.rb', line 438

# Looks up a plugin configuration option.
#
# @param opt [String] key to look up in +@config.pluginconf+
# @param default [Object] value returned when the key is absent; the
#   sentinel +:_unset+ means "no default supplied"
# @return [Object] the configured value, or +default+
# @raise [StandardError] when the key is absent and no default was given
def get_option(opt, default=:_unset)
  if @config.pluginconf.include?(opt)
    @config.pluginconf[opt]
  elsif default == :_unset
    raise("No plugin.%s configuration option given" % opt)
  else
    default
  end
end

#headers_for(msg) ⇒ Hash

Creates the middleware headers needed for a given message

Parameters:

Returns:

  • (Hash)


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/mcollective/connector/nats.rb', line 124

# Builds the middleware headers for an outgoing message.
#
# Requests get a +reply-to+ header: the message's own +reply_to+ when set,
# otherwise a freshly made reply target. When the message (for requests) or
# the original request (for replies) carries a +seen-by+ header, the trace
# is extended with this hop.
#
# @param msg [Message] message being published
# @return [Hash] headers keyed by String
def headers_for(msg)
  # mc_sender is only passed because M::Message incorrectly assumed this is some required
  # part of messages when it is just some internals of the stomp based connectors that bled out
  headers = {"mc_sender" => @config.identity}

  headers["seen-by"] = [] if msg.headers.include?("seen-by")

  case msg.type
  when :request, :direct_request
    # A request that is not itself answering something needs a reply-to
    # target for the daemon to subscribe to.
    headers["reply-to"] = msg.reply_to || make_target(msg.agent, :reply, msg.collective)

    headers["seen-by"] << [@config.identity, connected_server.to_s] if msg.headers.include?("seen-by")
  when :reply
    upstream = msg.request.headers

    if upstream.include?("seen-by")
      # Reuses the request's own seen-by array and appends to its last entry.
      headers["seen-by"] = upstream["seen-by"]
      headers["seen-by"].last << connected_server.to_s
    end
  end

  headers
end

#make_target(agent, type, collective, identity = nil) ⇒ String

Creates a target structure

Parameters:

  • agent (String)

    agent name

  • type (:directed, :broadcast, :reply, :request, :direct_request)
  • collective (String)

    target collective name

  • identity (String, nil) (defaults to: nil)

    identity for the request, else node configured identity

Returns:

Raises:

  • (StandardError)

    on invalid input



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/mcollective/connector/nats.rb', line 204

# Builds the NATS subject name for an agent, message type and collective.
#
# @param agent [String] agent name, used for broadcast style targets
# @param type [Symbol] one of :directed, :broadcast, :reply, :request,
#   :direct_request
# @param collective [String] must be listed in +@config.collectives+
# @param identity [String, nil] node identity for directed targets; falls
#   back to +@config.identity+
# @return [String] the target name
# @raise [StandardError] on an unknown type or collective
def make_target(agent, type, collective, identity=nil)
  known_types = [:directed, :broadcast, :reply, :request, :direct_request]
  raise("Unknown target type %s" % type) unless known_types.include?(type)

  unless @config.collectives.include?(collective)
    raise("Unknown collective '%s' known collectives are '%s'" % [collective, @config.collectives.join(", ")])
  end

  identity ||= @config.identity

  if type == :reply
    # Reply targets are made unique per caller, process and request sequence.
    format("%s.reply.%s.%d.%d", collective, Digest::MD5.hexdigest(choria.callerid), current_pid, Client.request_sequence)
  elsif [:broadcast, :request].include?(type)
    format("%s.broadcast.agent.%s", collective, agent)
  elsif [:direct_request, :directed].include?(type)
    format("%s.node.%s", collective, identity)
  end
end

#publish(msg) ⇒ Object

Publishes a message to the middleware

Parameters:



226
227
228
229
230
231
232
233
234
# File 'lib/mcollective/connector/nats.rb', line 226

# Publishes a message to the middleware.
#
# The message is base64 encoded in place first, then handed to one of four
# publishers depending on whether Choria is federated and whether the
# message is a :direct_request.
#
# @param msg [Message] message to publish
# @return [Object] result of the selected publisher
def publish(msg)
  msg.base64_encode!

  if choria.federated?
    return publish_federated_directed(msg) if msg.type == :direct_request

    publish_federated_broadcast(msg)
  else
    return publish_connected_directed(msg) if msg.type == :direct_request

    publish_connected_broadcast(msg)
  end
end

#publish_connected_broadcast(msg) ⇒ Object

Publish a broadcast message to a connected collective

Parameters:



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/mcollective/connector/nats.rb', line 321

# Publishes a broadcast message to a connected collective.
#
# Wraps the payload in a "choria:transport:1" envelope. When the message
# answers a request that carried a +federation+ header, that header is
# copied into the outgoing headers.
#
# @param msg [Message] message to publish
# @return [Object] result of the connection's +publish+
def publish_connected_broadcast(msg)
  target = target_for(msg)
  headers = target[:headers]

  envelope = {
    "protocol" => "choria:transport:1",
    "data" => msg.payload,
    "headers" => headers
  }

  # only happens when replying
  origin = msg.request
  if origin && origin.headers.include?("federation")
    headers["federation"] = origin.headers["federation"]
  end

  Log.debug(format("Sending a broadcast message to NATS target '%s' for message type %s", target.inspect, msg.type))

  connection.publish(target[:name], JSON.dump(envelope), headers["reply-to"])
end

#publish_connected_directed(msg) ⇒ Object

Publish a directed request to a connected collective

Parameters:



276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/mcollective/connector/nats.rb', line 276

# Publishes a directed request to a connected collective, sending one
# "choria:transport:1" envelope per discovered host.
#
# @param msg [Message] message to publish; +discovered_hosts+ lists the
#   nodes to address
# @return [Object] the +discovered_hosts+ collection that was iterated
def publish_connected_directed(msg)
  msg.discovered_hosts.each do |node|
    target = target_for(msg, node)
    headers = target[:headers]

    envelope = {
      "protocol" => "choria:transport:1",
      "data" => msg.payload,
      "headers" => headers
    }

    Log.debug(format("Sending a direct message to %s via NATS target '%s' for message type %s", node, target.inspect, msg.type))

    connection.publish(target[:name], envelope.to_json, headers["reply-to"])
  end
end

#publish_federated_broadcast(msg) ⇒ Object

Publish a broadcast message via a Federation Broker

Parameters:



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/mcollective/connector/nats.rb', line 294

# Publishes a broadcast message via a Federation Broker.
#
# The envelope is serialized once, carrying the real target in its
# +federation+ header, and then sent to the federation subject of every
# configured federation collective.
#
# @param msg [Message] message to publish
# @return [Object] the +choria.federation_collectives+ collection iterated
def publish_federated_broadcast(msg)
  target = target_for(msg)

  federation = {
    "target" => [target[:name]],
    "req" => msg.requestid
  }

  envelope = {
    "protocol" => "choria:transport:1",
    "data" => msg.payload,
    "headers" => {"federation" => federation}.merge(target[:headers])
  }

  payload = JSON.dump(envelope)

  choria.federation_collectives.each do |network|
    # The target name is overwritten so the log line shows the federation subject.
    target[:name] = format("choria.federation.%s.federation", network)

    Log.debug(format("Sending a federated broadcast message to NATS target '%s' for message type %s", target.inspect, msg.type))

    connection.publish(target[:name], payload, target[:headers]["reply-to"])
  end
end

#publish_federated_directed(msg) ⇒ Object

Publish a directed request via a Federation Broker

Parameters:



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/mcollective/connector/nats.rb', line 239

# Publishes a directed request via a Federation Broker.
#
# Discovered hosts are split into batches of at most 200. Each batch becomes
# one "choria:transport:1" envelope whose +federation+ header lists the
# per-node targets, and every envelope is sent to the federation subject of
# every configured federation collective.
#
# @param msg [Message] message to publish; +discovered_hosts+ lists the
#   nodes to address
# @return [Object] the +choria.federation_collectives+ collection iterated
def publish_federated_directed(msg)
  # The shared headers are taken from the first host's target; the headers
  # are produced by headers_for(msg), which is not given the node identity.
  target = target_for(msg, msg.discovered_hosts[0])

  # each_slice is core Ruby and yields a short final batch without padding;
  # compact still drops any nil entries present in the host list itself.
  messages = msg.discovered_hosts.each_slice(200).map do |nodes|
    node_targets = nodes.compact.map { |node| target_for(msg, node)[:name] }

    data = {
      "protocol" => "choria:transport:1",
      "data" => msg.payload,
      "headers" => {
        "federation" => {
          "target" => node_targets,
          "req" => msg.requestid
        }
      }.merge(target[:headers])
    }

    JSON.dump(data)
  end

  reply_to = target[:headers]["reply-to"]

  choria.federation_collectives.each do |network|
    network_target = "choria.federation.%s.federation" % network

    messages.each do |data|
      Log.debug("Sending a federated direct message via NATS target '%s' for message type %s" % [network_target, msg.type])

      connection.publish(network_target, data, reply_to)
    end
  end
end

#receiveMessage

Note:

blocks until one is received

Receives a message from the middleware

Returns:



369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# File 'lib/mcollective/connector/nats.rb', line 369

# Receives the next usable message from the middleware.
#
# Keeps reading from the connection until a payload parses as JSON and is a
# transport object (a Hash carrying a Hash under "headers"). Anything else
# is logged and skipped, so a stray payload cannot raise out of the loop.
# When the headers carry a +seen-by+ trace, this hop is appended to it.
#
# @note blocks until a usable message is received
# @return [Message] message built from the envelope's "data" and "headers"
def receive
  msg = nil

  until msg
    received = connection.receive

    Thread.pass

    begin
      parsed = JSON.parse(received)
    rescue StandardError
      Log.warn("Got non JSON data from the broker: %s" % [received])
      next
    end

    # Valid JSON that is not an envelope (e.g. `[]`, `42`, or an object
    # without headers) would otherwise fail on the header access below.
    if parsed.is_a?(Hash) && parsed["headers"].is_a?(Hash)
      msg = parsed
    else
      Log.warn("Got malformed transport message from the broker: %s" % [received])
    end
  end

  msg["headers"]["seen-by"] << [connected_server.to_s, @config.identity] if msg["headers"].include?("seen-by")

  Message.new(msg["data"], msg, :base64 => true, :headers => msg["headers"])
end

#server_listArray<String>

Retrieves the list of server and port combos to attempt to connect to

Configured servers are checked first, then SRV records; if neither yields a result, it falls back to puppet:4222.

Returns:



396
397
398
399
400
401
402
# File 'lib/mcollective/connector/nats.rb', line 396

# Builds the list of NATS server URLs to try.
#
# Asks Choria for middleware host/port pairs (with "puppet" and "4222" as
# the defaults handed to it), turns each into a nats:// URI, applies
# credentials and returns the URIs as strings.
#
# @return [Array<String>]
def server_list
  endpoints = choria.middleware_servers("puppet", "4222")

  uris = endpoints.map do |host, port|
    URI(format("nats://%s:%s", host, port))
  end

  decorate_servers_with_users(uris).map { |uri| uri.to_s }
end

#statsHash

Retrieves the NATS connection stats

Returns:

  • (Hash)


35
36
37
# File 'lib/mcollective/connector/nats.rb', line 35

# Connection statistics as reported by the connection object.
#
# @return [Hash] result of the connection's +stats+
def stats
  nats = connection
  nats.stats
end

#subscribe(agent, type, collective) ⇒ void

This method returns an undefined value.

Subscribes to the topics/queues needed for a particular agent

Parameters:

  • agent (String)

    agent name

  • type (:reply, :broadcast, :request, :direct_request, :directed)

    type of message you want a subscription for

  • collective (String)

    the collective to subscribe for

See Also:



359
360
361
362
363
# File 'lib/mcollective/connector/nats.rb', line 359

# Subscribes the connection to the target for an agent.
#
# @param agent [String] agent name
# @param type [Symbol] message type the subscription is for
# @param collective [String] collective to subscribe in
# @return [Object] whatever the connection's +subscribe+ returns
def subscribe(agent, type, collective)
  subject = make_target(agent, type, collective)

  connection.subscribe(subject)
end

#target_for(msg, identity = nil) ⇒ Hash

Create a target structure for a message

Examples:

data


{
  :name => "nats.name",
  :headers => { headers... }
}

Parameters:

  • msg (Message)
  • identity (String, nil) (defaults to: nil)

    override identity

Returns:

  • (Hash)


167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/mcollective/connector/nats.rb', line 167

# Builds the target structure (subject name plus headers) for a message.
#
# Replies go to the +reply-to+ header of the request being answered;
# requests and direct requests get a subject from +make_target+. Headers
# are filled from +headers_for+.
#
# @param msg [Message] message being published
# @param identity [String, nil] identity passed on to +make_target+
# @return [Hash] +{:name => String, :headers => Hash}+
# @raise [StandardError] for a reply without a reply-to header, or for an
#   unsupported message type
def target_for(msg, identity=nil)
  name =
    case msg.type
    when :reply
      reply_subject = msg.request.headers["reply-to"]
      raise("Do not know how to reply, no reply-to header has been set on message %s" % msg.requestid) unless reply_subject

      reply_subject
    when :request, :direct_request
      make_target(msg.agent, msg.type, msg.collective, identity)
    else
      raise("Don't now how to create a target for message type %s" % msg.type)
    end

  {:name => name, :headers => {}.merge(headers_for(msg))}
end

#unsubscribe(agent, type, collective) ⇒ void

This method returns an undefined value.

Unsubscribe from the target for an agent

Parameters:

  • agent (String)

    agent name

  • type (:reply, :broadcast, :request, :direct_request, :directed)

    type of message you want a subscription for

  • collective (String)

    the collective to subscribe for

See Also:



345
346
347
348
349
350
# File 'lib/mcollective/connector/nats.rb', line 345

# Removes the connection's subscription to the target for an agent.
#
# @param agent [String] agent name
# @param type [Symbol] message type the subscription was for
# @param collective [String] collective the subscription was in
# @return [Object] whatever the connection's +unsubscribe+ returns
def unsubscribe(agent, type, collective)
  subject = make_target(agent, type, collective)
  Log.debug(format("Unsubscribing from %s", subject))

  connection.unsubscribe(subject)
end