Class: Fluent::AMQPInput::Handler
- Inherits:
-
Qpid::Proton::MessagingHandler
- Object
- Qpid::Proton::MessagingHandler
- Fluent::AMQPInput::Handler
- Defined in:
- lib/fluent/plugin/in_amq.rb
Instance Attribute Summary collapse
-
#engine ⇒ Object
readonly
Returns the value of attribute engine.
Instance Method Summary collapse
- #build_record(message) ⇒ Object
-
#initialize(log, url, tag, router, ssl_domain, queue) ⇒ Handler
constructor
A new instance of Handler.
- #on_connection_open(c) ⇒ Object
- #on_container_start(container) ⇒ Object
- #on_disconnect(event) ⇒ Object
- #on_message(delivery, message) ⇒ Object
Constructor Details
#initialize(log, url, tag, router, ssl_domain, queue) ⇒ Handler
Returns a new instance of Handler.
66 67 68 69 |
# File 'lib/fluent/plugin/in_amq.rb', line 66 def initialize(log, url, tag, router, ssl_domain, queue) super() @log, @url, @tag, @router, @ssl_domain, @queue = log, url, tag, router, ssl_domain, queue end |
Instance Attribute Details
#engine ⇒ Object (readonly)
Returns the value of attribute engine.
71 72 73 |
# File 'lib/fluent/plugin/in_amq.rb', line 71 def engine @engine end |
Instance Method Details
#build_record(message) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/fluent/plugin/in_amq.rb', line 90 def build_record() begin record = { body: JSON.parse(.body), address: .address, msg_id: .id } rescue => e @log.error "Error decoding JSON from #{e.}" @log.error "Backtrace: #{e.backtrace}" end record[:properties] = .properties if (.properties and .properties.size > 0) record[:annotations] = .annotations if (.annotations and .annotations.size > 0) record[:instructions] = .instructions if (.instructions and .instructions.size > 0) record[:subject] = .subject if (.subject and .subject.size > 0) record[:priority] = .priority if (.priority and .priority.size > 0) record[:user_id] = .user_id if (.user_id and .user_id.size > 0) record[:correlation_id] = .correlation_id if (.correlation_id and .correlation_id.size > 0) record[:creation_time] = .creation_time if (.creation_time and .creation_time > 0) record end |
#on_connection_open(c) ⇒ Object
85 86 87 88 |
# File 'lib/fluent/plugin/in_amq.rb', line 85 def on_connection_open(c) raise "No security!" unless c.transport.ssl? @log.debug "Connection secured with #{c.transport.ssl.protocol_name.inspect}" end |
#on_container_start(container) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/in_amq.rb', line 73 def on_container_start(container) begin c = container.connect(@url, { :ssl_domain => @ssl_domain }) @log.debug "Opening connection to the address: #{@queue}" c.open_receiver(@queue) rescue => e @log.error "Error connecting to the message queue. #{e.}" @log.error "Backtrace: #{e.backtrace}" end end |
#on_disconnect(event) ⇒ Object
121 122 123 |
# File 'lib/fluent/plugin/in_amq.rb', line 121 def on_disconnect event puts "FIMXE #{NAME}: disconnected, re-connecting" end |
#on_message(delivery, message) ⇒ Object
113 114 115 116 117 118 119 |
# File 'lib/fluent/plugin/in_amq.rb', line 113 def (delivery, ) record = build_record() time = Engine.now tag = (.address and .address.size > 0) ? "#{@tag}.#{.address.sub("topic://","")}" : @tag @router.emit(tag, time, record) end |