Class: MCollective::Connector::Activemq

Inherits:
Base
  • Object
show all
Defined in:
lib/mcollective/connector/activemq.rb

Overview

Handles sending and receiving messages over the Stomp protocol for ActiveMQ servers specifically, we take advantages of ActiveMQ specific features and enhancements to the Stomp protocol. For best results in a clustered environment use ActiveMQ 5.5.0 at least.

This plugin takes an entirely different approach to dealing with ActiveMQ from the more generic stomp connector.

- Agents use /topic/<collective>.<agent>.agent
- Replies use temp-topics so they are private and transient.
- Point to Point messages using topics are supported by subscribing to
  /queue/<collective>.nodes with a selector "mc_identity = 'identity'

The use of temp-topics for the replies is a huge improvement over the old style. In the old way all clients got replies for all clients that were active at that time, this would mean that they would need to decrypt, validate etc in order to determine if they need to ignore the message, this was computationally expensive and on large busy networks the messages were being sent all over the show cross broker boundaries.

The new way means the messages go point2point back to only whoever requested the message, they only get their own replies and this is ap private channel that casual observers cannot just snoop into.

This plugin supports 1.1.6 and newer of the Stomp rubygem.

connector = activemq
plugin.activemq.pool.size = 2

plugin.activemq.pool.1.host = stomp1.your.net
plugin.activemq.pool.1.port = 61613
plugin.activemq.pool.1.user = you
plugin.activemq.pool.1.password = secret
plugin.activemq.pool.1.ssl = true
plugin.activemq.pool.1.ssl.cert = /path/to/your.cert
plugin.activemq.pool.1.ssl.key = /path/to/your.key
plugin.activemq.pool.1.ssl.ca = /path/to/your.ca
plugin.activemq.pool.1.ssl.fallback = true
plugin.activemq.pool.1.ssl.ciphers = TLSv1:!MD5:!LOW:!EXPORT

plugin.activemq.pool.2.host = stomp2.your.net
plugin.activemq.pool.2.port = 61613
plugin.activemq.pool.2.user = you
plugin.activemq.pool.2.password = secret
plugin.activemq.pool.2.ssl = false

Using this method you can supply just STOMP_USER and STOMP_PASSWORD. The port will default to 61613 if not specified.

The ssl options are only usable in version of the Stomp gem newer than 1.2.2 where these will imply full SSL validation will be done and you’ll only be able to connect to a ActiveMQ server that has a cert signed by the same CA. If you only set ssl = true and do not supply the cert, key and ca properties or if you have an older gem it will fall back to unverified mode only if ssl.fallback is true

In addition you can set the following options for the rubygem:

plugin.activemq.initial_reconnect_delay = 0.01
plugin.activemq.max_reconnect_delay = 30.0
plugin.activemq.use_exponential_back_off = true
plugin.activemq.back_off_multiplier = 2
plugin.activemq.max_reconnect_attempts = 0
plugin.activemq.randomize = false
plugin.activemq.timeout = -1

You can set the initial connetion timeout - this is when your stomp server is simply unreachable - after which it would failover to the next in the pool:

plugin.activemq.connect_timeout = 30

ActiveMQ JMS message priorities can be set:

plugin.activemq.priority = 4

This plugin supports Stomp protocol 1.1 when combined with the stomp gem version 1.2.10 or newer. To enable network heartbeats which will help keep the connection alive over NAT connections and aggresive session tracking firewalls you can set:

plugin.activemq.heartbeat_interval = 30

which will cause a heartbeat to be sent on 30 second intervals and one to be expected from the broker every 30 seconds. The shortest supported period is 30 seconds, if you set it lower it will get forced to 30 seconds.

After 2 failures to receive a heartbeat the connection will be reset via the normal failover mechanism.

By default if heartbeat_interval is set it will request Stomp 1.1 but support fallback to 1.0, but you can enable strict Stomp 1.1 only operation

plugin.activemq.stomp_1_0_fallback = 0

Defined Under Namespace

Classes: DummyError, EventLogger

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

inherited

Constructor Details

#initializeActivemq

Returns a new instance of Activemq.



200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/mcollective/connector/activemq.rb', line 200

def initialize
  @config = Config.instance
  @subscriptions = []
  @msgpriority = 0
  @base64 = false
  @use_exponential_back_off = get_bool_option("activemq.use_exponential_back_off", "true")
  @initial_reconnect_delay = Float(get_option("activemq.initial_reconnect_delay", 0.01))
  @back_off_multiplier = Integer(get_option("activemq.back_off_multiplier", 2))
  @max_reconnect_delay = Float(get_option("activemq.max_reconnect_delay", 30.0))
  @reconnect_delay = @initial_reconnect_delay

  Log.info("ActiveMQ connector initialized.  Using stomp-gem #{stomp_version}")
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



97
98
99
# File 'lib/mcollective/connector/activemq.rb', line 97

def connection
  @connection
end

Instance Method Details

#connect(connector = ::Stomp::Connection) ⇒ Object

Connects to the ActiveMQ middleware



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/mcollective/connector/activemq.rb', line 215

def connect(connector = ::Stomp::Connection)
  if @connection
    Log.debug("Already connection, not re-initializing connection")
    return
  end

  begin
    @base64 = get_bool_option("activemq.base64", "false")
    @msgpriority = get_option("activemq.priority", 0).to_i

    pools = Integer(get_option("activemq.pool.size"))
    hosts = []

    1.upto(pools) do |poolnum|
      host = {}

      host[:host] = get_option("activemq.pool.#{poolnum}.host")
      host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
      host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user", '')
      host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password", '')
      host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false")

      # if ssl is enabled set :ssl to the hash of parameters
      if host[:ssl]
        host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false"))
      end

      Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
      hosts << host
    end

    raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0

    connection = {:hosts => hosts}

    # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
    # these can be guessed, the documentation isn't clear
    connection[:use_exponential_back_off] = @use_exponential_back_off
    connection[:initial_reconnect_delay] = @initial_reconnect_delay
    connection[:back_off_multiplier] = @back_off_multiplier
    connection[:max_reconnect_delay] = @max_reconnect_delay
    connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
    connection[:randomize] = get_bool_option("activemq.randomize", "false")
    connection[:backup] = get_bool_option("activemq.backup", "false")
    connection[:timeout] = Integer(get_option("activemq.timeout", -1))
    connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
    connection[:reliable] = true
    connection[:connect_headers] = connection_headers
    connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 0))
    connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2))

    connection[:logger] = EventLogger.new

    @connection = connector.new(connection)

  rescue ClientTimeoutError => e
    raise e
  rescue Exception => e
    raise("Could not connect to ActiveMQ Server: #{e}")
  end
end

#connection_headersObject



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/mcollective/connector/activemq.rb', line 285

def connection_headers
  headers = {:"accept-version" => "1.0"}

  heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0))
  stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true)

  headers[:host] = get_option("activemq.vhost", "mcollective")

  if heartbeat_interval > 0
    unless stomp_version_supports_heartbeat?
      raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
    end

    if heartbeat_interval < 30
      Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
      heartbeat_interval = 30
    end

    heartbeat_interval = heartbeat_interval * 1000
    headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]

    if stomp_1_0_fallback
      headers[:"accept-version"] = "1.1,1.0"
    else
      headers[:"accept-version"] = "1.1"
    end
  else
    if stomp_version_supports_heartbeat?
      Log.info("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval")
    end
  end

  headers
end

#disconnectObject

Disconnects from the ActiveMQ connection



478
479
480
481
482
# File 'lib/mcollective/connector/activemq.rb', line 478

def disconnect
  Log.debug("Disconnecting from ActiveMQ")
  @connection.disconnect
  @connection = nil
end

#exponential_back_offObject

Calculate the exponential backoff needed



371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/mcollective/connector/activemq.rb', line 371

def exponential_back_off
  if !@use_exponential_back_off
    return nil
  end

  backoff = @reconnect_delay

  # calculate next delay
  @reconnect_delay = @reconnect_delay * @back_off_multiplier

  # cap at max reconnect delay
  if @reconnect_delay > @max_reconnect_delay
    @reconnect_delay = @max_reconnect_delay
  end

  return backoff
end

#get_bool_option(val, default) ⇒ Object

looks up a boolean value in the config



565
566
567
# File 'lib/mcollective/connector/activemq.rb', line 565

def get_bool_option(val, default)
  Util.str_to_bool(@config.pluginconf.fetch(val, default))
end

#get_cert_file(poolnum) ⇒ Object

Returns the name of the certficate file used by ActiveMQ Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists, where X is the ActiveMQ pool number. If the environment variable doesn’t exist, it will try and load the value from the config.



366
367
368
# File 'lib/mcollective/connector/activemq.rb', line 366

def get_cert_file(poolnum)
  ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false)
end

#get_env_or_option(env, opt, default = nil) ⇒ Object

looks in the environment first then in the config file for a specific option, accepts an optional default.

raises an exception when it cant find a value anywhere



546
547
548
549
550
551
552
# File 'lib/mcollective/connector/activemq.rb', line 546

def get_env_or_option(env, opt, default=nil)
  return ENV[env] if ENV.include?(env)
  return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
  return default if default

  raise("No #{env} environment or plugin.#{opt} configuration option given")
end

#get_key_file(poolnum) ⇒ Object

Returns the name of the private key file used by ActiveMQ Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists, where X is the ActiveMQ pool number. If the environment variable doesn’t exist, it will try and load the value from the config.



358
359
360
# File 'lib/mcollective/connector/activemq.rb', line 358

def get_key_file(poolnum)
  ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false)
end

#get_option(opt, default = nil) ⇒ Object

looks for a config option, accepts an optional default

raises an exception when it cant find a value anywhere



557
558
559
560
561
562
# File 'lib/mcollective/connector/activemq.rb', line 557

def get_option(opt, default=nil)
  return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
  return default unless default.nil?

  raise("No plugin.#{opt} configuration option given")
end

#headers_for(msg, identity = nil) ⇒ Object



484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# File 'lib/mcollective/connector/activemq.rb', line 484

def headers_for(msg, identity=nil)
  headers = {}

  headers = {"priority" => @msgpriority} if @msgpriority > 0

  headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s

  # set the expires header based on the TTL, we build a small additional
  # timeout of 10 seconds in here to allow for network latency etc
  headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s

  if [:request, :direct_request].include?(msg.type)
    target = make_target(msg.agent, :reply, msg.collective)

    if msg.reply_to
      headers["reply-to"] = msg.reply_to
    else
      headers["reply-to"] = target[:name]
    end

    headers["mc_identity"] = identity if msg.type == :direct_request
  end

  headers["mc_sender"] = Config.instance.identity

  return headers
end

#make_target(agent, type, collective) ⇒ Object



512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
# File 'lib/mcollective/connector/activemq.rb', line 512

def make_target(agent, type, collective)
  raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
  raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)

  target = {:name => nil, :headers => {}}

  case type
    when :reply
      target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}", Client.request_sequence].join(".")

    when :broadcast
      target[:name] = ["/topic/" + collective, agent, :agent].join(".")

    when :request
      target[:name] = ["/topic/" + collective, agent, :agent].join(".")

    when :direct_request
      target[:name] = ["/queue/" + collective, :nodes].join(".")

    when :directed
      target[:name] = ["/queue/" + collective, :nodes].join(".")
      target[:headers]["selector"] = "mc_identity = '#{@config.identity}'"
      target[:id] = "%s_directed_to_identity" % collective
  end

  target[:id] = target[:name] unless target[:id]

  target
end

#publish(msg) ⇒ Object

Sends a message to the ActiveMQ connection



421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/mcollective/connector/activemq.rb', line 421

def publish(msg)
  msg.base64_encode! if @base64

  target = target_for(msg)

  if msg.type == :direct_request
    msg.discovered_hosts.each do |node|
      target[:headers] = headers_for(msg, node)

      Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

      @connection.publish(target[:name], msg.payload, target[:headers])
    end
  else
    target[:headers].merge!(headers_for(msg))

    Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

    @connection.publish(target[:name], msg.payload, target[:headers])
  end
end

#receiveObject

Receives a message from the ActiveMQ connection



390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/mcollective/connector/activemq.rb', line 390

def receive
  Log.debug("Waiting for a message from ActiveMQ")

  # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
  # handling it sets the connection to closed.  If we happen to be receiving at just
  # that time we will get an exception warning about the closed connection so handling
  # that here with a sleep and a retry.
  begin
    msg = @connection.receive
  rescue ::Stomp::Error::NoCurrentConnection
    sleep 1
    retry
  end

  # In older stomp gems an attempt to receive after failed authentication can return nil
  if msg.nil?
    raise MessageNotReceived.new(exponential_back_off), "No message received from ActiveMQ."

  end

  # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types
  if msg.command != 'MESSAGE'
    Log.debug("Unexpected '#{msg.command}' frame.  Headers: #{msg.headers.inspect} Body: #{msg.body.inspect}")
    raise UnexpectedMessageType.new(exponential_back_off),
      "Received frame of type '#{msg.command}' expected 'MESSAGE'"
  end

  Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
end

#ssl_parameters(poolnum, fallback) ⇒ Object

Sets the SSL paramaters for a specific connection



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/mcollective/connector/activemq.rb', line 321

def ssl_parameters(poolnum, fallback)
  params = {
    :cert_file => get_cert_file(poolnum),
    :key_file  => get_key_file(poolnum),
    :ts_files  => get_option("activemq.pool.#{poolnum}.ssl.ca", false),
    :ciphers   => get_option("activemq.pool.#{poolnum}.ssl.ciphers", false),
  }

  raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]

  raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
  raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])

  params[:ts_files].split(",").each do |ca|
    raise "Cannot find CA file #{ca}" unless File.exist?(ca)
  end

  begin
    ::Stomp::SSLParams.new(params)
  rescue NameError
    raise "Stomp gem >= 1.2.2 is needed"
  end

rescue Exception => e
  if fallback
    Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
    return true
  else
    Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
    raise(e)
  end
end

#stomp_versionObject



277
278
279
# File 'lib/mcollective/connector/activemq.rb', line 277

def stomp_version
  ::Stomp::Version::STRING
end

#stomp_version_supports_heartbeat?Boolean

Returns:

  • (Boolean)


281
282
283
# File 'lib/mcollective/connector/activemq.rb', line 281

def stomp_version_supports_heartbeat?
  return Util.versioncmp(stomp_version, "1.2.10") >= 0
end

#subscribe(agent, type, collective) ⇒ Object

Subscribe to a topic or queue



444
445
446
447
448
449
450
451
452
453
454
# File 'lib/mcollective/connector/activemq.rb', line 444

def subscribe(agent, type, collective)
  source = make_target(agent, type, collective)

  unless @subscriptions.include?(source[:id])
    Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
    @connection.subscribe(source[:name], source[:headers], source[:id])
    @subscriptions << source[:id]
  end
rescue ::Stomp::Error::DuplicateSubscription
  Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
end

#target_for(msg) ⇒ Object



465
466
467
468
469
470
471
472
473
474
475
# File 'lib/mcollective/connector/activemq.rb', line 465

def target_for(msg)
  if msg.type == :reply
    target = {:name => msg.request.headers["reply-to"], :headers => {}}
  elsif [:request, :direct_request].include?(msg.type)
    target = make_target(msg.agent, msg.type, msg.collective)
  else
    raise "Don't now how to create a target for message type #{msg.type}"
  end

  return target
end

#unsubscribe(agent, type, collective) ⇒ Object

UnSubscribe to a topic or queue



457
458
459
460
461
462
463
# File 'lib/mcollective/connector/activemq.rb', line 457

def unsubscribe(agent, type, collective)
  source = make_target(agent, type, collective)

  Log.debug("Unsubscribing from #{source[:name]}")
  @connection.unsubscribe(source[:name], source[:headers], source[:id])
  @subscriptions.delete(source[:id])
end