Class: Fluent::LogentriesOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::LogentriesOutput
- Defined in:
- lib/fluent/plugin/out_logentries.rb
Defined Under Namespace
Classes: ConnectionFailure
Constant Summary collapse
- SSL_HOST =
"api.logentries.com"
- NO_SSL_HOST =
"data.logentries.com"
Instance Method Summary collapse
- #client ⇒ Object
- #configure(conf) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
-
#generate_tokens_list ⇒ Object
Parse a YAML file and generate a list of tokens.
-
#get_token(tag, record) ⇒ Object
Returns the correct token to use for a given tag and record.
- #send_logentries(token, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
-
#write(chunk) ⇒ Object
NOTE! This method is called by an internal thread, not Fluentd’s main thread.
Instance Method Details
#client ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/out_logentries.rb', line 39
#
# Returns the connection to Logentries, opening it on first use and caching
# it in @_socket afterwards.
#
# The transport is chosen from the plugin settings read here:
# * @use_ssl truthy     -> TLS over TCP to SSL_HOST
# * @protocol == 'tcp'  -> plain TCP to NO_SSL_HOST
# * anything else       -> UDP socket connected to NO_SSL_HOST
# All three variants use @port.
#
# @return [OpenSSL::SSL::SSLSocket, TCPSocket, UDPSocket] the open socket
# @raise [OpenSSL::SSL::SSLError] when the TLS handshake or the certificate
#   check fails
def client
  @_socket ||=
    if @use_ssl
      context = OpenSSL::SSL::SSLContext.new
      # A bare SSLContext performs no certificate verification; set_params
      # applies the library defaults (peer verification, default CA store).
      context.set_params

      socket = TCPSocket.new SSL_HOST, @port
      begin
        ssl_client = OpenSSL::SSL::SSLSocket.new socket, context
        # Closing the TLS socket must also close the TCP socket underneath.
        ssl_client.sync_close = true
        # Needed for SNI and for hostname verification.
        ssl_client.hostname = SSL_HOST
        ssl_client.connect
        ssl_client.post_connection_check SSL_HOST
        ssl_client
      rescue StandardError
        # Do not leak the TCP socket when the handshake or check fails.
        socket.close
        raise
      end
    elsif @protocol == 'tcp'
      TCPSocket.new NO_SSL_HOST, @port
    else
      udp_client = UDPSocket.new
      udp_client.connect NO_SSL_HOST, @port
      udp_client
    end
end
#configure(conf) ⇒ Object
24 25 26 27 28 29 |
# File 'lib/fluent/plugin/out_logentries.rb', line 24
#
# Hands the configuration to the parent class, then resets the token cache.
#
# @param conf the configuration object, forwarded unchanged to the parent
# @return [Time] the reset @last_edit value
def configure(conf)
  super

  # No token list has been loaded yet.
  @tokens = nil
  # Start of the epoch: any real file modification time compares as newer.
  @last_edit = Time.at(0)
end
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
59 60 61 |
# File 'lib/fluent/plugin/out_logentries.rb', line 59
#
# Serializes one event for buffering. Only the tag and the record are kept;
# the time argument is not part of the serialized pair.
#
# @param tag the event tag
# @param time the event time (unused)
# @param record the event record
# @return the result of calling to_msgpack on the [tag, record] pair
def format(tag, time, record)
  event = [tag, record]
  event.to_msgpack
end
#generate_tokens_list ⇒ Object
Parse a YAML file and generate a list of tokens. The list is only regenerated when the file changes.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/fluent/plugin/out_logentries.rb', line 65
#
# Reloads the token list from the YAML file at @config_path when the file's
# modification time is newer than @last_edit.
#
# On success @tokens holds the parsed document and @last_edit the file's
# modification time. On failure (missing file, unreadable YAML, ...) a
# warning is logged and the previously loaded @tokens are left untouched.
#
# Only StandardError is rescued so that signals and exit requests
# (Interrupt, SystemExit, ...) still propagate.
def generate_tokens_list
  edit_time = File.mtime(@config_path)
  return unless edit_time > @last_edit

  @tokens = YAML.load_file(@config_path)
  @last_edit = edit_time
  log.info "Token(s) list updated."
rescue StandardError => e
  log.warn "Could not load configuration. #{e.message}"
end
#get_token(tag, record) ⇒ Object
Returns the correct token to use for a given tag and record.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/fluent/plugin/out_logentries.rb', line 81
#
# Picks the Logentries token for an event.
#
# Entries of @tokens are tried in order; an entry applies when its key equals
# the record's "app_name" or occurs inside the tag. For the first applicable
# entry the token depends on the tag: the 'access' token when the tag matches
# @tag_access_log, the 'error' token when it matches @tag_error_log, and the
# 'app' token otherwise (also used when the specific token is missing).
#
# Keys are compared as strings, so a numeric YAML key does not raise, and
# entries whose value is not a mapping are skipped.
#
# @param tag [String] the event tag
# @param record [Hash] the event record; "app_name" is read if present
# @return the matching token, or default_token when no entry applies
def get_token(tag, record)
  app_name = record["app_name"] || ''

  # Config Structure
  # -----------------------
  # app-name:
  #   app: TOKEN
  #   access: TOKEN (optional)
  #   error: TOKEN (optional)
  @tokens.each do |key, value|
    # An entry without a mapping cannot supply any token.
    next unless value.is_a? Hash

    name = key.to_s
    next unless app_name.to_s == name || tag.include?(name)

    default = value['app']
    case tag
    when @tag_access_log
      return value['access'] || default
    when @tag_error_log
      return value['error'] || default
    else
      return default
    end
  end

  default_token
end
#send_logentries(token, data) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/fluent/plugin/out_logentries.rb', line 127
#
# Writes one line ("<token> <data> \n") to the Logentries connection.
#
# Connection errors: the broken socket is closed and forgotten so the next
# call to client reconnects, then the write is retried up to @max_retries
# times, sleeping 5**attempt seconds between attempts.
#
# Oversized messages (Errno::EMSGSIZE): the data is cut in half and each half
# is sent as its own line. Data shorter than two characters cannot be cut any
# further, so the error is re-raised instead of recursing without end.
#
# @param token the Logentries token prefixed to the line
# @param data [String] the message body
# @raise [ConnectionFailure] when the retries are exhausted
# @raise [Errno::EMSGSIZE] when the message cannot be split any further
def send_logentries(token, data)
  retries = 0
  begin
    client.write("#{token} #{data} \n")
  rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ETIMEDOUT, Errno::EPIPE => e
    # Always drop the broken socket, even when giving up, so that a later
    # call reconnects instead of reusing it. Closing releases the descriptor.
    begin
      @_socket.close if @_socket
    rescue StandardError
      # Closing an already broken socket may itself fail; nothing to recover.
      nil
    ensure
      @_socket = nil
    end

    if retries < @max_retries
      retries += 1
      log.warn "Could not push logs to Logentries, resetting connection and trying again. #{e.message}"
      sleep 5**retries
      retry
    end
    raise ConnectionFailure, "Could not push logs to Logentries after #{retries} retries. #{e.message}"
  rescue Errno::EMSGSIZE
    # Nothing left to split: re-raise rather than recurse forever.
    raise if data.length < 2

    log.warn "Message Too Long, re-sending it in two part..."
    half = data.length / 2
    send_logentries(token, data[0...half])
    send_logentries(token, data[half..-1])
  end
end
#shutdown ⇒ Object
35 36 37 |
# File 'lib/fluent/plugin/out_logentries.rb', line 35
#
# Shutdown hook. Does no plugin-specific work; it only delegates to the
# parent class implementation.
def shutdown
  super
end
#start ⇒ Object
31 32 33 |
# File 'lib/fluent/plugin/out_logentries.rb', line 31
#
# Start hook. Does no plugin-specific work; it only delegates to the parent
# class implementation.
def start
  super
end
#write(chunk) ⇒ Object
NOTE! This method is called by an internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/fluent/plugin/out_logentries.rb', line 110
#
# Sends every event of a buffered chunk to Logentries.
#
# The token list is refreshed first; nothing is sent unless @tokens is a
# Hash. An event is skipped when its record is not a Hash, when it has no
# "message" key (unless @use_json is set), or when no token is found for it.
# With @use_json the whole record is sent as JSON; otherwise only the
# "message" value, converted to a string with trailing whitespace removed.
#
# @param chunk the buffered chunk; must respond to msgpack_each and yield
#   (tag, record) pairs
def write(chunk)
  generate_tokens_list
  return unless @tokens.is_a? Hash

  chunk.msgpack_each do |tag, record|
    next unless record.is_a? Hash
    next unless @use_json || record.key?("message")

    token = get_token(tag, record)
    next if token.nil?

    # Clean up the string to avoid blank line in logentries.
    # to_s keeps a nil or non-String message from raising on rstrip.
    message = @use_json ? record.to_json : record["message"].to_s.rstrip
    send_logentries(token, message)
  end
end