Class: Fluent::Papertrail
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::Papertrail
- Defined in:
- lib/fluent/plugin/out_papertrail.rb
Defined Under Namespace
Classes: SocketFailureError
Constant Summary collapse
- DISCARD_STRING =
declare const string for nullifying token if we decide to discard records
'DISCARD'
Instance Attribute Summary collapse
-
#sockets ⇒ Object
Returns the value of attribute sockets.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #create_packet(tag, time, record) ⇒ Object
- #create_socket(socket_key) ⇒ Object
- #form_socket_key(host, port) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #pick_socket(record) ⇒ Object
- #send_to_papertrail(packet, socket_key) ⇒ Object
- #split_socket_key(socket_key) ⇒ Object
- #write(chunk) ⇒ Object
Instance Attribute Details
#sockets ⇒ Object
Returns the value of attribute sockets.
6 7 8 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 6 def sockets @sockets end |
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 25 def configure(conf) super # create initial sockets hash and socket based on config param @sockets = {} socket_key = form_socket_key(@papertrail_host, @papertrail_port) @sockets[socket_key] = create_socket(socket_key) # redefine default hostname if it's been passed in through ENV @default_hostname = ENV['FLUENT_HOSTNAME'] || @default_hostname @fallback_severity = ENV['FLUENT_FALLBACK_SEVERITY'] || @fallback_severity end |
#create_packet(tag, time, record) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 79 def create_packet(tag, time, record) # construct syslog packet from fluent record packet = SyslogProtocol::Packet.new packet.hostname = record['hostname'] || @default_hostname packet.facility = record['facility'] || 'local0' begin packet.severity = record['severity'] || 'info' rescue ArgumentError packet.severity = @fallback_severity end packet.tag = record['program'] || tag packet.content = record['message'] || record['log'] packet.time = time ? Time.at(time) : Time.now packet end |
#create_socket(socket_key) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 61 def create_socket(socket_key) log.info "initializing tcp socket for #{socket_key}" begin host, port = split_socket_key(socket_key) socket = TCPSocket.new(host, port) log.debug "enabling ssl for socket #{socket_key}" ssl = OpenSSL::SSL::SSLSocket.new(socket) # close tcp and ssl socket when either fails ssl.sync_close = true # initiate SSL/TLS handshake with server ssl.connect rescue => e log.warn "failed to create tcp socket #{socket_key}: #{e}" ssl = nil end ssl end |
#form_socket_key(host, port) ⇒ Object
52 53 54 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 52 def form_socket_key(host, port) "#{host}:#{port}" end |
#format(tag, time, record) ⇒ Object
36 37 38 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 36 def format(tag, time, record) [tag, time, record].to_msgpack end |
#pick_socket(record) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 95 def pick_socket(record) # if kubernetes pod has papertrail destination as annotation, use it if record.dig('kubernetes', 'annotations', 'solarwinds_io/papertrail_host') && \ record.dig('kubernetes', 'annotations', 'solarwinds_io/papertrail_port') host = record['kubernetes']['annotations']['solarwinds_io/papertrail_host'] port = record['kubernetes']['annotations']['solarwinds_io/papertrail_port'] # else if kubernetes namespace has papertrail destination as annotation, use it elsif record.dig('kubernetes', 'namespace_annotations', 'solarwinds_io/papertrail_host') && \ record.dig('kubernetes', 'namespace_annotations', 'solarwinds_io/papertrail_port') host = record['kubernetes']['namespace_annotations']['solarwinds_io/papertrail_host'] port = record['kubernetes']['namespace_annotations']['solarwinds_io/papertrail_port'] # else if it is a kubernetes log and we're discarding unannotated logs elsif @discard_unannotated_pod_logs && record.dig('kubernetes') host = DISCARD_STRING port = DISCARD_STRING # else use pre-configured destination else host = @papertrail_host port = @papertrail_port end socket_key = form_socket_key(host, port) if socket_key == ':' kubernetes_err_msg = '' if record.dig('kubernetes', 'namespace_name') namespace_name = record['kubernetes']['namespace_name'] kubernetes_err_msg = " from Kubernetes namespace: \"#{namespace_name}\"" end log.warn("Received nil socket_configuration#{kubernetes_err_msg}. Discarding message.") host = DISCARD_STRING port = DISCARD_STRING socket_key = form_socket_key(host, port) end socket_key end |
#send_to_papertrail(packet, socket_key) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 132 def send_to_papertrail(packet, socket_key) if @sockets[socket_key].nil? err_msg = "Unable to create socket with #{socket_key}" raise SocketFailureError, err_msg else begin # send it @sockets[socket_key].puts packet.assemble(max_size=@maximum_syslog_packet_size) rescue => e err_msg = "Error writing to #{socket_key}: #{e}" # socket failed, reset to nil to recreate for the next write @sockets[socket_key] = nil raise SocketFailureError, err_msg, e.backtrace end end end |
#split_socket_key(socket_key) ⇒ Object
56 57 58 59 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 56 def split_socket_key(socket_key) socket_key_arr = socket_key.split(':') return socket_key_arr[0], socket_key_arr[1] end |
#write(chunk) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_papertrail.rb', line 40 def write(chunk) chunk.msgpack_each {|(tag, time, record)| socket_key = pick_socket(record) unless socket_key.eql? form_socket_key(DISCARD_STRING, DISCARD_STRING) # recreate the socket if it's nil @sockets[socket_key] ||= create_socket(socket_key) packet = create_packet(tag, time, record) send_to_papertrail(packet, socket_key) end } end |