Class: LogStash::Inputs::Jms

Inherits:
Threadable
  • Object
show all
Extended by:
PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
Includes:
PluginMixins::EventSupport::EventFactoryAdapter
Defined in:
lib/logstash/inputs/jms.rb

Overview

Read events from a Jms Broker. Supports both Jms Queues and Topics.

For more information about Jms, see <docs.oracle.com/javaee/6/tutorial/doc/bncdq.html>. For more information about the Ruby Gem used, see <github.com/reidmorrison/jruby-jms>. Here is a config example to pull from a queue:

jms {
   include_headers => false
   include_properties => false
   include_body => true
   use_jms_timestamp => false
   interval => 10
   destination => "myqueue"
   pub_sub => false
   yaml_file => "~/jms.yml"
   yaml_section => "mybroker"
 }

Defined Under Namespace

Classes: HeadersMapper, LegacyHeadersMapper

Constant Summary collapse

# Message logged at info level by #do_target_check when no `target` is set
# and ECS compatibility is not disabled.
TARGET_NOT_SET_MESSAGE =
LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck::TARGET_NOT_SET_MESSAGE

Instance Method Summary collapse

Constructor Details

#initialize(*params) ⇒ Jms

Returns a new instance of Jms.



157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/logstash/inputs/jms.rb', line 157

# Builds the plugin and resolves the settings that depend on ECS compatibility.
#
# @param params [Array] forwarded unchanged to the superclass constructor
def initialize(*params)
  super

  # With ECS compatibility enabled, headers and properties default to
  # @metadata targets unless the user configured a target explicitly.
  unless ecs_compatibility == :disabled
    unless original_params.include?('headers_target')
      @headers_target = '[@metadata][input][jms][headers]'
    end
    unless original_params.include?('properties_target')
      @properties_target = '[@metadata][input][jms][properties]'
    end
  end

  @headers_setter = event_setter_for(@headers_target)
  @properties_setter = event_setter_for(@properties_target)

  # Legacy header mapping is kept only while ECS compatibility is disabled.
  @headers_mapper =
    if ecs_compatibility == :disabled
      LegacyHeadersMapper::INSTANCE
    else
      HeadersMapper::INSTANCE
    end
end

Instance Method Details

#check_config!Object

Raises:

  • (LogStash::ConfigurationError)


247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/logstash/inputs/jms.rb', line 247

# Validates option combinations that cannot be checked per option.
#
# When only the singular `include_header` option was given, its value is
# copied into @include_headers. Durable-subscription settings are validated
# via #check_durable_subscription_config.
#
# @raise [LogStash::ConfigurationError] when both `include_header` and
#   `include_headers` are set, or when more than one thread is combined
#   with pub_sub
def check_config!
  singular_given = original_params.include?('include_header')

  if singular_given && original_params.include?('include_headers')
    conflict = "Both `include_headers => #{include_headers}` and `include_header => #{include_header}` options are specified, please only set one"
    raise LogStash::ConfigurationError, conflict
  end

  # only `include_header => ...` was set
  @include_headers = include_header if singular_given

  check_durable_subscription_config

  if @threads > 1 && @pub_sub
    raise LogStash::ConfigurationError, "Threads cannot be > 1 if pub_sub is set"
  end
end

#check_durable_subscription_configObject

Raises:

  • (LogStash::ConfigurationError)


260
261
262
263
264
265
# File 'lib/logstash/inputs/jms.rb', line 260

# Validates the durable-subscriber settings and fills in their defaults.
#
# Does nothing unless @durable_subscriber is set. Otherwise the client id
# falls back to 'Logstash' and the subscriber name falls back to the
# configured destination.
#
# @raise [LogStash::ConfigurationError] when durable_subscriber is set without pub_sub
def check_durable_subscription_config
  if @durable_subscriber
    unless @pub_sub
      raise LogStash::ConfigurationError, "pub_sub must be true if durable_subscriber is set"
    end

    @durable_subscriber_client_id = @durable_subscriber_client_id || 'Logstash'
    @durable_subscriber_name = @durable_subscriber_name || destination
  end
end

#correct_factory_hash(original, value) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/logstash/inputs/jms.rb', line 211

# Normalises factory settings before they are merged into the JMS config.
#
# * The strings "true" / "false" (any letter case) become real booleans.
# * A Hash is merged into +original+ with its keys converted to symbols;
#   nested hashes are converted recursively into fresh hashes.
# * Any other value is returned untouched.
#
# @param original [Hash] hash that receives the converted entries when +value+ is a Hash
# @param value [Object] the setting (or hash of settings) to convert
# @return [Object] +original+ when +value+ is a Hash, otherwise the converted value
def correct_factory_hash(original, value)
  # The check must inspect `value`: a bare `hash` is Object#hash (an Integer),
  # which made the boolean conversion unreachable.
  if value.is_a?(String)
    return true if value.downcase == "true"
    return false if value.downcase == "false"
  end

  if value.is_a?(Hash)
    value.each { |key, setting| original[key.to_sym] = correct_factory_hash({}, setting) }
    return original
  end
  value
end

#decode_message(msg) ⇒ LogStash::Event?

Parameters:

  • msg (JMS::TextMessage, JMS::BytesMessage)

Returns:

  • (LogStash::Event, nil)


390
391
392
393
394
395
# File 'lib/logstash/inputs/jms.rb', line 390

# Decodes the textual form of a message through the configured codec.
#
# The message is turned into text with #to_s and handed to @codec; the event
# yielded by the codec (the last one, if it yields several) is returned.
#
# @param msg [JMS::TextMessage, JMS::BytesMessage]
# @return [LogStash::Event, nil] nil when the codec yields nothing
def decode_message(msg)
  payload = msg.to_s # javax.jms.TextMessage#getText (e.g. JSON payload)
  decoded = nil
  unless payload.nil?
    @codec.decode(payload) do |produced|
      decoded = produced
    end
  end
  decoded
end

#do_receive_message(message_consumer, timeout: 0) ⇒ Object

Loop through messages received and yield them.

NOTE: a simplified replacement for JMS::MessageConsumer#each and #get (extensions).

Parameters:

  • message_consumer (javax.jms.MessageConsumer)
  • timeout (defaults to: 0)

    in milliseconds



345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# File 'lib/logstash/inputs/jms.rb', line 345

# Receives messages from the consumer one at a time and yields each of them,
# stopping as soon as a receive call returns no message.
#
# @param message_consumer [javax.jms.MessageConsumer]
# @param timeout [Integer] in milliseconds; 0 polls without waiting,
#   -1 blocks until a message arrives
# @yield [message] every message received
# @return [nil]
def do_receive_message(message_consumer, timeout: 0)
  while true
    received =
      if 0 == timeout
        # Return immediately if no message is available
        message_consumer.receiveNoWait()
      elsif -1 == timeout
        # Wait forever
        message_consumer.receive()
      else
        # Wait up to `timeout` milliseconds for a message from the broker
        message_consumer.receive(timeout)
      end

    return unless received
    yield received
  end
end

#do_target_checkObject



382
383
384
385
386
# File 'lib/logstash/inputs/jms.rb', line 382

# Logs TARGET_NOT_SET_MESSAGE when no `target` is configured while ECS
# compatibility is enabled.
#
# @return [true, nil, Object] true when a target is set, nil when ECS
#   compatibility is disabled, otherwise whatever logger.info returns
def do_target_check
  if !target.nil?
    true
  elsif ecs_compatibility == :disabled
    nil
  else
    # target isn't set in ECS mode
    logger.info(TARGET_NOT_SET_MESSAGE)
  end
end

#do_target_check_once_and_get_event_factoryObject



373
374
375
376
377
378
# File 'lib/logstash/inputs/jms.rb', line 373

# Returns the targeted event factory, running #do_target_check only before
# the factory is first obtained; the factory is cached in @target_checked.
#
# @return [Object] the value of targeted_event_factory
def do_target_check_once_and_get_event_factory
  return @target_checked if @target_checked

  do_target_check
  @target_checked = targeted_event_factory
end

#durable_subscriber(session, queue_or_topic, params) ⇒ Object



406
407
408
409
# File 'lib/logstash/inputs/jms.rb', line 406

# Creates a durable subscriber named @durable_subscriber_name on the session.
#
# @param session [Object] session responding to create_durable_subscriber
# @param queue_or_topic [Object] destination to subscribe to
# @param params [Hash] when params[:selector] is set it is passed on, followed by `false`
# @return [Object] whatever create_durable_subscriber returns
def durable_subscriber(session, queue_or_topic, params)
  selector = params[:selector]
  unless selector
    return session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name)
  end

  session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name, selector, false)
end

#error_hash(e) ⇒ Object



416
417
418
419
420
421
# File 'lib/logstash/inputs/jms.rb', line 416

# Describes an exception as a hash suitable for structured logging.
#
# @param e [Exception] the error to describe
# @return [Hash] :exception, :exception_message and :backtrace, plus
#   :root_cause when #get_root_cause finds one
def error_hash(e)
  details = {
    :exception => e.class.name,
    :exception_message => e.message,
    :backtrace => e.backtrace
  }
  deepest = get_root_cause(e)
  deepest.nil? ? details : details.merge!(:root_cause => deepest)
end

#get_root_cause(e) ⇒ Object

JMS exceptions can contain chains of nested exceptions, making it difficult to determine the root cause of an error without walking the whole chain. This method follows the chain to its last cause and protects against Java exceptions whose cause methods loop. If there is a cause loop, the last cause exception seen before the loop was detected is returned, along with an entry in the root_cause hash indicating that an exception loop was detected. In that case the reported cause may not be the actual root cause of the problem, and further investigation is required.



429
430
431
432
433
434
435
436
437
438
439
440
441
442
# File 'lib/logstash/inputs/jms.rb', line 429

# Follows the `get_cause` chain of an exception and describes the last cause reached.
#
# Two references walk the chain: `cause` advances on every iteration while
# `slow_pointer` advances on every second one, so a chain that loops back on
# itself is detected when the two meet instead of being followed forever.
#
# @param e [Object] the exception to inspect
# @return [Hash, nil] :exception and :exception_message of the last cause
#   reached (plus :exception_loop => true when a loop was detected), or nil
#   when +e+ does not respond to `get_cause` or has no cause
def get_root_cause(e)
  return nil unless e.respond_to?(:get_cause) && !e.get_cause.nil?
  cause = e
  slow_pointer = e
  # Use a slow pointer to avoid cause loops in Java Exceptions
  move_slow = false
  until (next_cause = cause.get_cause).nil?
    cause = next_cause
    return {:exception => cause.class.name, :exception_message => cause.message, :exception_loop => true } if cause == slow_pointer
    # Advance through the same accessor the guard above verified, not `cause`.
    slow_pointer = slow_pointer.get_cause if move_slow
    move_slow = !move_slow
  end
  {:exception => cause.class.name, :exception_message => cause.message }
end

#jms_configObject



190
191
192
193
194
# File 'lib/logstash/inputs/jms.rb', line 190

# Picks the source of the JMS connection settings: the YAML file when
# @yaml_file is set, otherwise JNDI when @jndi_name is set, otherwise the
# plugin's own options.
#
# @return [Object] the settings produced by the selected source
def jms_config
  if @yaml_file
    jms_config_from_yaml(@yaml_file, @yaml_section)
  elsif @jndi_name
    jms_config_from_jndi
  else
    jms_config_from_configuration
  end
end

#jms_config_from_configurationObject



197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/logstash/inputs/jms.rb', line 197

# Assembles the JMS connection settings from the plugin's own options.
#
# The password is added only when one is configured, and @factory_settings
# (when present) are merged in through #correct_factory_hash.
#
# @return [Hash] settings keyed by symbol
def jms_config_from_configuration
  settings = {
    require_jars: @require_jars,
    factory: @factory,
    username: @username,
    broker_url: @broker_url,
    url: @broker_url # "broker_url" is named "url" with Oracle AQ
  }

  if !@password.nil?
    settings[:password] = @password.value
  end
  if !@factory_settings.nil?
    correct_factory_hash(settings, @factory_settings)
  end
  settings
end

#jms_config_from_jndiObject



224
225
226
227
228
229
230
# File 'lib/logstash/inputs/jms.rb', line 224

# Assembles the JMS connection settings for a JNDI lookup.
#
# @return [Hash] :require_jars, :jndi_name and :jndi_context taken from the plugin options
def jms_config_from_jndi
  { require_jars: @require_jars, jndi_name: @jndi_name, jndi_context: @jndi_context }
end

#jms_config_from_yaml(file, section) ⇒ Object



232
233
234
# File 'lib/logstash/inputs/jms.rb', line 232

# Reads the JMS connection settings from one section of a YAML file.
#
# @param file [String] path of the YAML file
# @param section [String] top-level key whose value holds the settings
# @return [Object, nil] the value stored under +section+ (nil when the key is absent)
def jms_config_from_yaml(file, section)
  sections = YAML.load_file(file)
  sections[section]
end

#load_ssl_propertiesObject



236
237
238
239
240
241
# File 'lib/logstash/inputs/jms.rb', line 236

# Copies the configured key store / trust store settings into the
# corresponding `javax.net.ssl.*` Java system properties. Settings that are
# not configured are left untouched.
def load_ssl_properties
  if @keystore
    java.lang.System.setProperty("javax.net.ssl.keyStore", @keystore)
  end
  if @keystore_password
    java.lang.System.setProperty("javax.net.ssl.keyStorePassword", @keystore_password.value)
  end
  if @truststore
    java.lang.System.setProperty("javax.net.ssl.trustStore", @truststore)
  end
  if @truststore_password
    java.lang.System.setProperty("javax.net.ssl.trustStorePassword", @truststore_password.value)
  end
end

#load_system_propertiesObject



243
244
245
# File 'lib/logstash/inputs/jms.rb', line 243

# Sets every entry of @system_properties as a Java system property; values
# are converted with #to_s first.
def load_system_properties
  @system_properties.each do |name, setting|
    java.lang.System.set_property(name, setting.to_s)
  end
end

#obfuscatable?(setting) ⇒ Boolean

Returns:

  • (Boolean)


186
187
188
# File 'lib/logstash/inputs/jms.rb', line 186

# Tells whether a setting holds a secret that must not be logged in clear text.
#
# @param setting [Symbol] the setting name
# @return [Boolean] true for :password, :keystore_password and :truststore_password
def obfuscatable?(setting)
  case setting
  when :password, :keystore_password, :truststore_password
    true
  else
    false
  end
end

#obfuscate_jms_config(config) ⇒ Object

Returns a copy of the given config in which the password settings are replaced by 'xxxxx'.



182
183
184
# File 'lib/logstash/inputs/jms.rb', line 182

# Builds a copy of the config that is safe to log: the value of every
# setting accepted by #obfuscatable? is replaced by 'xxxxx'.
#
# @param config [Hash] the JMS settings
# @return [Hash] a new hash; +config+ itself is not modified
def obfuscate_jms_config(config)
  masked = {}
  config.each do |name, setting|
    masked[name] = obfuscatable?(name) ? 'xxxxx' : setting
  end
  masked
end

#process_map_message(msg) ⇒ LogStash::Event

Parameters:

  • msg (JMS::MapMessage)

Returns:

  • (LogStash::Event)


368
369
370
371
# File 'lib/logstash/inputs/jms.rb', line 368

# Turns the data of a map message into an event.
#
# The message data is converted with to_string_keyed_hash and handed to the
# factory returned by #do_target_check_once_and_get_event_factory.
#
# @param msg [JMS::MapMessage]
# @return [LogStash::Event]
def process_map_message(msg)
  payload = to_string_keyed_hash(msg.data)
  factory = do_target_check_once_and_get_event_factory
  factory.new_event(payload)
end

#queue_event(msg, output_queue) ⇒ Object

Builds an event from a received message and pushes it onto the output queue.



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/logstash/inputs/jms.rb', line 299

# Converts one received message into an event and appends it to +output_queue+.
#
# Depending on the plugin options the event carries the decoded body, the
# JMS timestamp as @timestamp, the mapped headers and the message properties.
# Any StandardError raised along the way is logged and the message is
# dropped instead of propagating the error.
#
# @param msg [Object] the received JMS message
# @param output_queue [Object] queue responding to `<<`
def queue_event(msg, output_queue)
  event = nil

  if @include_body
    if msg.java_kind_of?(JMS::MapMessage)
      event = process_map_message(msg)
    elsif msg.java_kind_of?(JMS::TextMessage) || msg.java_kind_of?(JMS::BytesMessage)
      event = decode_message(msg)
    else
      @logger.error( "Unsupported message type #{msg.data.class.to_s}" )
    end
  end

  # No body wanted, or nothing could be decoded: start from an empty event.
  event = event_factory.new_event unless event

  # Here, we can use the JMS Enqueue timestamp as the @timestamp.
  # NOTE(review): only nil/false skip this branch; a timestamp of 0 is truthy
  # in Ruby and would be applied as-is - confirm that is intended.
  if @use_jms_timestamp && (enqueued_millis = msg.jms_timestamp)
    seconds, millis = enqueued_millis.divmod(1000)
    event.set("@timestamp", LogStash::Timestamp.at(seconds, millis * 1000))
  end

  if @include_headers
    headers = map_headers(msg)
    @skip_headers.each do |name|
      headers.delete(name)
    end
    @headers_setter.call(event, headers)
  end

  if @include_properties
    properties = to_string_keyed_hash(msg.properties)
    @skip_properties.each do |name|
      properties.delete(name)
    end
    @properties_setter.call(event, properties)
  end

  decorate(event)
  output_queue << event
rescue => error # parse or event creation error
  @logger.error("Failed to create event", :message => msg, :exception => error,
                :backtrace => error.backtrace)
end

#registerObject



171
172
173
174
175
176
177
178
179
180
# File 'lib/logstash/inputs/jms.rb', line 171

# Prepares the plugin: loads the jms library, validates the configuration,
# applies the SSL and system properties and resolves the JMS settings used
# later by #run.
def register
  require "jms" # loaded here rather than at file load time

  check_config!
  load_ssl_properties
  if @system_properties
    load_system_properties
  end
  @jms_config = jms_config

  # Secrets are masked before the settings reach the log.
  loggable_config = obfuscate_jms_config(@jms_config)
  @logger.debug("JMS Config being used ", :context => loggable_config)
end

#regular_subscriber(session, queue_or_topic, params) ⇒ Object



411
412
413
414
# File 'lib/logstash/inputs/jms.rb', line 411

# Creates a plain (non-durable) consumer on the session.
#
# @param session [Object] session responding to create_consumer
# @param queue_or_topic [Object] destination to consume from
# @param params [Hash] when params[:selector] is set it is passed on as message selector
# @return [Object] whatever create_consumer returns
def regular_subscriber(session, queue_or_topic, params)
  selector = params[:selector]
  return session.create_consumer(queue_or_topic) unless selector

  session.create_consumer(queue_or_topic, selector)
end

#run(output_queue) ⇒ Object



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/logstash/inputs/jms.rb', line 267

# Main consume loop: opens a connection, session and subscriber, then keeps
# receiving messages and passing them to #queue_event until stop? is true.
#
# When an error is rescued it is logged as a warning; unless the plugin is
# stopping, the method then sleeps 5 seconds, closes whatever was opened and
# retries from the top of the begin block.
#
# @param output_queue [Object] handed to #queue_event for every received message
def run(output_queue)
  begin
    connection = JMS::Connection.new(@jms_config)
    # The client id is only assigned when @durable_subscriber_client_id is set.
    connection.client_id = @durable_subscriber_client_id if @durable_subscriber_client_id
    session = connection.create_session(@jms_config)
    connection.start
    # @timeout is multiplied by 1000 here; presumably seconds -> milliseconds (TODO confirm).
    params = {:timeout => @timeout * 1000, :selector => @selector}
    subscriber = subscriber(session, params)
    until stop?
      # Drain the messages currently available; each receive call is given
      # @interval * 1000 as its timeout. The inner `break` leaves
      # do_receive_message as soon as a stop was requested.
      do_receive_message(subscriber, timeout: @interval * 1000) do |message|
        queue_event(message, output_queue)
        break if stop?
      end
    end
  rescue => e
    logger.warn("JMS Consumer Died", error_hash(e))
    unless stop?
      # Back off, release what was opened (each may still be nil if setup
      # failed part-way), then start over.
      sleep(5)
      subscriber && subscriber.close
      session && session.close
      connection && connection.close
      retry
    end
  ensure
    # Release any subscriber/session/connection that was created.
    subscriber && subscriber.close
    session && session.close
    connection && connection.close
  end
end

#subscriber(session, params) ⇒ Object



397
398
399
400
401
402
403
# File 'lib/logstash/inputs/jms.rb', line 397

# Creates the consumer for the configured destination.
#
# The destination name is stored in +params+ (under :topic_name when pub_sub
# is enabled, otherwise :queue_name) before the destination is created, so
# the caller's hash is modified.
#
# @param session [Object] session responding to create_destination
# @param params [Hash] consumer parameters
# @return [Object] a durable subscriber when @durable_subscriber is set,
#   otherwise a regular one
def subscriber(session, params)
  params[@pub_sub ? :topic_name : :queue_name] = @destination
  queue_or_topic = session.create_destination(params)

  if @durable_subscriber
    durable_subscriber(session, queue_or_topic, params)
  else
    regular_subscriber(session, queue_or_topic, params)
  end
end