Class: Sumologic

Inherits:
Fluent::BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_sumologic.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting.



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_sumologic.rb', line 48

def configure(conf)
  unless conf['endpoint'] =~ URI::regexp
    raise Fluent::ConfigError, "Invalid SumoLogic endpoint url: #{conf['endpoint']}"
  end

  unless conf['log_format'] =~ /\A(?:json|text|json_merge)\z/
    raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json or json_merge"
  end

  @sumo_conn = SumologicConnection.new(conf['endpoint'], @verify_ssl)
  super
end

#dump_log(log) ⇒ Object

Strip sumo_metadata and dump to json



88
89
90
91
# File 'lib/fluent/plugin/out_sumologic.rb', line 88

def dump_log(log)
  log.delete('_sumo_metadata')
  Yajl.dump(log)
end

#format(tag, time, record) ⇒ Object



93
94
95
# File 'lib/fluent/plugin/out_sumologic.rb', line 93

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#merge_json(record) ⇒ Object

Used to merge log record into top level json



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/out_sumologic.rb', line 72

def merge_json(record)
  if record.has_key?(@log_key)
    log = record[@log_key].strip
    if log[0].eql?('{') && log[-1].eql?('}')
      begin
        record = JSON.parse(log).merge(record)
        record.delete(@log_key)
      rescue JSON::ParserError
        # do nothing, ignore
      end
    end
  end
  record
end

#shutdownObject

This method is called when shutting down.



67
68
69
# File 'lib/fluent/plugin/out_sumologic.rb', line 67

def shutdown
  super
end

#startObject

This method is called when starting.



62
63
64
# File 'lib/fluent/plugin/out_sumologic.rb', line 62

def start
  super
end

#sumo_key(sumo) ⇒ Object



97
98
99
100
101
102
# File 'lib/fluent/plugin/out_sumologic.rb', line 97

def sumo_key(sumo)
  source_name = sumo['source'] || @source_name
  source_category = sumo['category'] || @source_category
  source_host = sumo['host'] || @source_host
  "#{source_name}:#{source_category}:#{source_host}"
end

#sumo_timestamp(time) ⇒ Object

Convert timestamp to 13 digit epoch if necessary



105
106
107
# File 'lib/fluent/plugin/out_sumologic.rb', line 105

def sumo_timestamp(time)
  time.to_s.length == 13 ? time : time * 1000
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/fluent/plugin/out_sumologic.rb', line 110

def write(chunk)
  messages_list = {}

  # Sort messages
  chunk.msgpack_each do |tag, time, record|
     = record.fetch('_sumo_metadata', {})
    key = sumo_key()
    log_format = ['log_format'] || @log_format

    # Strip any unwanted newlines
    record[@log_key].chomp! if record[@log_key]

    case log_format
      when 'text'
        log = record[@log_key]
        unless log.nil?
          log.strip!
        end
      when 'json_merge'
        log = dump_log(merge_json({:timestamp => sumo_timestamp(time)}.merge(record)))
      else
        log = dump_log({:timestamp => sumo_timestamp(time)}.merge(record))
    end

    unless log.nil?
      if messages_list.key?(key)
        messages_list[key].push(log)
      else
        messages_list[key] = [log]
      end
    end

  end

  # Push logs to sumo
  messages_list.each do |key, messages|
    source_name, source_category, source_host = key.split(':')
    @sumo_conn.publish(
        messages.join("\n"),
        source_host=source_host,
        source_category=source_category,
        source_name=source_name
    )
  end

end