Class: Fluent::Plugin::NewrelicOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::NewrelicOutput
show all
- Defined in:
- lib/fluent/plugin/out_newrelic.rb
Defined Under Namespace
Classes: ConnectionFailure
Constant Summary
collapse
- DEFAULT_BUFFER_TYPE =
'memory'.freeze
- DEFAULT_TIMEKEY =
5
- DEFAULT_TIMEKEY_WAIT =
0
- MAX_PAYLOAD_SIZE =
1000000
Instance Method Summary
collapse
Instance Method Details
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 55
def configure(conf)
super
@api_key ||= ENV["NEW_RELIC_API_KEY"]
@license_key ||= ENV["NEW_RELIC_LICENSE_KEY"]
if @api_key.nil? && @license_key.nil?
raise Fluent::ConfigError.new("'api_key' or 'license_key' parameter is required")
end
@end_point = URI.parse(@base_uri)
auth = {
@api_key.nil? ? 'X-License-Key' : 'X-Insert-Key' =>
@api_key.nil? ? @license_key : @api_key
}
@header = {
'X-Event-Source' => 'logs',
'Content-Encoding' => 'gzip'
}.merge(auth)
.freeze
end
|
#handle_response(response) ⇒ Object
117
118
119
120
121
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 117
def handle_response(response)
if !(200 <= response.code.to_i && response.code.to_i < 300)
log.error("Response was " + response.code + " " + response.body)
end
end
|
#multi_workers_ready? ⇒ Boolean
This tells Fluentd that it can run this output plugin in multiple workers. Our plugin has no interactions with other processes
51
52
53
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 51
def multi_workers_ready?
true
end
|
#package_record(record, timestamp) ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 77
def package_record(record, timestamp)
if defined? timestamp.nsec
timestamp = timestamp * 1000 + timestamp.nsec / 1_000_000
end
packaged = {
'timestamp' => timestamp,
'attributes' => record
}
if record.has_key?('message')
packaged['message'] = record['message']
packaged['attributes'].delete('message')
end
if record.has_key?('log')
packaged['message'] = record['log']
packaged['attributes'].delete('log')
end
packaged
end
|
#send_payload(payload) ⇒ Object
123
124
125
126
127
128
129
130
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 123
def send_payload(payload)
http = Net::HTTP.new(@end_point.host, 443)
http.use_ssl = true
http.verify_mode = OpenSSL::SSL::VERIFY_PEER
request = Net::HTTP::Post.new(@end_point.request_uri, @header)
request.body = payload
handle_response(http.request(request))
end
|
#write(chunk) ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 104
def write(chunk)
logs = []
chunk.msgpack_each do |ts, record|
next unless record.is_a? Hash
next if record.empty?
logs.push(package_record(record, ts))
end
payloads = get_compressed_payloads(logs)
payloads.each { |payload| send_payload(payload) }
end
|