Class: Fluent::SimpleLogentriesOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::SimpleLogentriesOutput
show all
- Defined in:
- lib/fluent/plugin/out_simple-logentries.rb
Defined Under Namespace
Classes: ConnectionFailure
Constant Summary
collapse
- SSL_HOST =
"api.logentries.com"
- NO_SSL_HOST =
"data.logentries.com"
- MAX_ENTRY_SIZE =
8192
- SPLITED_ENTRY_SIZE =
MAX_ENTRY_SIZE - 256
Instance Method Summary
collapse
Instance Method Details
#client ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 34
# Returns the memoized connection to Logentries, opening it on first use.
#
# Transport selection, as written here:
# * @use_ssl truthy    -> TLS over TCP to SSL_HOST on @port
# * @protocol == 'tcp' -> plain TCP to NO_SSL_HOST on @port
# * anything else      -> UDP socket connected to NO_SSL_HOST on @port
#
# The socket is cached in @_socket; assigning nil to that variable forces a
# fresh connection on the next call.
#
# NOTE(review): the SSLContext is created with default parameters and no
# verify_mode is configured here -- confirm whether peer certificate
# verification is intended for this connection.
#
# @return [OpenSSL::SSL::SSLSocket, TCPSocket, UDPSocket] the open socket
def client
  @_socket ||=
    if @use_ssl
      context = OpenSSL::SSL::SSLContext.new
      socket = TCPSocket.new SSL_HOST, @port
      begin
        ssl_client = OpenSSL::SSL::SSLSocket.new socket, context
        # Closing the TLS socket must also close the underlying TCP socket,
        # otherwise every reconnect leaves a descriptor behind.
        ssl_client.sync_close = true
        ssl_client.connect
        ssl_client
      rescue StandardError
        # Handshake failed: release the TCP socket before propagating.
        socket.close
        raise
      end
    elsif @protocol == 'tcp'
      TCPSocket.new NO_SSL_HOST, @port
    else
      udp_client = UDPSocket.new
      begin
        udp_client.connect NO_SSL_HOST, @port
        udp_client
      rescue StandardError
        # Do not leak the descriptor when the connect call fails.
        udp_client.close
        raise
      end
    end
end
|
21
22
23
24
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 21
# Configuration hook: hands +conf+ to the superclass implementation, then
# initialises @last_edit to the Unix epoch (Time.at(0)).
#
# NOTE(review): @last_edit is not read by any method visible in this listing.
#
# @param conf the configuration object, passed through to the superclass
def configure(conf)
  super(conf)
  @last_edit = Time.at(0)
end
|
53
54
55
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 53
# Serialises one event as the msgpack encoding of the two-element array
# [tag, record]. The +time+ argument is accepted but not included.
#
# @param tag the event tag
# @param time the event time (unused)
# @param record the event record
# @return the result of Array#to_msgpack on [tag, record]
def format(tag, time, record)
  [tag, record].to_msgpack
end
|
#push(data) ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 90
# Writes one entry to Logentries as "<token> <data> \n" through +client+.
#
# Connection-level failures (refused, timed out, broken pipe, reset by peer)
# discard the cached socket and retry up to @max_retries times, sleeping
# 5**attempt seconds between attempts. Broken pipe / reset are included
# because the socket is memoized: without a reset, a connection that died
# after it was opened would be reused for every later write.
#
# Errno::EMSGSIZE is logged and swallowed (the entry is dropped).
#
# @param data [String] the already-serialised entry
# @raise [ConnectionFailure] when the retries are exhausted
def push(data)
  retries = 0
  begin
    client.write("#{@token} #{data} \n")
  rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EPIPE, Errno::ECONNRESET => e
    # Always drop the broken socket (even when giving up) so the next push
    # reconnects, and close it so the descriptor is not leaked.
    stale_socket = @_socket
    @_socket = nil
    begin
      stale_socket.close if stale_socket
    rescue StandardError
      # Best effort: the socket is already unusable.
    end
    if retries < @max_retries
      retries += 1
      log.warn "Could not push logs to Logentries, resetting connection and trying again. #{e.message}"
      sleep 5**retries
      retry
    end
    raise ConnectionFailure, "Could not push logs to Logentries after #{retries} retries. #{e.message}"
  rescue Errno::EMSGSIZE => e
    log.warn "Could not push logs to Logentries. #{e.message}"
  end
end
|
#send_logentries(tag, record) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 65
# Serialises +record+ as JSON and pushes it to Logentries.
#
# When @append_tag is set, the tag is merged into the entry under :tag.
# If the entry has a messages field (symbol or string key) and its JSON is
# longer than MAX_ENTRY_SIZE, the entry is sent as a sequence instead:
# first the entry without the messages field, then one entry per chunk
# returned by split_messages. Every part carries a shared :identifyer (UUID)
# and its :sequence index.
#
# The messages field is stripped by building a new hash, so the caller's
# record is never mutated and both key spellings are removed.
#
# Any StandardError is logged as a warning and swallowed.
#
# @param tag the event tag
# @param record [Hash] the event record
def send_logentries(tag, record)
  data = @append_tag ? record.merge({tag: tag}) : record
  jsonfied = JSON.generate(data)
  has_messages = data.member?(:messages) || data.member?('messages')

  if has_messages && jsonfied.length > MAX_ENTRY_SIZE
    identifyer = SecureRandom.uuid
    messages = data.member?(:messages) ? data[:messages] : data['messages']
    # Drop the payload under either key spelling; the previous
    # `delete(:messages) && delete('messages')` short-circuited and left the
    # string-keyed payload inside the header entry.
    header = data.reject { |key, _value| key == :messages || key == 'messages' }
    parts = [header] + split_messages(messages).map { |part| {messages: part} }
    parts.each_with_index do |item, idx|
      push(JSON.generate({sequence: idx, identifyer: identifyer}.merge(item)))
    end
  else
    push(jsonfied)
  end
rescue => e
  log.warn "Could not push logs to Logentries. #{e.message}"
end
|
#shutdown ⇒ Object
30
31
32
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 30
# Shutdown hook. Adds no behaviour of its own: it only delegates to the
# superclass implementation through +super+.
def shutdown
super
end
|
#split_messages(messages) ⇒ Object
108
109
110
111
112
113
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 108
# Recursively halves +messages+ until every piece is shorter than
# SPLITED_ENTRY_SIZE (measured with #length), preserving order.
#
# @param messages the payload to split; must respond to #length and #[]
#   with a range
# @return [Array] the pieces, in order; a single-element array when the
#   input is already short enough
def split_messages(messages)
  total = messages.length
  if total < SPLITED_ENTRY_SIZE
    [messages]
  else
    midpoint = total / 2
    # The first half includes the element at the midpoint index.
    first_half = messages[0..midpoint]
    second_half = messages[(midpoint + 1)..total]
    split_messages(first_half) + split_messages(second_half)
  end
end
|
#start ⇒ Object
26
27
28
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 26
# Start hook. Adds no behaviour of its own: it only delegates to the
# superclass implementation through +super+.
def start
super
end
|
#write(chunk) ⇒ Object
57
58
59
60
61
62
63
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 57
# Walks the buffered chunk with msgpack_each and forwards every
# (tag, record) pair whose record is a Hash to send_logentries.
# Records of any other type are skipped.
#
# @param chunk an object responding to #msgpack_each
def write(chunk)
  chunk.msgpack_each do |tag, record|
    next unless record.is_a?(Hash)

    send_logentries(tag, record)
  end
end
|