Class: Fluent::InsightOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_insight.rb

Defined Under Namespace

Classes: ConnectionFailure

Constant Summary collapse

INSIGHT_DATA_TEMPLATE =
"%{region}.data.logs.insight.rapid7.com"
INSIGHT_REST_TEMPLATE =
"https://%{region}.rest.logs.insight.rapid7.com"
INSIGHT_LOGSETS_TEMPLATE =
"/management/logsets/%{logset_id}"
THREAD_COUNT =
4

Instance Method Summary collapse

Instance Method Details

#clientObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_insight.rb', line 97

def client
  insight_data_host = INSIGHT_DATA_TEMPLATE % { :region => @region }
  @_socket ||= if @use_ssl
    context    = OpenSSL::SSL::SSLContext.new
    socket     = TCPSocket.new insight_data_host, @port
    ssl_client = OpenSSL::SSL::SSLSocket.new socket, context
    ssl_client.connect
  else
    if @protocol == 'tcp'
      TCPSocket.new insight_data_host, @port
    else
      udp_client = UDPSocket.new
      udp_client.connect insight_data_host, @port
      udp_client
    end
  end
end

#configure(conf) ⇒ Object



31
32
33
# File 'lib/fluent/plugin/out_insight.rb', line 31

def configure(conf)
  super
end

#format(tag, time, record) ⇒ Object



115
116
117
# File 'lib/fluent/plugin/out_insight.rb', line 115

def format(tag, time, record)
  return [tag, time, record].to_msgpack
end

#insight_log_token(url) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin/out_insight.rb', line 82

def insight_log_token(url)
  log_body = insight_rest_request(url)
  if log_body.key?(@key)
    log_info = log_body[@key]
    if log_info.key?('tokens')
      log.info "Found log #{log_info['name']}"
      return log_info['name'], log_info['tokens'][0]
    else
      log.warn "Log is empty"
    end
  else
    log.warn "Response doesn't contain log info"
  end
end

#insight_rest_request(url) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_insight.rb', line 68

def insight_rest_request(url)
  uri = URI(url)
  request = Net::HTTP::Get.new(uri)
  request['content-type'] = 'application/json'
  request['x-api-key'] = @api_key
  response = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => uri.scheme == 'https') {|http|
    http.request(request)
  }
  if response.code == "200"
    return JSON.parse(response.body)
  end
  log.error "Request was failed HTTP #{response.code}: \n#{response.body}"
end

#send_insight(token, data) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/fluent/plugin/out_insight.rb', line 143

def send_insight(token, data)
  retries = 0
  begin
    client.write("#{token} #{data} \n")
  rescue Errno::EMSGSIZE
    str_length = data.length
    send_insight(token, data[0..str_length/2])
    send_insight(token, data[(str_length/2) + 1..str_length])
    log.warn "Message Too Long, re-sending it in two part..."
  rescue => e
    if retries < @max_retries
      retries += 1
      @_socket = nil
      log.warn "Could not push logs to Insight, resetting connection and trying again. #{e.message}"
      sleep 5**retries
      retry
    end
    raise ConnectionFailure, "Could not push logs to Insight after #{retries} retries. #{e.message}"
  end
end

#shutdownObject



64
65
66
# File 'lib/fluent/plugin/out_insight.rb', line 64

def shutdown
  super
end

#startObject



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/out_insight.rb', line 35

def start
  logsets_url = (INSIGHT_REST_TEMPLATE + INSIGHT_LOGSETS_TEMPLATE) % { :region => @region, :logset_id => @logset_id }
  @insight_tags = Hash[@tags.split(",").each_with_object(nil).to_a]
  logset_body = insight_rest_request(logsets_url)
  threads = []
  @tokens = Hash.new
  mutex = Mutex.new
  if logset_body.instance_of?(Hash) and logset_body.key?('logset')
    logset_info = logset_body['logset']
    if logset_info.key?('logs_info')
      logs_info = logset_info['logs_info']
      log_urls = logs_info.map { |log| log['links'][0]['href'] }
      THREAD_COUNT.times.map {
        Thread.new(log_urls, @tokens) do |urls, tokens|
          while url = mutex.synchronize { urls.pop }
            log_name, token = insight_log_token(url)
            mutex.synchronize { tokens[log_name] = token }
          end
        end
      }.each(&:join)
    else
      log.warn "No logs info found in logset response"
    end
  else
    log.warn "Logset emtity is empty"
  end
  super
end

#write(chunk) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/out_insight.rb', line 119

def write(chunk)
  return if @tokens.empty?
  chunk.msgpack_each do |(tag, time, record)|
    next unless record.is_a? Hash
    message = (record.delete('message')&.to_s&.rstrip || '')
    next if message.empty?
    @insight_tags.each { |k,v|
      @insight_tags[k] = record[k]
    }
    symbolized_tags = @insight_tags.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo}
    if @tokens.key?(record[@key])
      token = @tokens[record[@key]]
      prefix = @prefix % symbolized_tags
      send_insight(token,  "#{prefix} #{message}")
    elsif @tokens.key?(@default)
      token = @tokens[@default]
      prefix = @prefix % symbolized_tags
      send_insight(token,  "#{prefix} #{message}")
    else
      log.debug "No token found for #{record[@key]} and default log doesn't exist"
    end
  end
end