Class: Fluent::AMQPInput::Handler

Inherits:
Qpid::Proton::MessagingHandler
  • Object
show all
Defined in:
lib/fluent/plugin/in_amq.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(log, url, tag, router, ssl_domain, queue) ⇒ Handler

Returns a new instance of Handler.



66
67
68
69
# File 'lib/fluent/plugin/in_amq.rb', line 66

def initialize(log, url, tag, router, ssl_domain, queue)
  super()
  @log, @url, @tag, @router, @ssl_domain, @queue = log, url, tag, router, ssl_domain, queue
end

Instance Attribute Details

#engineObject (readonly)

Returns the value of attribute engine.



71
72
73
# File 'lib/fluent/plugin/in_amq.rb', line 71

def engine
  @engine
end

Instance Method Details

#build_record(message) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/in_amq.rb', line 90

def build_record(message)
  begin
    record = {
      body: JSON.parse(message.body),
      address: message.address,
      msg_id: message.id
    }
  rescue => e
    @log.error "Error decoding JSON from #{e.message}"
    @log.error "Backtrace: #{e.backtrace}"
  end

  record[:properties] = message.properties if (message.properties and message.properties.size > 0)
  record[:annotations] = message.annotations if (message.annotations and message.annotations.size > 0)
  record[:instructions] = message.instructions if (message.instructions and message.instructions.size > 0)
  record[:subject] = message.subject if (message.subject and message.subject.size > 0)
  record[:priority] = message.priority if (message.priority and message.priority.size > 0)
  record[:user_id] = message.user_id if (message.user_id and message.user_id.size > 0)
  record[:correlation_id] = message.correlation_id if (message.correlation_id and message.correlation_id.size > 0)
  record[:creation_time] = message.creation_time if (message.creation_time and message.creation_time > 0)
  record
end

#on_connection_open(c) ⇒ Object



85
86
87
88
# File 'lib/fluent/plugin/in_amq.rb', line 85

def on_connection_open(c)
  raise "No security!"  unless c.transport.ssl?
  @log.debug "Connection secured with #{c.transport.ssl.protocol_name.inspect}"
end

#on_container_start(container) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/in_amq.rb', line 73

def on_container_start(container)
  begin
    c = container.connect(@url, { :ssl_domain => @ssl_domain })
    @log.debug "Opening connection to the address: #{@queue}"
    c.open_receiver(@queue)
  rescue => e
    @log.error "Error connecting to the message queue. #{e.message}"
    @log.error "Backtrace: #{e.backtrace}"
  end

end

#on_disconnect(event) ⇒ Object



121
122
123
# File 'lib/fluent/plugin/in_amq.rb', line 121

def on_disconnect event
  puts "FIMXE #{NAME}: disconnected, re-connecting"
end

#on_message(delivery, message) ⇒ Object



113
114
115
116
117
118
119
# File 'lib/fluent/plugin/in_amq.rb', line 113

def on_message(delivery, message)
  record = build_record(message)
  time = Engine.now

  tag = (message.address and message.address.size > 0) ? "#{@tag}.#{message.address.sub("topic://","")}" : @tag
  @router.emit(tag, time, record)
end