Class: Fluent::SplunkHECOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_splunkhec.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. Here we construct the Splunk HEC URL to POST data to. If the configuration is invalid, raise Fluent::ConfigError.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/out_splunkhec.rb', line 24

# Validates the plugin configuration and derives the settings used when
# sending events.
#
# Builds the Splunk HEC endpoint URL from the 'protocol', 'host' and 'port'
# entries, stores the token, and resolves the event host and sourcetype
# (falling back to the local hostname and the 'tag' sentinel respectively).
#
# @param conf [#[]] configuration indexed by string keys
# @raise [Fluent::ConfigError] if protocol, host, port or token is missing or empty
def configure(conf)
  super

  # Fail with a configuration error instead of a NoMethodError/TypeError
  # from building the URL out of nil values.
  %w[protocol host port].each do |key|
    if conf[key].to_s.empty?
      raise Fluent::ConfigError, "splunkhec: #{key} is empty, please provide a #{key} for this plugin to work"
    end
  end
  if conf['token'].to_s.empty?
    raise Fluent::ConfigError, 'splunkhec: token is empty, please provide a token for this plugin to work'
  end

  @protocol = conf['protocol']
  @splunk_url = "#{@protocol}://#{conf['host']}:#{conf['port']}/services/collector/event"
  log.debug 'splunkhec: sent data to ' + @splunk_url
  @token = conf['token']

  # String#strip always returns a string; the previous delete!("\n") returned
  # nil whenever the command output carried no newline.
  @event_host = conf['event_host'] || `hostname`.strip

  # 'tag' is a sentinel meaning "use each event's tag as its sourcetype".
  @event_sourcetype = conf['sourcetype'] || 'tag'

  @event_index = @index
  @event_source = @source
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd. Use msgpack to serialize the object.



64
65
66
# File 'lib/fluent/plugin/out_splunkhec.rb', line 64

# Serializes a single event for buffering.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] whatever Array#to_msgpack returns for [tag, time, record]
def format(tag, time, record)
  event_tuple = [tag, time, record]
  event_tuple.to_msgpack
end

#shutdown ⇒ Object



58
59
60
# File 'lib/fluent/plugin/out_splunkhec.rb', line 58

# Shutdown hook: adds no behavior of its own and simply delegates to the
# parent implementation, returning its result.
def shutdown
  super()
end

#start ⇒ Object



54
55
56
# File 'lib/fluent/plugin/out_splunkhec.rb', line 54

# Start hook: adds no behavior of its own and simply delegates to the
# parent implementation, returning its result.
def start
  super()
end

#write(chunk) ⇒ Object

Loop through all records and send them to Splunk.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/fluent/plugin/out_splunkhec.rb', line 69

# Sends every event in the chunk to the Splunk HTTP Event Collector, issuing
# one POST request per event.
#
# Each request body is a JSON object with the keys time, event, sourcetype,
# source, index and host (keys whose value is nil are omitted). Hash and
# Array records are embedded as a JSON-encoded string in "event"; every
# other record is embedded via #to_s.
#
# @param chunk [#msgpack_each] yields one [tag, time, record] triple per event
# @raise [StandardError] any failure is logged and then re-raised so the
#   caller sees that the chunk was not delivered
def write(chunk)
  # The endpoint and client settings do not change per event, so build them once.
  uri = URI(@splunk_url)
  http = Net::HTTP.new(uri.host, uri.port)
  if @protocol == 'https'
    http.use_ssl = true
    # NOTE(review): certificate verification is disabled, which leaves the
    # connection open to man-in-the-middle attacks. Kept as-is because
    # existing setups may rely on self-signed certificates; consider making
    # this configurable.
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE
  end

  chunk.msgpack_each do |tag, time, record|
    # Parse record to Splunk event format
    event =
      case record
      when Hash, Array then record.to_json
      else record.to_s
      end

    # 'tag' is a sentinel meaning "use this event's tag". Resolve it per event
    # without overwriting the setting, so later events keep their own tag.
    sourcetype = @event_sourcetype == 'tag' ? tag : @event_sourcetype

    # Build the body through the JSON serializer so quotes, backslashes and
    # control characters inside any field are escaped correctly.
    payload = {
      'time' => time.is_a?(Numeric) ? time : time.to_i,
      'event' => event,
      'sourcetype' => sourcetype,
      'source' => @event_source,
      'index' => @event_index,
      'host' => @event_host
    }.reject { |_key, value| value.nil? }
    body = payload.to_json
    log.debug "splunkhec: " + body + "\n"

    req = Net::HTTP::Post.new(uri)
    req.add_field "Authorization", "Splunk #{@token}"
    req.add_field "Content-Type", "application/json; charset=utf-8"
    req.body = body

    res = http.request(req)
    log.debug "splunkhec: response HTTP Status Code is #{res.code}"
    if res.code.to_i != 200
      log.debug "splunkhec: response body is #{res.body}"
    end
  end
rescue => err
  log.fatal("splunkhec: caught exception; exiting")
  log.fatal(err)
  # Returning normally here would report the chunk as written even though
  # delivery failed; propagate the error instead.
  raise
end