Class: LogStash::Inputs::Jms

Inherits:
Threadable
  • Object
show all
Defined in:
lib/logstash/inputs/jms.rb

Overview

Read events from a Jms Broker. Supports both Jms Queues and Topics.

For more information about JMS, see <docs.oracle.com/javaee/6/tutorial/doc/bncdq.html>. For more information about the Ruby gem used, see <github.com/reidmorrison/jruby-jms>. Here is a config example to pull from a queue:

jms {
   include_header => false
   include_properties => false
   include_body => true
   use_jms_timestamp => false
   interval => 10
   destination => "myqueue"
   pub_sub => false
   yaml_file => "~/jms.yml"
   yaml_section => "mybroker"
 }

Instance Method Summary collapse

Instance Method Details

#check_configObject

Raises:

  • (LogStash::ConfigurationError)


211
212
213
214
# File 'lib/logstash/inputs/jms.rb', line 211

# Validates the plugin settings that depend on each other.
#
# Runs the durable-subscription checks first, then rejects a configuration
# that combines more than one thread with pub_sub.
#
# @raise [LogStash::ConfigurationError] when @threads > 1 and @pub_sub is set
def check_config
  check_durable_subscription_config
  return unless @threads > 1 && @pub_sub

  raise LogStash::ConfigurationError, "Threads cannot be > 1 if pub_sub is set"
end

#check_durable_subscription_configObject

Raises:

  • (LogStash::ConfigurationError)


216
217
218
219
220
221
# File 'lib/logstash/inputs/jms.rb', line 216

# Validates and completes the durable-subscriber settings.
#
# Does nothing unless @durable_subscriber is set. Otherwise pub_sub is
# required, and the client id and subscriber name fall back to 'Logstash'
# and the configured destination when they have not been provided.
#
# @raise [LogStash::ConfigurationError] when durable_subscriber is set without pub_sub
def check_durable_subscription_config
  if @durable_subscriber
    unless @pub_sub
      raise LogStash::ConfigurationError, "pub_sub must be true if durable_subscriber is set"
    end

    @durable_subscriber_client_id ||= 'Logstash'
    @durable_subscriber_name ||= destination
  end
end

#correct_factory_hash(original, value) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/logstash/inputs/jms.rb', line 175

# Normalizes factory settings before they are merged into the JMS config.
#
# - The strings "true"/"false" (any letter case) become real booleans.
# - A Hash is copied into +original+ with symbol keys, normalizing every
#   nested value the same way; +original+ is then returned.
# - Any other value is returned unchanged.
#
# @param original [Hash] hash that receives the converted entries when +value+ is a Hash
# @param value [Object] setting (or Hash of settings) to normalize
# @return [Object] the converted boolean, the populated +original+, or +value+ itself
def correct_factory_hash(original, value)
  # The type check must be on +value+; checking +hash+ (Object#hash, an
  # Integer) never matched, so boolean strings were passed through as-is.
  if value.is_a?(String)
    return true if value.downcase == "true"
    return false if value.downcase == "false"
  end

  if value.is_a?(Hash)
    # Each nested value gets its own fresh hash so nested hashes are converted too.
    value.each { |key, nested| original[key.to_sym] = correct_factory_hash({}, nested) }
    return original
  end
  value
end

#durable_subscriber(session, queue_or_topic, params) ⇒ Object



313
314
315
316
# File 'lib/logstash/inputs/jms.rb', line 313

# Creates a durable subscriber on the session, named @durable_subscriber_name.
#
# @param session [Object] session responding to create_durable_subscriber
# @param queue_or_topic [Object] destination to subscribe to
# @param params [Hash] options; a truthy :selector is passed on as the message selector
# @return [Object] whatever create_durable_subscriber returns
def durable_subscriber(session, queue_or_topic, params)
  if params[:selector]
    session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name, params[:selector], false)
  else
    session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name)
  end
end

#error_hash(e) ⇒ Object



323
324
325
326
327
328
# File 'lib/logstash/inputs/jms.rb', line 323

# Builds the structured details logged for an exception.
#
# @param e [Exception] the error to describe
# @return [Hash] :exception, :exception_message and :backtrace, plus
#   :root_cause when get_root_cause finds one
def error_hash(e)
  details = {
    :exception => e.class.name,
    :exception_message => e.message,
    :backtrace => e.backtrace
  }

  underlying = get_root_cause(e)
  details[:root_cause] = underlying unless underlying.nil?
  details
end

#get_root_cause(e) ⇒ Object

JMS exceptions can contain chains of nested exceptions, making it difficult to determine the root cause of an error without walking the whole chain. This method also protects against Java exceptions whose cause chain loops. If there is a cause loop, the last cause exception before the loop is detected will be returned, along with an :exception_loop entry in the root_cause hash indicating that an exception loop was detected. In that case the reported root cause may not be the actual root cause of the problem, and further investigation is required.



336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/logstash/inputs/jms.rb', line 336

# Walks the +get_cause+ chain of +e+ and describes the deepest cause reached.
#
# A second pointer follows the same chain at half speed; if the fast pointer
# ever lands on it, the chain is cyclic and the walk stops instead of looping
# forever.
#
# @param e [Object] exception to inspect
# @return [Hash, nil] nil when +e+ does not respond to +get_cause+ or has no
#   cause; otherwise :exception and :exception_message of the last cause
#   reached, plus :exception_loop => true when a cause loop was detected
def get_root_cause(e)
  return nil unless e.respond_to?(:get_cause) && !e.get_cause.nil?
  cause = e
  slow_pointer = e
  # Use a slow pointer to avoid cause loops in Java Exceptions
  move_slow = false
  until (next_cause = cause.get_cause).nil?
    cause = next_cause
    return {:exception => cause.class.name, :exception_message => cause.message, :exception_loop => true } if cause == slow_pointer
    # Advance with get_cause, the same accessor the guard and the fast pointer
    # use; the slow pointer previously used +cause+, which is not guaranteed
    # to exist on every object that exposes get_cause.
    slow_pointer = slow_pointer.get_cause if move_slow
    move_slow = !move_slow
  end
  {:exception => cause.class.name, :exception_message => cause.message }
end

#jms_configObject



154
155
156
157
158
# File 'lib/logstash/inputs/jms.rb', line 154

# Picks the source of the JMS connection settings.
#
# Precedence: a YAML file when @yaml_file is set, then JNDI when @jndi_name
# is set, otherwise the plugin's own configuration options.
#
# @return [Object] the settings produced by the selected source
def jms_config
  if @yaml_file
    jms_config_from_yaml(@yaml_file, @yaml_section)
  elsif @jndi_name
    jms_config_from_jndi
  else
    jms_config_from_configuration
  end
end

#jms_config_from_configurationObject



161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/logstash/inputs/jms.rb', line 161

# Assembles the JMS connection settings from the plugin's own options.
#
# The password is added only when one is configured, and any factory
# settings are merged in through correct_factory_hash.
#
# @return [Hash] connection settings keyed by symbol
def jms_config_from_configuration
  settings = {
    require_jars: @require_jars,
    factory: @factory,
    username: @username,
    broker_url: @broker_url,
    url: @broker_url # Oracle AQ expects the broker URL under "url"
  }

  unless @password.nil?
    settings[:password] = @password.value
  end
  unless @factory_settings.nil?
    correct_factory_hash(settings, @factory_settings)
  end
  settings
end

#jms_config_from_jndiObject



188
189
190
191
192
193
194
# File 'lib/logstash/inputs/jms.rb', line 188

# Assembles the JMS connection settings for a JNDI lookup.
#
# @return [Hash] :require_jars, :jndi_name and :jndi_context taken from the plugin options
def jms_config_from_jndi
  { require_jars: @require_jars, jndi_name: @jndi_name, jndi_context: @jndi_context }
end

#jms_config_from_yaml(file, section) ⇒ Object



196
197
198
# File 'lib/logstash/inputs/jms.rb', line 196

# Loads the JMS connection settings from one section of a YAML file.
#
# The path is expanded before loading so that a home-relative value such as
# "~/jms.yml" resolves; opening a file does not expand "~" on its own.
# Absolute and working-directory-relative paths resolve to the same file as before.
#
# NOTE(review): YAML.load_file is applied to an operator-supplied config file;
# confirm it is never pointed at untrusted input.
#
# @param file [String] path to the YAML file
# @param section [Object] top-level key whose value holds the settings
# @return [Object, nil] the value stored under +section+, or nil when the key is absent
def jms_config_from_yaml(file, section)
  YAML.load_file(File.expand_path(file))[section]
end

#load_ssl_propertiesObject



200
201
202
203
204
205
# File 'lib/logstash/inputs/jms.rb', line 200

# Copies the configured key store / trust store settings into the
# javax.net.ssl.* Java system properties. Settings that are not configured
# are left untouched.
def load_ssl_properties
  assign = ->(property, setting) { java.lang.System.setProperty(property, setting) }

  assign.call("javax.net.ssl.keyStore", @keystore) if @keystore
  assign.call("javax.net.ssl.keyStorePassword", @keystore_password.value) if @keystore_password
  assign.call("javax.net.ssl.trustStore", @truststore) if @truststore
  assign.call("javax.net.ssl.trustStorePassword", @truststore_password.value) if @truststore_password
end

#load_system_propertiesObject



207
208
209
# File 'lib/logstash/inputs/jms.rb', line 207

# Sets every entry of @system_properties as a Java system property,
# converting each value to a String first.
def load_system_properties
  @system_properties.each do |property, setting|
    java.lang.System.set_property(property, setting.to_s)
  end
end

#obfuscatable?(setting) ⇒ Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/logstash/inputs/jms.rb', line 150

# Tells whether a setting holds a secret that must be masked before logging.
#
# @param setting [Object] setting name, compared against the symbol names of the secret settings
# @return [Boolean] true for :password, :keystore_password and :truststore_password
def obfuscatable?(setting)
  case setting
  when :password, :keystore_password, :truststore_password then true
  else false
  end
end

#obfuscate_jms_config(config) ⇒ Object

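Returns a copy of the given JMS config in which the password-type settings (see #obfuscatable?) are replaced with 'xxxxx', so the config can be written to the log.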



146
147
148
# File 'lib/logstash/inputs/jms.rb', line 146

# Builds a copy of the JMS config that is safe to log.
#
# @param config [Hash] JMS connection settings
# @return [Hash] new hash with the same keys, where every value whose key is
#   obfuscatable? is replaced with 'xxxxx'
def obfuscate_jms_config(config)
  masked = {}
  config.each do |setting, raw|
    masked[setting] = obfuscatable?(setting) ? 'xxxxx' : raw
  end
  masked
end

#queue_event(msg, output_queue) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/logstash/inputs/jms.rb', line 256

# Turns one JMS message into a LogStash event and pushes it onto the output queue.
#
# When @include_body is set, a MapMessage becomes one event field per map
# entry, and a Text/Bytes message is run through the codec (the last event the
# codec yields is kept). Any other type is logged as unsupported. An empty
# event is used when no body event was produced. Header attributes and
# properties are then copied in, in that order, skipping the names listed in
# @skip_headers / @skip_properties.
#
# Any StandardError raised along the way is logged and swallowed, so the
# message is not queued in that case.
#
# @param msg [Object] the received JMS message
# @param output_queue [Object] queue that receives the event via <<
def queue_event(msg, output_queue)
  # Declared up front so the codec block below assigns this local.
  event = nil

  if @include_body
    if msg.java_kind_of?(JMS::MapMessage)
      event = LogStash::Event.new
      # TODO(claveau): needs codec.decode or converter.convert ?
      msg.data.each { |name, content| event.set(name.to_s, content) }
    elsif msg.java_kind_of?(JMS::TextMessage) || msg.java_kind_of?(JMS::BytesMessage)
      @codec.decode(msg.to_s) { |decoded| event = decoded } unless msg.to_s.nil?
    else
      @logger.error( "Unsupported message type #{msg.data.class.to_s}" )
    end
  end

  event ||= LogStash::Event.new

  # Optionally use the JMS enqueue timestamp (milliseconds) as the @timestamp
  if @use_jms_timestamp && msg.jms_timestamp
    seconds, millis = msg.jms_timestamp.divmod(1000)
    event.set("@timestamp", LogStash::Timestamp.at(seconds, millis * 1000))
  end

  if @include_header && msg.attributes
    msg.attributes.each do |header, content|
      next if @skip_headers.include?(header.to_s)
      event.set(header.to_s, content)
    end
  end

  if @include_properties && msg.properties
    msg.properties.each do |property, content|
      next if @skip_properties.include?(property.to_s)
      event.set(property.to_s, content)
    end
  end

  decorate(event)
  output_queue << event
rescue => e # parse or event creation error
  @logger.error("Failed to create event", :message => msg, :exception => e,
                :backtrace => e.backtrace)
end

#registerObject



135
136
137
138
139
140
141
142
143
144
# File 'lib/logstash/inputs/jms.rb', line 135

# Prepares the plugin: loads the jms library, validates the configuration,
# applies SSL and system properties, and resolves the JMS connection settings
# into @jms_config. The resolved settings are logged at debug level with the
# secrets masked.
def register
  require "jms"

  check_config
  load_ssl_properties
  if @system_properties
    load_system_properties
  end
  @jms_config = jms_config

  masked_config = obfuscate_jms_config(@jms_config)
  @logger.debug("JMS Config being used ", :context => masked_config)
end

#regular_subscriber(session, queue_or_topic, params) ⇒ Object



318
319
320
321
# File 'lib/logstash/inputs/jms.rb', line 318

# Creates a plain (non-durable) consumer on the session.
#
# @param session [Object] session responding to create_consumer
# @param queue_or_topic [Object] destination to consume from
# @param params [Hash] options; a truthy :selector is passed on as the message selector
# @return [Object] whatever create_consumer returns
def regular_subscriber(session, queue_or_topic, params)
  if params[:selector]
    session.create_consumer(queue_or_topic, params[:selector])
  else
    session.create_consumer(queue_or_topic)
  end
end

#run(output_queue) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/logstash/inputs/jms.rb', line 223

# Main consume loop: connects to the broker, subscribes to the configured
# destination and feeds every received message to queue_event until stop?
# returns true.
#
# On any StandardError the failure is logged as a warning; unless the plugin
# is stopping, the method sleeps 5 seconds, closes whatever was opened and
# retries from the top (no upper bound on the number of retries).
#
# @param output_queue [Object] queue handed through to queue_event
def run(output_queue)
  begin
    connection = JMS::Connection.new(@jms_config)
    # The client id is only assigned when one is configured (durable subscriptions).
    connection.client_id = @durable_subscriber_client_id if @durable_subscriber_client_id
    session = connection.create_session(@jms_config)
    connection.start
    # @timeout is multiplied by 1000 here; presumably seconds -> milliseconds — TODO confirm.
    params = {:timeout => @timeout * 1000, :selector => @selector}
    # The parenthesised call with arguments invokes the subscriber method; its
    # result is stored in a local of the same name.
    subscriber = subscriber(session, params)
    until stop?
      # This will read from the queue/topic until :timeout is breached, or messages are available whichever comes
      # first.
      subscriber.each({:timeout => @interval * 1000}) do |message|
        queue_event(message, output_queue)
        # Leave the inner read loop promptly once a stop has been requested.
        break if stop?
      end
    end
  rescue => e
    logger.warn("JMS Consumer Died", error_hash(e))
    unless stop?
      sleep(5)
      # Release the handles explicitly before starting over.
      # NOTE(review): after retry these locals still reference the handles
      # closed here until they are reassigned, so a failure early in the next
      # attempt calls close on them again — confirm close is safe to repeat.
      # NOTE(review): an exception raised by one of these close calls would
      # escape the rescue and end the retry loop — confirm that is acceptable.
      subscriber && subscriber.close
      session && session.close
      connection && connection.close
      retry
    end
  ensure
    # Final cleanup when the method is left, on normal stop or on error.
    subscriber && subscriber.close
    session && session.close
    connection && connection.close
  end
end

#subscriber(session, params) ⇒ Object



304
305
306
307
308
309
310
# File 'lib/logstash/inputs/jms.rb', line 304

# Resolves the configured destination and creates the matching subscriber.
#
# Adds the destination to +params+ (mutating it) under :topic_name when
# @pub_sub is set, otherwise under :queue_name, then creates a durable or
# regular subscriber depending on @durable_subscriber.
#
# @param session [Object] session responding to create_destination
# @param params [Hash] subscriber options; receives the destination key
# @return [Object] the subscriber built by durable_subscriber or regular_subscriber
def subscriber(session, params)
  params[@pub_sub ? :topic_name : :queue_name] = @destination
  target = session.create_destination(params)

  if @durable_subscriber
    durable_subscriber(session, target, params)
  else
    regular_subscriber(session, target, params)
  end
end