Class: Fluent::Plugin::DynatraceOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_dynatrace.rb

Overview

Fluentd output plugin for Dynatrace

Constant Summary collapse

# Mutex held in a class constant. #synchronized_send_records wraps every
# #send_records call in HTTP_REQUEST_LOCK.synchronize.
HTTP_REQUEST_LOCK =
Mutex.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#agent ⇒ Object

Default injection parameters. Requires the :inject helper to be added to the helpers above and the inject lines to be uncommented in the #write and #process methods:

config_section :inject do

config_set_default :time_type, :string
config_set_default :localtime, false

end



61
62
63
# File 'lib/fluent/plugin/out_dynatrace.rb', line 61

# Reader for @agent, which #configure assigns to
# Net::HTTP.new(@uri.host, @uri.port).
#
# @return [Net::HTTP] the HTTP client assigned in #configure
def agent
  @agent
end

#uri ⇒ Object

Default injection parameters. Requires the :inject helper to be added to the helpers above and the inject lines to be uncommented in the #write and #process methods:

config_section :inject do

config_set_default :time_type, :string
config_set_default :localtime, false

end



61
62
63
# File 'lib/fluent/plugin/out_dynatrace.rb', line 61

# Reader for @uri, which #configure assigns to the result of
# parse_and_validate_uri(@active_gate_url).
#
# @return [Object] the value assigned in #configure (its host, port and
#   scheme are read there)
def uri
  @uri
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/out_dynatrace.rb', line 63

# Applies the plugin configuration and builds the HTTP client.
#
# After the parent class has processed +conf+, the API token is checked,
# the ActiveGate URL is parsed into @uri, and @agent is created for that
# host and port. TLS is switched on only for https URLs.
#
# @param conf the configuration object handed over by Fluentd
# @raise [Fluent::ConfigError] when @api_token is empty
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super

  raise Fluent::ConfigError, 'api_token is empty' if @api_token.empty?

  @uri = parse_and_validate_uri(@active_gate_url)
  @agent = Net::HTTP.new(@uri.host, @uri.port)

  if uri.scheme == 'https'
    @agent.use_ssl = true
    # Certificate verification is only disabled when explicitly requested.
    @agent.verify_mode = OpenSSL::SSL::VERIFY_NONE if @ssl_verify_none
  end
end

#failure_message(res) ⇒ Object



170
171
172
173
174
175
176
177
178
# File 'lib/fluent/plugin/out_dynatrace.rb', line 170

# Builds the error text used when a request did not succeed.
#
# @param res the HTTP response (responds to #code, #message and #body), or nil
# @return [String] message naming the target URI and a summary of +res+
def failure_message(res)
  summary = res ? "#{res.code} #{res.message} #{res.body}" : 'res=nil'
  "failed to request #{uri} (#{summary})"
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


118
119
120
# File 'lib/fluent/plugin/out_dynatrace.rb', line 118

# Always answers false.
# NOTE(review): presumably this is how the plugin tells Fluentd it must not
# run under multiple workers — confirm against the Fluentd plugin API docs.
#
# @return [Boolean] false
def multi_workers_ready?
  false
end

#prefer_buffered_processing ⇒ Object



114
115
116
# File 'lib/fluent/plugin/out_dynatrace.rb', line 114

# Always answers true.
# NOTE(review): presumably makes Fluentd choose the buffered #write path
# over #process when the configuration leaves the choice open — confirm
# against the Fluentd output plugin API docs.
#
# @return [Boolean] true
def prefer_buffered_processing
  true
end

#prepare_request ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/out_dynatrace.rb', line 128

# Creates the POST request for the configured URI, carrying the plugin's
# User-Agent, a JSON content type and the Api-Token authorization header.
#
# @return [Net::HTTP::Post] request without a body
def prepare_request
  log.on_trace { log.trace('#prepare_request') }

  Net::HTTP::Post.new(uri, { 'User-Agent' => user_agent }).tap do |post|
    post['Content-Type'] = 'application/json; charset=utf-8'
    post['Authorization'] = "Api-Token #{@api_token}"
  end
end

#process(_tag, es) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/out_dynatrace.rb', line 86

# Handles an event stream by sending each record on its own.
#
# When @inject_timestamp is set, the record gets an '@timestamp' key holding
# the event time multiplied by 1000 before it is sent. The record object is
# modified in place.
#
# @param _tag unused
# @param es event stream; #each yields (time, record) pairs
def process(_tag, es)
  log.on_trace { log.trace('#process') }
  seen = 0
  # es = inject_values_to_event_stream(tag, es)
  es.each do |event_time, event|
    seen += 1
    log.on_trace { log.trace("#process Processing record #{seen}") }
    event['@timestamp'] = event_time * 1000 if @inject_timestamp
    synchronized_send_records(event)
  end
  log.on_trace { log.trace("#process Processed #{seen} records") }
end

#send_records(records) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/fluent/plugin/out_dynatrace.rb', line 144

# Serializes +records+, posts them through the HTTP agent and checks the
# outcome. The agent is started first when it is not already started.
#
# @param records the payload handed to #serialize
# @return [nil] when the response is a Net::HTTPSuccess
# @raise [RuntimeError] with #failure_message text for any other response
def send_records(records)
  log.on_trace { log.trace('#send_records') }

  agent.start unless agent.started?

  result = send_request(serialize(records))
  raise failure_message(result) unless result.is_a?(Net::HTTPSuccess)
end

#send_request(body) ⇒ Object



163
164
165
166
167
168
# File 'lib/fluent/plugin/out_dynatrace.rb', line 163

# Issues the request built by #prepare_request with +body+ as its payload.
#
# @param body [String] the serialized payload
# @return the response returned by @agent.request
def send_request(body)
  log.on_trace { log.trace('#send_request') }

  @agent.request(prepare_request, body).tap do |reply|
    log.on_trace { log.trace("#send_request response #{reply}") }
  end
end

#serialize(records) ⇒ Object



156
157
158
159
160
161
# File 'lib/fluent/plugin/out_dynatrace.rb', line 156

# Turns +records+ into the request body: its JSON form, with any trailing
# line break removed, followed by exactly one "\n".
#
# @param records an object responding to #to_json
# @return [String] the newline-terminated JSON text
def serialize(records)
  log.on_trace { log.trace('#serialize') }
  payload = records.to_json.chomp + "\n"
  log.on_trace { log.trace("#serialize body length #{payload.length}") }
  payload
end

#shutdown ⇒ Object



79
80
81
82
# File 'lib/fluent/plugin/out_dynatrace.rb', line 79

# Closes the HTTP connection, if one is open, and then lets the parent
# class run its own shutdown.
#
# @agent is only assigned in #configure, so it can still be nil here when
# configuration never completed. The nil guard keeps that case from raising
# NoMethodError, which would also skip the parent's shutdown.
#
# @return whatever the parent implementation returns
def shutdown
  @agent.finish if @agent && @agent.started?
  super
end

#synchronized_send_records(records) ⇒ Object



137
138
139
140
141
142
# File 'lib/fluent/plugin/out_dynatrace.rb', line 137

# Runs #send_records while holding HTTP_REQUEST_LOCK.
#
# @param records passed through to #send_records unchanged
# @return whatever #send_records returns
def synchronized_send_records(records)
  log.on_trace { log.trace('#synchronized_send_records') }
  HTTP_REQUEST_LOCK.synchronize { send_records(records) }
end

#user_agent ⇒ Object



124
125
126
# File 'lib/fluent/plugin/out_dynatrace.rb', line 124

# Value sent as the User-Agent header by #prepare_request.
#
# @return [String] the plugin name followed by " v" and the version
#   reported by DynatraceOutputConstants.version
def user_agent
  plugin_version = DynatraceOutputConstants.version
  "fluent-plugin-dynatrace v#{plugin_version}"
end

#write(chunk) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_dynatrace.rb', line 99

# Collects every record of a chunk and sends them as a single batch.
#
# When @inject_timestamp is set, each record gets an '@timestamp' key holding
# the event time multiplied by 1000. Nothing is sent for a chunk that yields
# no records.
#
# @param chunk buffer chunk; #each yields (time, record) pairs
# @return [nil] when the chunk was empty, otherwise the result of
#   #synchronized_send_records
def write(chunk)
  log.on_trace { log.trace('#write') }
  batch = []
  chunk.each do |event_time, event|
    # batch << inject_values_to_record(chunk.metadata.tag, event_time, event)
    event['@timestamp'] = event_time * 1000 if @inject_timestamp
    batch << event
  end

  log.on_trace { log.trace("#write sent #{batch.length} records") }
  return if batch.empty?

  synchronized_send_records(batch)
end