Class: Sumologic
- Inherits:
-
Fluent::BufferedOutput
- Object
- Fluent::BufferedOutput
- Sumologic
- Defined in:
- lib/fluent/plugin/out_sumologic.rb
Instance Method Summary
-
#configure(conf) ⇒ Object
This method is called before starting.
-
#dump_log(log) ⇒ Object
Strip sumo_metadata and dump to json.
- #format(tag, time, record) ⇒ Object
-
#merge_json(record) ⇒ Object
Used to merge log record into top level json.
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
- #sumo_key(sumo) ⇒ Object
-
#sumo_timestamp(time) ⇒ Object
Convert timestamp to 13 digit epoch if necessary.
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting.
48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 48

# This method is called before starting.
#
# Validates the 'endpoint' and 'log_format' entries of the configuration and
# creates the SumologicConnection used for publishing.
#
# @param conf [#[]] plugin configuration, read via conf['endpoint'] and conf['log_format']
# @return [Object] whatever the parent class's configure returns
# @raise [Fluent::ConfigError] if the endpoint does not match URI::regexp, or
#   log_format is not one of text, json or json_merge
def configure(conf)
  # Run the parent configure before touching @verify_ssl.
  # NOTE(review): in Fluentd the base configure is what assigns config_param-backed
  # instance variables; @verify_ssl is presumably one of them, so reading it before
  # `super` would pass nil to the connection -- confirm against the config_param list.
  result = super

  unless conf['endpoint'] =~ URI::regexp
    raise Fluent::ConfigError, "Invalid SumoLogic endpoint url: #{conf['endpoint']}"
  end

  unless conf['log_format'] =~ /\A(?:json|text|json_merge)\z/
    raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json or json_merge"
  end

  @sumo_conn = SumologicConnection.new(conf['endpoint'], @verify_ssl)

  # Keep returning the parent's result, as the original trailing `super` did.
  result
end
#dump_log(log) ⇒ Object
Strip sumo_metadata and dump to json
88 89 90 91 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 88

# Strip sumo_metadata and dump to json.
#
# The '_sumo_metadata' entry is left out of a filtered copy, so the hash passed
# in by the caller is not modified.
#
# @param log [Hash] record to serialize
# @return [Object] the result of Yajl.dump for the filtered record
def dump_log(log)
  Yajl.dump(log.reject { |key, _value| key == '_sumo_metadata' })
end
#format(tag, time, record) ⇒ Object
93 94 95 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 93

# Packs one event as the array [tag, time, record] and serializes it by
# calling to_msgpack on that array.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of Array#to_msgpack
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#merge_json(record) ⇒ Object
Used to merge log record into top level json
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 72

# Used to merge log record into top level json.
#
# When the value stored under @log_key is a String that (after stripping
# whitespace) looks like a JSON object, it is parsed and merged with the
# record; keys already present in the record take precedence over parsed
# ones, and the @log_key entry is removed from the result. In every other
# case (key missing, value not a String, not object-shaped, invalid JSON)
# the record is returned unchanged.
#
# @param record [Hash] the event record
# @return [Hash] the merged record, or the original record when nothing was merged
def merge_json(record)
  raw = record[@log_key]
  # A nil or non-String value cannot be stripped or parsed; leave the record alone.
  return record unless raw.is_a?(String)

  log = raw.strip
  return record unless log.start_with?('{') && log.end_with?('}')

  begin
    merged = JSON.parse(log).merge(record)
    merged.delete(@log_key)
    merged
  rescue JSON::ParserError
    # Not valid JSON after all: deliberately ignore and keep the record as is.
    record
  end
end
#shutdown ⇒ Object
This method is called when shutting down.
67 68 69 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 67 def shutdown super end |
#start ⇒ Object
This method is called when starting.
62 63 64 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 62 def start super end |
#sumo_key(sumo) ⇒ Object
97 98 99 100 101 102 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 97

# Builds the "name:category:host" string for a record's Sumo metadata.
#
# Each part comes from the metadata ('source', 'category', 'host') when the
# entry is truthy, otherwise from the matching instance variable
# (@source_name, @source_category, @source_host). A nil part becomes an
# empty segment.
#
# @param sumo [Hash] per-record metadata
# @return [String] the three parts joined with ':'
def sumo_key(sumo)
  fallbacks = {
    'source' => @source_name,
    'category' => @source_category,
    'host' => @source_host
  }
  fallbacks.map { |field, default| (sumo[field] || default).to_s }.join(':')
end
#sumo_timestamp(time) ⇒ Object
Convert timestamp to 13 digit epoch if necessary
105 106 107 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 105

# Convert timestamp to 13 digit epoch if necessary.
#
# A value whose integer part already has 13 characters is returned unchanged;
# anything else is multiplied by 1000. Only the integer part is counted, so a
# Float is not misclassified because of its decimal point and fraction
# (e.g. 1500000000.12 has 13 characters as text but is not a 13 digit epoch).
#
# @param time [#to_i, #*] timestamp; presumably epoch seconds or milliseconds
# @return [Object] time itself, or time * 1000
def sumo_timestamp(time)
  time.to_i.to_s.length == 13 ? time : time * 1000
end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/fluent/plugin/out_sumologic.rb', line 110

# This method is called every flush interval. Write the buffer chunk.
#
# Events are read from the chunk with msgpack_each, rendered according to the
# per-record 'log_format' in '_sumo_metadata' (falling back to @log_format),
# grouped by sumo_key, and each group is published through @sumo_conn as one
# newline-joined payload.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, record)
# @return [Hash] the grouped messages, keyed by sumo_key string
def write(chunk)
  messages_list = {}

  # Sort messages into batches keyed by source name/category/host
  chunk.msgpack_each do |_tag, time, record|
    # Treat a missing or nil metadata entry as "no overrides".
    sumo_metadata = record['_sumo_metadata'] || {}
    key = sumo_key(sumo_metadata)
    log_format = sumo_metadata['log_format'] || @log_format

    # Strip any unwanted newlines. Done in place so the json formats below
    # serialize the trimmed value; skipped for non-String values, which
    # have no chomp!.
    raw_log = record[@log_key]
    raw_log.chomp! if raw_log.is_a?(String)

    log =
      case log_format
      when 'text'
        raw_log.strip! if raw_log.is_a?(String)
        raw_log
      when 'json_merge'
        dump_log(merge_json({ :timestamp => sumo_timestamp(time) }.merge(record)))
      else
        dump_log({ :timestamp => sumo_timestamp(time) }.merge(record))
      end

    # In text mode a record without the log key produces nothing to send.
    next if log.nil?

    (messages_list[key] ||= []) << log
  end

  # Push logs to sumo
  messages_list.each do |key, messages|
    # NOTE(review): the key is split on ':' again, so a source name, category
    # or host that itself contains ':' would be split into the wrong parts.
    source_name, source_category, source_host = key.split(':')
    # Positional arguments, in the same order the original call passed them.
    @sumo_conn.publish(messages.join("\n"), source_host, source_category, source_name)
  end
end