Class: Fluent::LogentriesOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::LogentriesOutput
- Defined in:
- lib/fluent/plugin/out_logentries-tmpfix.rb
Defined Under Namespace
Classes: ConnectionFailure
Constant Summary collapse
- SSL_HOST =
"api.logentries.com"- NO_SSL_HOST =
"data.logentries.com"- HOSTNAME =
`hostname`.strip
Instance Method Summary collapse
- #client ⇒ Object
- #configure(conf) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
-
#generate_tokens_list ⇒ Object
Parse an YML file and generate a list of tokens.
-
#get_token(tag, record) ⇒ Object
Returns the correct token to use for a given tag / records.
- #send_logentries(token, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
-
#write(chunk) ⇒ Object
NOTE! This method is called by internal thread, not Fluentd’s main thread.
Instance Method Details
#client ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 40 def client @_socket ||= if @use_ssl context = OpenSSL::SSL::SSLContext.new socket = TCPSocket.new SSL_HOST, @port ssl_client = OpenSSL::SSL::SSLSocket.new socket, context ssl_client.connect else if @protocol == 'tcp' TCPSocket.new NO_SSL_HOST, @port else udp_client = UDPSocket.new udp_client.connect NO_SSL_HOST, @port udp_client end end end |
#configure(conf) ⇒ Object
25 26 27 28 29 30 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 25 def configure(conf) super @tokens = nil @last_edit = Time.at(0) end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
60 61 62 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 60 def format(tag, time, record) return [tag, record].to_msgpack end |
#generate_tokens_list ⇒ Object
Parse an YML file and generate a list of tokens. It will only re-generate the list on changes.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 66 def generate_tokens_list begin edit_time = File.mtime(@config_path) if edit_time > @last_edit @tokens = YAML::load_file(@config_path) @last_edit = edit_time log.info "Token(s) list updated." end rescue Exception => e log.warn "Could not load configuration. #{e.message}" end end |
#get_token(tag, record) ⇒ Object
Returns the correct token to use for a given tag / records
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 82 def get_token(tag, record) app_name = record["app_name"] || '' # Config Structure # ----------------------- # app-name: # app: TOKEN # access: TOKEN (optional) # error: TOKEN (optional) @tokens.each do |key, value| if app_name == key || tag.index(key) != nil default = value['app'] case tag when @tag_access_log return value['access'] || default when @tag_error_log return value['error'] || default else return default end end end return default_token end |
#send_logentries(token, data) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 128 def send_logentries(token, data) retries = 0 begin client.write("#{token} #{HOSTNAME} #{data} \n") rescue Errno::EMSGSIZE str_length = data.length send_logentries(token, data[0..str_length/2]) send_logentries(token, data[(str_length/2)+1..str_length]) log.warn "Message Too Long, re-sending it in two part..." rescue => e if retries < @max_retries retries += 1 @_socket = nil log.warn "Could not push logs to Logentries, resetting connection and trying again. #{e.message}" sleep 5**retries retry end raise ConnectionFailure, "Could not push logs to Logentries after #{retries} retries. #{e.message}" end end |
#shutdown ⇒ Object
36 37 38 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 36 def shutdown super end |
#start ⇒ Object
32 33 34 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 32 def start super end |
#write(chunk) ⇒ Object
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/fluent/plugin/out_logentries-tmpfix.rb', line 111 def write(chunk) generate_tokens_list() return unless @tokens.is_a? Hash chunk.msgpack_each do |tag, record| next unless record.is_a? Hash next unless @use_json or record.has_key? "message" token = get_token(tag, record) next if token.nil? # Clean up the string to avoid blank line in logentries = @use_json ? record.to_json : record["message"].rstrip() send_logentries(token, ) end end |