Class: Fluent::SimpleLogentriesOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::SimpleLogentriesOutput
show all
- Defined in:
- lib/fluent/plugin/out_simple-logentries.rb
Defined Under Namespace
Classes: ConnectionFailure
Constant Summary
collapse
- SSL_HOST =
"api.logentries.com"
- NO_SSL_HOST =
"data.logentries.com"
- MAX_ENTRY_SIZE =
8192
- SPLITED_ENTRY_SIZE =
MAX_ENTRY_SIZE - 256
Instance Method Summary
collapse
Instance Method Details
#client ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 33
# Returns the memoized connection used to ship entries, opening it on first use.
#
# Depending on configuration this is a TLS socket to SSL_HOST (@use_ssl), a
# plain TCP socket to NO_SSL_HOST (@protocol == 'tcp'), or a connected UDP
# socket to NO_SSL_HOST (any other protocol). The result is cached in
# @_socket until something resets that variable.
#
# @return [OpenSSL::SSL::SSLSocket, TCPSocket, UDPSocket] the open connection
def client
  @_socket ||=
    if @use_ssl
      # NOTE(review): the context is left at its defaults; confirm whether
      # peer certificate verification should be enabled here.
      context = OpenSSL::SSL::SSLContext.new
      socket = TCPSocket.new SSL_HOST, @port
      begin
        ssl_client = OpenSSL::SSL::SSLSocket.new socket, context
        # Closing the TLS socket must also close the TCP socket underneath it.
        ssl_client.sync_close = true
        ssl_client.connect
        # Return the socket explicitly rather than relying on #connect's value.
        ssl_client
      rescue StandardError
        # A failed handshake must not leave the raw TCP socket open.
        socket.close
        raise
      end
    elsif @protocol == 'tcp'
      TCPSocket.new NO_SSL_HOST, @port
    else
      udp_client = UDPSocket.new
      begin
        udp_client.connect NO_SSL_HOST, @port
      rescue StandardError
        # Do not leak the datagram socket when the peer cannot be resolved.
        udp_client.close
        raise
      end
      udp_client
    end
end
|
21
22
23
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 21
# Hands the configuration straight to the parent class; this plugin adds no
# configuration logic of its own in this method.
#
# @param conf [Object] the configuration object supplied by the caller
# @return [Object] whatever the parent implementation returns
def configure(conf)
  super(conf)
end
|
52
53
54
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 52
# Serializes one event as a msgpack-encoded [tag, record] pair.
#
# @param tag [Object] the event tag
# @param time [Object] the event time; accepted but not part of the payload
# @param record [Object] the event record
# @return [Object] the result of Array#to_msgpack for [tag, record]
def format(tag, time, record)
  [tag, record].to_msgpack
end
|
#push(data) ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 90
# Writes a single entry to Logentries, prefixed with the configured token.
#
# When the transport fails, the cached socket is closed and discarded so the
# next attempt reconnects through #client. The write is retried up to
# @max_retries times, sleeping 5**attempt seconds before each retry. An entry
# the socket rejects with EMSGSIZE is logged and dropped without retrying.
#
# @param data [String] the serialized entry to send
# @return [Object] the result of the socket write, or of the warning call
#   when the entry was rejected as too large
# @raise [ConnectionFailure] when the write still fails after all retries
def push(data)
  retries = 0
  begin
    client.write("#{@token} #{data} \n")
  rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Errno::ETIMEDOUT,
         Errno::EHOSTUNREACH, SocketError, IOError => e
    # Drop the broken connection in every case; a dead socket left in
    # @_socket would make all later writes fail as well.
    stale = @_socket
    @_socket = nil
    begin
      stale.close if stale && !stale.closed?
    rescue StandardError
      # The socket is already unusable; failing to close it changes nothing.
      nil
    end

    if retries >= @max_retries
      raise ConnectionFailure, "Could not push logs to Logentries after #{retries} retries. #{e.message}"
    end

    retries += 1
    log.warn "Could not push logs to Logentries, resetting connection and trying again. #{e.message}"
    sleep 5**retries
    retry
  rescue Errno::EMSGSIZE => e
    log.warn "Could not push logs to Logentries. #{e.message}"
  end
end
|
#send_logentries(tag, record) ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 64
# Serializes a record as JSON and pushes it, splitting oversized records.
#
# When @append_tag is set, the tag is merged into the payload under :tag.
# If the payload has a :messages / 'messages' entry and its JSON form is
# longer than MAX_ENTRY_SIZE, it is sent as several entries: first the
# payload without its messages, then one entry per chunk returned by
# #split_messages. Every entry carries a zero-based `sequence` and a shared
# `identifyer` (a random UUID). Any error raised while sending is logged as
# a warning and not re-raised.
#
# The caller's record is never modified.
#
# @param tag [Object] the event tag
# @param record [Hash] the event record
# @return [Object] unspecified
def send_logentries(tag, record)
  data = @append_tag ? record.merge({tag: tag}) : record
  jsonfied = JSON.generate(data)
  has_messages = data.member?(:messages) || data.member?('messages')

  if has_messages && jsonfied.length > MAX_ENTRY_SIZE
    identifyer = SecureRandom.uuid
    # The symbol key wins when both spellings are present.
    messages = data.member?(:messages) ? data[:messages] : data['messages']
    # Build the header from a filtered copy: `data` may be the caller's own
    # record, so deleting keys from it in place would alter their hash.
    header = data.reject { |key, _value| key == :messages || key == 'messages' }
    pieces = [header] + split_messages(messages).map { |part| {messages: part} }
    pieces.each_with_index do |item, idx|
      push(JSON.generate({sequence: idx, identifyer: identifyer}.merge(item)))
    end
  else
    push(jsonfied)
  end
rescue => e
  log.warn "Could not push logs to Logentries. #{e.message}"
end
|
#shutdown ⇒ Object
29
30
31
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 29
# Shuts the plugin down through the parent class, then closes the connection
# cached in @_socket so it is not left open.
#
# The cache is cleared before closing; a later call to #client therefore
# opens a fresh connection instead of reusing a closed one.
#
# @return [Object] whatever the parent implementation returns
def shutdown
  super
ensure
  socket = @_socket
  @_socket = nil
  begin
    socket.close if socket && !socket.closed?
  rescue StandardError => e
    # Closing is best effort during shutdown; report and carry on.
    log.warn "Could not close Logentries connection. #{e.message}"
  end
end
|
#split_messages(messages) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 108
# Recursively halves a payload until every piece is shorter than
# SPLITED_ENTRY_SIZE.
#
# * String: returned as-is when short enough, otherwise cut in the middle.
# * Array: measured by the length of its JSON form; a single-element array
#   is unwrapped and its element split, longer arrays are cut in the middle.
# * Hash: converted to its JSON string and split as a String.
#
# @param messages [String, Array, Hash] the payload to split
# @return [Array] the pieces, in their original order
# @raise [TypeError] when the payload is none of the supported types
def split_messages(messages)
  case messages
  when String
    size = messages.length
    return [messages] if size < SPLITED_ENTRY_SIZE

    middle = size / 2
    split_messages(messages[0...middle]) + split_messages(messages[middle..-1])
  when Array
    return [messages] if JSON.generate(messages).length < SPLITED_ENTRY_SIZE

    count = messages.length
    # A lone oversized element cannot be halved as an array; split it instead.
    return split_messages(messages.first) if count == 1

    middle = count / 2
    split_messages(messages[0...middle]) + split_messages(messages[middle..-1])
  when Hash
    split_messages(JSON.generate(messages))
  else
    raise TypeError, "Can't split unknown data type."
  end
end
|
#start ⇒ Object
25
26
27
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 25
# Starts the plugin by deferring entirely to the parent class.
#
# @return [Object] whatever the parent implementation returns
def start
  super()
end
|
#write(chunk) ⇒ Object
56
57
58
59
60
61
62
|
# File 'lib/fluent/plugin/out_simple-logentries.rb', line 56
# Sends every Hash record in a buffered chunk via #send_logentries.
# Records that are not a Hash are skipped.
#
# @param chunk [#msgpack_each] a chunk that yields (tag, record) pairs
# @return [Object] whatever chunk.msgpack_each returns
def write(chunk)
  chunk.msgpack_each do |tag, record|
    next unless record.is_a?(Hash)

    send_logentries(tag, record)
  end
end
|