Class: LogentriesOutput
- Inherits:
-
Fluent::BufferedOutput
- Object
- Fluent::BufferedOutput
- LogentriesOutput
- Defined in:
- lib/fluent/plugin/out_logentries.rb
Defined Under Namespace
Classes: ConnectionFailure
Instance Method Summary collapse
- #client ⇒ Object
- #configure(conf) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
-
#generate_token(path) ⇒ Object
Scan a given directory for logentries tokens.
-
#get_token(tag, record, tokens) ⇒ Object
returns the correct token to use for a given tag / Records.
- #send_logentries(data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
-
#write(chunk) ⇒ Object
NOTE! This method is called by internal thread, not Fluentd’s main thread.
Instance Method Details
#client ⇒ Object
25 26 27 |
# File 'lib/fluent/plugin/out_logentries.rb', line 25 def client @_socket ||= TCPSocket.new @host, @port end |
#configure(conf) ⇒ Object
13 14 15 |
# File 'lib/fluent/plugin/out_logentries.rb', line 13 def configure(conf) super end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
30 31 32 |
# File 'lib/fluent/plugin/out_logentries.rb', line 30 def format(tag, time, record) return [tag, record].to_msgpack end |
#generate_token(path) ⇒ Object
Scan a given directory for logentries tokens
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/fluent/plugin/out_logentries.rb', line 35 def generate_token(path) tokens = {} Dir[path + "*.token"].each do |file| key = File.basename(file, ".token") #remove path/extension from filename #read the first line, remove unwanted char and close it tokens[key] = File.open(file, &:readline).gsub(/\r\n|\r|\n/, '') end tokens end |
#get_token(tag, record, tokens) ⇒ Object
returns the correct token to use for a given tag / Records
48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_logentries.rb', line 48 def get_token(tag, record, tokens) tag ||= "" record ||= "" tokens.each do |key, value| if tag.index(key) != nil || record.index(key) != nil then return value end end return nil end |
#send_logentries(data) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/fluent/plugin/out_logentries.rb', line 77 def send_logentries(data) retries = 0 begin client.puts data rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT => e if retries < 2 retries += 1 @_socket = nil log.warn "Could not push logs to Logentries, resetting connection and trying again. #{e.message}" sleep 2**retries retry end raise ConnectionFailure, "Could not push logs to Logentries after #{retries} retries. #{e.message}" end end |
#shutdown ⇒ Object
21 22 23 |
# File 'lib/fluent/plugin/out_logentries.rb', line 21 def shutdown super end |
#start ⇒ Object
17 18 19 |
# File 'lib/fluent/plugin/out_logentries.rb', line 17 def start super end |
#write(chunk) ⇒ Object
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/out_logentries.rb', line 62 def write(chunk) tokens = generate_token(@path) chunk.msgpack_each do |tag, record| next unless record.is_a? Hash token = get_token(tag, record, tokens) next if token.nil? if record.has_key?("message") send_logentries(record["message"] + ' ' + token) end end end |