Class: Fluent::InsightOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::InsightOutput
show all
- Defined in:
- lib/fluent/plugin/out_insight.rb
Defined Under Namespace
Classes: ConnectionFailure
Constant Summary
collapse
- INSIGHT_DATA_TEMPLATE =
"%{region}.data.logs.insight.rapid7.com"
- INSIGHT_REST_TEMPLATE =
"https://%{region}.rest.logs.insight.rapid7.com"
- INSIGHT_LOGSETS_TEMPLATE =
"/management/logsets/%{logset_id}"
- THREAD_COUNT =
4
Instance Method Summary
collapse
Instance Method Details
#client ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/fluent/plugin/out_insight.rb', line 97
# Returns the memoized connection to the Insight data endpoint, opening it on
# first use (or after @_socket has been reset to nil).
#
# The endpoint host is built from INSIGHT_DATA_TEMPLATE and @region. The kind
# of connection depends on configuration:
# * @use_ssl truthy      -> TLS over TCP (OpenSSL::SSL::SSLSocket)
# * @protocol == 'tcp'   -> plain TCPSocket
# * anything else        -> connected UDPSocket
#
# @return [OpenSSL::SSL::SSLSocket, TCPSocket, UDPSocket] the open socket
def client
  @_socket ||= begin
    # Only build the host name when a new connection is actually needed.
    insight_data_host = INSIGHT_DATA_TEMPLATE % { :region => @region }
    if @use_ssl
      tcp_socket = TCPSocket.new(insight_data_host, @port)
      begin
        # NOTE(review): a bare SSLContext is used without set_params, so peer
        # certificate verification does not appear to be enabled here —
        # confirm, and consider context.set_params plus post_connection_check.
        context = OpenSSL::SSL::SSLContext.new
        ssl_client = OpenSSL::SSL::SSLSocket.new(tcp_socket, context)
        # Send the server name (SNI) so name-based virtual hosts can pick the
        # right certificate.
        ssl_client.hostname = insight_data_host
        # Closing the TLS socket must also close the TCP socket beneath it.
        ssl_client.sync_close = true
        ssl_client.connect
        ssl_client
      rescue StandardError
        # The handshake failed: do not leak the already-open TCP connection.
        tcp_socket.close
        raise
      end
    elsif @protocol == 'tcp'
      TCPSocket.new(insight_data_host, @port)
    else
      udp_client = UDPSocket.new
      udp_client.connect(insight_data_host, @port)
      udp_client
    end
  end
end
|
#configure(conf) ⇒ Object
31
32
33
|
# File 'lib/fluent/plugin/out_insight.rb', line 31
# Configuration hook. This override adds no behaviour of its own: it only
# forwards +conf+ to the parent implementation through the bare +super+ call.
#
# @param conf the configuration object supplied by the caller
def configure(conf)
super
end
|
#format(tag, time, record) ⇒ Object
115
116
117
|
# File 'lib/fluent/plugin/out_insight.rb', line 115
# Serializes a single event for buffering.
#
# The three values are packed together, in this order, as one array so that
# #write can later unpack them again as (tag, time, record).
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
|
#insight_log_token(url) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/fluent/plugin/out_insight.rb', line 82
# Looks up the name and first token of a single log through the REST API.
#
# The response is expected to be a Hash holding the log description under
# @key, which in turn carries 'name' and a 'tokens' list.
#
# @param url [String] REST URL of the log to inspect
# @return [Array(String, String), nil] [log name, first token], or nil when
#   the request failed, the response has no entry under @key, or the log has
#   no token (a warning is logged in each of those cases)
def insight_log_token(url)
  log_body = insight_rest_request(url)
  # insight_rest_request yields a non-Hash value when the request fails, so
  # the type has to be checked before calling key? on it.
  unless log_body.is_a?(Hash) && log_body.key?(@key)
    log.warn "Response doesn't contain log info"
    return nil
  end

  log_info = log_body[@key]
  # Array() also covers a missing or nil 'tokens' entry; an empty list has no
  # first element, which must not be reported as a usable token.
  token = Array(log_info['tokens']).first
  if token.nil?
    log.warn "Log is empty"
    return nil
  end

  log.info "Found log #{log_info['name']}"
  [log_info['name'], token]
end
|
#insight_rest_request(url) ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/fluent/plugin/out_insight.rb', line 68
# Performs an authenticated GET against the Insight REST API and returns the
# decoded JSON body.
#
# The request carries @api_key in the 'x-api-key' header; TLS is used when
# the URL scheme is https.
#
# @param url [String] absolute URL to request
# @return [Object, nil] the parsed JSON document (a Hash for the endpoints
#   used in this file) on HTTP 200; nil when the status is anything else or
#   the body is not valid JSON (an error is logged in both cases)
def insight_rest_request(url)
  uri = URI(url)
  request = Net::HTTP::Get.new(uri)
  request['content-type'] = 'application/json'
  request['x-api-key'] = @api_key
  response = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => uri.scheme == 'https') do |http|
    http.request(request)
  end

  unless response.code == "200"
    log.error "Request was failed HTTP #{response.code}: \n#{response.body}"
    # Return nil explicitly instead of leaking whatever the logger returns.
    return nil
  end

  JSON.parse(response.body)
rescue JSON::ParserError => e
  # A 200 response with an unparsable body is reported like any other failure.
  log.error "Could not parse response from #{url}: #{e.message}"
  nil
end
|
#send_insight(token, data) ⇒ Object
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/fluent/plugin/out_insight.rb', line 143
# Writes one log line ("<token> <data> \n") to the Insight data endpoint.
#
# * When the transport reports Errno::EMSGSIZE the payload is cut into two
#   halves that are sent separately (recursively, so halves may be split again).
# * Any other StandardError resets the connection and retries up to
#   @max_retries times, sleeping 5**attempt seconds between attempts.
#
# @param token [String] log token that prefixes the line
# @param data [String] payload to send
# @raise [ConnectionFailure] when all retries are exhausted, or when the
#   payload is too long to send and cannot be split any further
def send_insight(token, data)
  retries = 0
  begin
    client.write("#{token} #{data} \n")
  rescue Errno::EMSGSIZE
    # A payload of fewer than two characters cannot be shrunk by splitting;
    # stop here rather than recurse forever.
    if data.length < 2
      raise ConnectionFailure, "Could not push logs to Insight, message is too long and cannot be split further."
    end
    log.warn "Message Too Long, re-sending it in two part..."
    # An exclusive midpoint keeps both halves strictly shorter than the
    # original, so the recursion always terminates.
    middle = data.length / 2
    send_insight(token, data[0...middle])
    send_insight(token, data[middle..-1])
  rescue => e
    if retries < @max_retries
      retries += 1
      # Close the broken socket before dropping the reference so its file
      # descriptor is released; errors while closing are irrelevant here.
      begin
        @_socket.close if @_socket
      rescue StandardError
        nil
      end
      @_socket = nil
      log.warn "Could not push logs to Insight, resetting connection and trying again. #{e.message}"
      sleep 5**retries
      retry
    end
    raise ConnectionFailure, "Could not push logs to Insight after #{retries} retries. #{e.message}"
  end
end
|
#shutdown ⇒ Object
64
65
66
|
# File 'lib/fluent/plugin/out_insight.rb', line 64
# Shutdown hook. This override adds no behaviour of its own: it only delegates
# to the parent implementation through the bare +super+ call.
# NOTE(review): the socket cached in @_socket by #client is not closed here —
# confirm whether that is intentional.
def shutdown
super
end
|
#start ⇒ Object
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/fluent/plugin/out_insight.rb', line 35
# Start hook: discovers the logs of the configured logset and caches one token
# per log name in @tokens before handing over to the parent implementation.
#
# Steps:
# 1. Build the logset URL from @region and @logset_id and fetch it.
# 2. Initialise @insight_tags with one nil-valued entry per name in the
#    comma-separated @tags string.
# 3. Resolve every log URL found under 'logset' -> 'logs_info' to a
#    (name, token) pair using THREAD_COUNT worker threads.
#
# Logs whose token lookup fails are skipped, so @tokens only ever holds
# usable name => token entries.
#
# @return whatever the parent +start+ returns
def start
  logsets_url = (INSIGHT_REST_TEMPLATE + INSIGHT_LOGSETS_TEMPLATE) % { :region => @region, :logset_id => @logset_id }
  @insight_tags = @tags.split(",").each_with_object({}) { |tag_name, tags| tags[tag_name] = nil }
  logset_body = insight_rest_request(logsets_url)
  @tokens = {}

  if logset_body.is_a?(Hash) && logset_body.key?('logset')
    logset_info = logset_body['logset']
    if logset_info.key?('logs_info')
      log_urls = logset_info['logs_info'].map { |entry| entry['links'][0]['href'] }
      # The mutex guards both the shared URL list and the shared token table.
      mutex = Mutex.new
      workers = Array.new(THREAD_COUNT) do
        Thread.new do
          while (url = mutex.synchronize { log_urls.pop })
            log_name, token = insight_log_token(url)
            # A failed lookup yields no name/token; storing it would create a
            # nil => nil entry that #write would later treat as a valid token.
            next if log_name.nil? || token.nil?

            mutex.synchronize { @tokens[log_name] = token }
          end
        end
      end
      workers.each(&:join)
    else
      log.warn "No logs info found in logset response"
    end
  else
    log.warn "Logset emtity is empty"
  end
  super
end
|
#write(chunk) ⇒ Object
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/fluent/plugin/out_insight.rb', line 119
# Flushes a buffered chunk: every record carrying a non-blank 'message' is
# sent to Insight, prefixed with @prefix rendered from the record's tag values.
#
# The token is chosen by the record's value under @key; when no log with that
# name is known, the @default log's token is used. Records without either are
# skipped with a debug message. Nothing is sent when @tokens is empty.
#
# Note that the 'message' entry is removed from each record as it is read.
#
# @param chunk buffer chunk responding to +msgpack_each+, yielding
#   (tag, time, record) triples as produced by #format
def write(chunk)
  return if @tokens.empty?

  chunk.msgpack_each do |(_tag, _time, record)|
    next unless record.is_a? Hash

    message = (record.delete('message')&.to_s&.rstrip || '')
    next if message.empty?

    # Prefer the log named by the record, then fall back to the default log.
    if @tokens.key?(record[@key])
      token = @tokens[record[@key]]
    elsif @tokens.key?(@default)
      token = @tokens[@default]
    else
      log.debug "No token found for #{record[@key]} and default log doesn't exist"
      next
    end

    # Collect the tag values in a per-record hash instead of writing them into
    # the shared @insight_tags, so records cannot see each other's values.
    tag_values = @insight_tags.each_key.each_with_object({}) do |tag_name, values|
      values[tag_name.to_sym] = record[tag_name]
    end
    prefix = @prefix % tag_values
    send_insight(token, "#{prefix} #{message}")
  end
end
|