Class: LogStash::Inputs::Jms
- Inherits:
-
Threadable
- Object
- Threadable
- LogStash::Inputs::Jms
- Defined in:
- lib/logstash/inputs/jms.rb
Overview
Read events from a Jms Broker. Supports both Jms Queues and Topics.
For more information about Jms, see <docs.oracle.com/javaee/6/tutorial/doc/bncdq.html> For more information about the Ruby Gem used, see <github.com/reidmorrison/jruby-jms> Here is a config example to pull from a queue:
jms {
include_header => false
include_properties => false
include_body => true
=> false
interval => 10
destination => "myqueue"
pub-sub => false
yaml_file => "~/jms.yml"
yaml_section => "mybroker"
}
Instance Method Summary collapse
- #check_config ⇒ Object
- #check_durable_subscription_config ⇒ Object
- #correct_factory_hash(original, value) ⇒ Object
- #durable_subscriber(session, queue_or_topic, params) ⇒ Object
- #error_hash(e) ⇒ Object
-
#get_root_cause(e) ⇒ Object
JMS Exceptions can contain chains of Exceptions, making it difficult to determine the root cause of an error without knowing the actual root cause behind the problem.
- #jms_config ⇒ Object
- #jms_config_from_configuration ⇒ Object
- #jms_config_from_jndi ⇒ Object
- #jms_config_from_yaml(file, section) ⇒ Object
- #load_ssl_properties ⇒ Object
- #load_system_properties ⇒ Object
- #obfuscatable?(setting) ⇒ Boolean
-
#obfuscate_jms_config(config) ⇒ Object
def register.
- #queue_event(msg, output_queue) ⇒ Object
- #register ⇒ Object
- #regular_subscriber(session, queue_or_topic, params) ⇒ Object
- #run(output_queue) ⇒ Object
- #subscriber(session, params) ⇒ Object
Instance Method Details
#check_config ⇒ Object
211 212 213 214 |
# File 'lib/logstash/inputs/jms.rb', line 211 def check_config check_durable_subscription_config raise(LogStash::ConfigurationError, "Threads cannot be > 1 if pub_sub is set") if @threads > 1 && @pub_sub end |
#check_durable_subscription_config ⇒ Object
216 217 218 219 220 221 |
# File 'lib/logstash/inputs/jms.rb', line 216 def check_durable_subscription_config return unless @durable_subscriber raise(LogStash::ConfigurationError, "pub_sub must be true if durable_subscriber is set") unless @pub_sub @durable_subscriber_client_id ||= 'Logstash' @durable_subscriber_name ||= destination end |
#correct_factory_hash(original, value) ⇒ Object
175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/logstash/inputs/jms.rb', line 175 def correct_factory_hash(original, value) if hash.is_a?(String) return true if value.downcase == "true" return false if value.downcase == "false" end if value.is_a?(Hash) value.each { |key, value| original[key.to_sym] = correct_factory_hash({}, value) } return original end value end |
#durable_subscriber(session, queue_or_topic, params) ⇒ Object
313 314 315 316 |
# File 'lib/logstash/inputs/jms.rb', line 313 def durable_subscriber(session, queue_or_topic, params) params[:selector] ? session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name, params[:selector], false) : session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name) end |
#error_hash(e) ⇒ Object
323 324 325 326 327 328 |
# File 'lib/logstash/inputs/jms.rb', line 323 def error_hash(e) error_hash = {:exception => e.class.name, :exception_message => e., :backtrace => e.backtrace} root_cause = get_root_cause(e) error_hash[:root_cause] = root_cause unless root_cause.nil? error_hash end |
#get_root_cause(e) ⇒ Object
JMS Exceptions can contain chains of Exceptions, making it difficult to determine the root cause of an error without knowing the actual root cause behind the problem. This method protects against Java Exceptions where the cause methods loop. If there is a cause loop, the last cause exception before the loop is detected will be returned, along with an entry in the root_cause hash indicating that an exception loop was detected. This will mean that the root cause may not be the actual root cause of the problem, and further investigation is required
336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/logstash/inputs/jms.rb', line 336 def get_root_cause(e) return nil unless e.respond_to?(:get_cause) && !e.get_cause.nil? cause = e slow_pointer = e # Use a slow pointer to avoid cause loops in Java Exceptions move_slow = false until (next_cause = cause.get_cause).nil? cause = next_cause return {:exception => cause.class.name, :exception_message => cause., :exception_loop => true } if cause == slow_pointer slow_pointer = slow_pointer.cause if move_slow move_slow = !move_slow end {:exception => cause.class.name, :exception_message => cause. } end |
#jms_config ⇒ Object
154 155 156 157 158 |
# File 'lib/logstash/inputs/jms.rb', line 154 def jms_config return jms_config_from_yaml(@yaml_file, @yaml_section) if @yaml_file return jms_config_from_jndi if @jndi_name jms_config_from_configuration end |
#jms_config_from_configuration ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/logstash/inputs/jms.rb', line 161 def jms_config_from_configuration config = { :require_jars => @require_jars, :factory => @factory, :username => @username, :broker_url => @broker_url, :url => @broker_url # "broker_url" is named "url" with Oracle AQ } config[:password] = @password.value unless @password.nil? correct_factory_hash(config, @factory_settings) unless @factory_settings.nil? config end |
#jms_config_from_jndi ⇒ Object
188 189 190 191 192 193 194 |
# File 'lib/logstash/inputs/jms.rb', line 188 def jms_config_from_jndi { :require_jars => @require_jars, :jndi_name => @jndi_name, :jndi_context => @jndi_context } end |
#jms_config_from_yaml(file, section) ⇒ Object
196 197 198 |
# File 'lib/logstash/inputs/jms.rb', line 196 def jms_config_from_yaml(file, section) YAML.load_file(file)[section] end |
#load_ssl_properties ⇒ Object
200 201 202 203 204 205 |
# File 'lib/logstash/inputs/jms.rb', line 200 def load_ssl_properties java.lang.System.setProperty("javax.net.ssl.keyStore", @keystore) if @keystore java.lang.System.setProperty("javax.net.ssl.keyStorePassword", @keystore_password.value) if @keystore_password java.lang.System.setProperty("javax.net.ssl.trustStore", @truststore) if @truststore java.lang.System.setProperty("javax.net.ssl.trustStorePassword", @truststore_password.value) if @truststore_password end |
#load_system_properties ⇒ Object
207 208 209 |
# File 'lib/logstash/inputs/jms.rb', line 207 def load_system_properties @system_properties.each { |k,v| java.lang.System.set_property(k,v.to_s) } end |
#obfuscatable?(setting) ⇒ Boolean
150 151 152 |
# File 'lib/logstash/inputs/jms.rb', line 150 def obfuscatable?(setting) [:password, :keystore_password, :truststore_password].include?(setting) end |
#obfuscate_jms_config(config) ⇒ Object
def register
146 147 148 |
# File 'lib/logstash/inputs/jms.rb', line 146 def obfuscate_jms_config(config) config.each_with_object({}) { |(k, v), h| h[k] = obfuscatable?(k) ? 'xxxxx' : v } end |
#queue_event(msg, output_queue) ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/logstash/inputs/jms.rb', line 256 def queue_event(msg, output_queue) begin if @include_body if msg.java_kind_of?(JMS::MapMessage) event = LogStash::Event.new msg.data.each do |field, value| event.set(field.to_s, value) # TODO(claveau): needs codec.decode or converter.convert ? end elsif msg.java_kind_of?(JMS::TextMessage) || msg.java_kind_of?(JMS::BytesMessage) unless msg.to_s.nil? @codec.decode(msg.to_s) do || event = end end else @logger.error( "Unsupported message type #{msg.data.class.to_s}" ) end end event ||= LogStash::Event.new # Here, we can use the JMS Enqueue timestamp as the @timestamp if && msg. event.set("@timestamp", LogStash::Timestamp.at(msg. / 1000, (msg. % 1000) * 1000)) end if @include_header msg.attributes && msg.attributes.each do |field, value| event.set(field.to_s, value) unless @skip_headers.include?(field.to_s) end end if @include_properties msg.properties && msg.properties.each do |field, value| event.set(field.to_s, value) unless @skip_properties.include?(field.to_s) end end decorate(event) output_queue << event rescue => e # parse or event creation error @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace) end end |
#register ⇒ Object
135 136 137 138 139 140 141 142 143 144 |
# File 'lib/logstash/inputs/jms.rb', line 135 def register require "jms" check_config load_ssl_properties load_system_properties if @system_properties @jms_config = jms_config @logger.debug("JMS Config being used ", :context => obfuscate_jms_config(@jms_config)) end |
#regular_subscriber(session, queue_or_topic, params) ⇒ Object
318 319 320 321 |
# File 'lib/logstash/inputs/jms.rb', line 318 def regular_subscriber(session, queue_or_topic, params) params[:selector] ? session.create_consumer(queue_or_topic, params[:selector]) : session.create_consumer(queue_or_topic) end |
#run(output_queue) ⇒ Object
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/logstash/inputs/jms.rb', line 223 def run(output_queue) begin connection = JMS::Connection.new(@jms_config) connection.client_id = @durable_subscriber_client_id if @durable_subscriber_client_id session = connection.create_session(@jms_config) connection.start params = {:timeout => @timeout * 1000, :selector => @selector} subscriber = subscriber(session, params) until stop? # This will read from the queue/topic until :timeout is breached, or messages are available whichever comes # first. subscriber.each({:timeout => @interval * 1000}) do || queue_event(, output_queue) break if stop? end end rescue => e logger.warn("JMS Consumer Died", error_hash(e)) unless stop? sleep(5) subscriber && subscriber.close session && session.close connection && connection.close retry end ensure subscriber && subscriber.close session && session.close connection && connection.close end end |
#subscriber(session, params) ⇒ Object
304 305 306 307 308 309 310 |
# File 'lib/logstash/inputs/jms.rb', line 304 def subscriber(session, params) destination_key = @pub_sub ? :topic_name : :queue_name params[destination_key] = @destination queue_or_topic = session.create_destination(params) @durable_subscriber ? durable_subscriber(session, queue_or_topic, params) : regular_subscriber(session, queue_or_topic, params) end |