Class: Fluent::Plugin::DynatraceOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::DynatraceOutput
- Defined in:
- lib/fluent/plugin/out_dynatrace.rb
Overview
Fluentd output plugin for Dynatrace
Constant Summary collapse
- HTTP_REQUEST_LOCK =
Mutex.new
Instance Attribute Summary collapse
-
#agent ⇒ Object
Default injection parameters.
-
#uri ⇒ Object
Default injection parameters.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #failure_message(res) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #prefer_buffered_processing ⇒ Object
- #prepare_request ⇒ Object
- #process(_tag, es) ⇒ Object
- #send_records(records) ⇒ Object
- #send_request(body) ⇒ Object
- #serialize(records) ⇒ Object
- #shutdown ⇒ Object
- #synchronized_send_records(records) ⇒ Object
- #user_agent ⇒ Object
- #write(chunk) ⇒ Object
Instance Attribute Details
#agent ⇒ Object
Default injection parameters. Requires the :inject helper to be added to the helpers above and the
inject lines to be uncommented in the #write and #process methods
config_section :inject do
config_set_default :time_type, :string
config_set_default :localtime, false
end
61 62 63 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 61 def agent @agent end |
#uri ⇒ Object
Default injection parameters. Requires the :inject helper to be added to the helpers above and the
inject lines to be uncommented in the #write and #process methods
config_section :inject do
config_set_default :time_type, :string
config_set_default :localtime, false
end
61 62 63 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 61 def uri @uri end |
Instance Method Details
#configure(conf) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 63 def configure(conf) compat_parameters_convert(conf, :inject) super raise Fluent::ConfigError, 'api_token is empty' if @api_token.empty? @uri = parse_and_validate_uri(@active_gate_url) @agent = Net::HTTP.new(@uri.host, @uri.port) return unless uri.scheme == 'https' @agent.use_ssl = true @agent.verify_mode = OpenSSL::SSL::VERIFY_NONE if @ssl_verify_none end |
#failure_message(res) ⇒ Object
170 171 172 173 174 175 176 177 178 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 170 def (res) res_summary = if res "#{res.code} #{res.message} #{res.body}" else 'res=nil' end "failed to request #{uri} (#{res_summary})" end |
#multi_workers_ready? ⇒ Boolean
118 119 120 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 118 def multi_workers_ready? false end |
#prefer_buffered_processing ⇒ Object
114 115 116 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 114 def prefer_buffered_processing true end |
#prepare_request ⇒ Object
128 129 130 131 132 133 134 135 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 128 def prepare_request log.on_trace { log.trace('#prepare_request') } req = Net::HTTP::Post.new(uri, { 'User-Agent' => user_agent }) req['Content-Type'] = 'application/json; charset=utf-8' req['Authorization'] = "Api-Token #{@api_token}" req end |
#process(_tag, es) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 86 def process(_tag, es) log.on_trace { log.trace('#process') } records = 0 # es = inject_values_to_event_stream(tag, es) es.each do |time, record| records += 1 log.on_trace { log.trace("#process Processing record #{records}") } record['@timestamp'] = time * 1000 if synchronized_send_records(record) end log.on_trace { log.trace("#process Processed #{records} records") } end |
#send_records(records) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 144 def send_records(records) log.on_trace { log.trace('#send_records') } agent.start unless agent.started? response = send_request(serialize(records)) return if response.is_a?(Net::HTTPSuccess) raise response end |
#send_request(body) ⇒ Object
163 164 165 166 167 168 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 163 def send_request(body) log.on_trace { log.trace('#send_request') } response = @agent.request(prepare_request, body) log.on_trace { log.trace("#send_request response #{response}") } response end |
#serialize(records) ⇒ Object
156 157 158 159 160 161 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 156 def serialize(records) log.on_trace { log.trace('#serialize') } body = "#{records.to_json.chomp}\n" log.on_trace { log.trace("#serialize body length #{body.length}") } body end |
#shutdown ⇒ Object
79 80 81 82 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 79 def shutdown @agent.finish if @agent.started? super end |
#synchronized_send_records(records) ⇒ Object
137 138 139 140 141 142 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 137 def synchronized_send_records(records) log.on_trace { log.trace('#synchronized_send_records') } HTTP_REQUEST_LOCK.synchronize do send_records(records) end end |
#user_agent ⇒ Object
124 125 126 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 124 def user_agent "fluent-plugin-dynatrace v#{DynatraceOutputConstants.version}" end |
#write(chunk) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/out_dynatrace.rb', line 99 def write(chunk) log.on_trace { log.trace('#write') } records = [] chunk.each do |time, record| # records.push(inject_values_to_record(chunk.metadata.tag, time, record)) record['@timestamp'] = time * 1000 if records.push(record) end log.on_trace { log.trace("#write sent #{records.length} records") } synchronized_send_records(records) unless records.empty? end |