Class: Fluent::Papertrail

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_papertrail.rb

Defined Under Namespace

Classes: SocketFailureError

Constant Summary collapse

DISCARD_STRING =

Sentinel string substituted for both the host and the port of a socket key when a record is to be discarded.

'DISCARD'

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#sockets ⇒ Object

Returns the value of attribute sockets.



6
7
8
# File 'lib/fluent/plugin/out_papertrail.rb', line 6

# Reader for the @sockets instance variable.
#
# Within this class, configure initializes @sockets as a Hash and stores the
# result of create_socket under a "host:port" socket key; an entry is nil
# when the connection could not be created or has been reset after a failed
# write.
#
# @return [Object] the current value of @sockets
def sockets
  @sockets
end

Instance Method Details

#configure(conf) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/fluent/plugin/out_papertrail.rb', line 25

# Applies the plugin configuration and prepares the initial connection state.
#
# After delegating to the parent implementation, this builds the socket table
# with one entry for the statically configured Papertrail destination and
# lets environment variables override two configured defaults.
#
# @param conf [Object] configuration handed over by the framework; it is
#   forwarded unchanged to the parent implementation
def configure(conf)
  super

  # Start from an empty table and eagerly open the configured destination.
  # create_socket may return nil, in which case the entry is stored as nil.
  @sockets = {}
  initial_key = form_socket_key(@papertrail_host, @papertrail_port)
  @sockets.store(initial_key, create_socket(initial_key))

  # When set, these environment variables win over the configured values.
  @default_hostname = ENV.fetch('FLUENT_HOSTNAME', @default_hostname)
  @fallback_severity = ENV.fetch('FLUENT_FALLBACK_SEVERITY', @fallback_severity)
end

#create_packet(tag, time, record) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/out_papertrail.rb', line 79

# Builds a syslog packet from a single event.
#
# Field values come from the record when present and otherwise fall back to:
# @default_hostname for the hostname, 'local0' for the facility, 'info' for
# the severity, the event tag for the packet tag, and record['log'] for the
# content. A falsy time is replaced by the current time.
#
# @param tag [Object] event tag, used when the record has no 'program' entry
# @param time [Object, nil] value passed to Time.at, or nil/false for "now"
# @param record [Hash] event payload read with string keys
# @return [SyslogProtocol::Packet] the populated packet
def create_packet(tag, time, record)
  SyslogProtocol::Packet.new.tap do |syslog|
    syslog.hostname = record['hostname'] || @default_hostname
    syslog.facility = record['facility'] || 'local0'
    begin
      syslog.severity = record['severity'] || 'info'
    rescue ArgumentError
      # The severity setter rejected the value; use the configured fallback.
      syslog.severity = @fallback_severity
    end
    syslog.tag = record['program'] || tag
    syslog.content = record['message'] || record['log']
    syslog.time =
      if time
        Time.at(time)
      else
        Time.now
      end
  end
end

#create_socket(socket_key) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/out_papertrail.rb', line 61

# Opens a TLS-wrapped TCP connection to the destination named by socket_key.
#
# Any StandardError raised while connecting or during the TLS handshake is
# logged as a warning and swallowed; the caller then receives nil and is
# expected to retry later. On such a failure the underlying TCP socket is
# closed explicitly, because sync_close only takes effect when the SSL
# socket itself is closed and would otherwise leave the descriptor open.
#
# @param socket_key [String] destination in "host:port" form
# @return [OpenSSL::SSL::SSLSocket, nil] connected socket, or nil on failure
def create_socket(socket_key)
  log.info "initializing tcp socket for #{socket_key}"
  tcp = nil
  begin
    host, port = split_socket_key(socket_key)
    tcp = TCPSocket.new(host, port)
    log.debug "enabling ssl for socket #{socket_key}"
    ssl = OpenSSL::SSL::SSLSocket.new(tcp)
    # close tcp and ssl socket when either fails
    ssl.sync_close = true
    # initiate SSL/TLS handshake with server
    ssl.connect
  rescue => e
    log.warn "failed to create tcp socket #{socket_key}: #{e}"
    # Release the raw TCP socket if it was opened before the failure.
    begin
      tcp.close if tcp
    rescue StandardError
      # best effort: the socket is being abandoned anyway
      nil
    end
    ssl = nil
  end
  ssl
end

#form_socket_key(host, port) ⇒ Object



52
53
54
# File 'lib/fluent/plugin/out_papertrail.rb', line 52

# Joins a host and a port into the "host:port" string used as a socket key.
#
# Both parts are converted with to_s, so nil values contribute an empty
# string (two nils yield ":").
#
# @param host [Object] destination host
# @param port [Object] destination port
# @return [String] the combined key
def form_socket_key(host, port)
  host.to_s + ':' + port.to_s
end

#format(tag, time, record) ⇒ Object



36
37
38
# File 'lib/fluent/plugin/out_papertrail.rb', line 36

# Serializes one event for buffering.
#
# The event is packed as a three-element array in tag, time, record order,
# which is the same order write destructures when reading the chunk back.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] whatever Array#to_msgpack returns for the triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#pick_socket(record) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/out_papertrail.rb', line 95

# Chooses the socket key (destination) for a record.
#
# Precedence:
#   1. host and port annotations on the Kubernetes pod,
#   2. host and port annotations on the Kubernetes namespace,
#   3. the discard sentinel, when @discard_unannotated_pod_logs is set and
#      the record carries a 'kubernetes' entry,
#   4. the configured @papertrail_host / @papertrail_port.
# A resulting key of ":" (both parts empty) is logged and turned into the
# discard sentinel as well.
#
# @param record [Hash] event payload read with string keys
# @return [String] the socket key, or the discard key built from DISCARD_STRING
def pick_socket(record)
  host = nil
  port = nil

  # Pod annotations are consulted before namespace annotations; a scope only
  # counts when it provides both a host and a port.
  annotated = %w[annotations namespace_annotations].any? do |scope|
    host = record.dig('kubernetes', scope, 'solarwinds_io/papertrail_host')
    port = host && record.dig('kubernetes', scope, 'solarwinds_io/papertrail_port')
    host && port
  end

  unless annotated
    if @discard_unannotated_pod_logs && record.dig('kubernetes')
      # kubernetes log without a destination annotation, and we were told to drop those
      host = DISCARD_STRING
      port = DISCARD_STRING
    else
      # fall back to the pre-configured destination
      host = @papertrail_host
      port = @papertrail_port
    end
  end

  socket_key = form_socket_key(host, port)
  return socket_key unless socket_key == ':'

  # No usable destination at all: report where the record came from, if known.
  namespace_name = record.dig('kubernetes', 'namespace_name')
  kubernetes_err_msg = namespace_name ? " from Kubernetes namespace: \"#{namespace_name}\"" : ''
  log.warn("Received nil socket_configuration#{kubernetes_err_msg}. Discarding message.")
  form_socket_key(DISCARD_STRING, DISCARD_STRING)
end

#send_to_papertrail(packet, socket_key) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/fluent/plugin/out_papertrail.rb', line 132

# Writes an assembled packet to the socket registered under socket_key.
#
# When the write fails, the table entry is reset to nil so that the next
# write recreates the connection, and the broken socket is closed on a
# best-effort basis so its descriptor is not left open.
#
# @param packet [Object] packet responding to assemble(max_size)
# @param socket_key [String] key into @sockets identifying the destination
# @return [Object] the result of the socket's puts call
# @raise [SocketFailureError] if no socket exists for the key or the write fails
def send_to_papertrail(packet, socket_key)
  socket = @sockets[socket_key]
  raise SocketFailureError, "Unable to create socket with #{socket_key}" if socket.nil?

  begin
    # send it; the size limit is a positional argument to assemble
    socket.puts packet.assemble(@maximum_syslog_packet_size)
  rescue => e
    err_msg = "Error writing to #{socket_key}: #{e}"
    # socket failed, reset to nil to recreate for the next write
    @sockets[socket_key] = nil
    begin
      socket.close
    rescue StandardError
      # the connection is already broken; nothing more to release
      nil
    end
    raise SocketFailureError, err_msg, e.backtrace
  end
end

#split_socket_key(socket_key) ⇒ Object



56
57
58
59
# File 'lib/fluent/plugin/out_papertrail.rb', line 56

# Splits a socket key on ':' and returns its first two segments.
#
# Missing segments come back as nil and any segments after the second are
# ignored.
#
# @param socket_key [String] key in "host:port" form
# @return [Array] two-element array of [host, port]
def split_socket_key(socket_key)
  socket_key.split(':').values_at(0, 1)
end

#write(chunk) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/out_papertrail.rb', line 40

# Delivers every event of a buffered chunk to its destination.
#
# For each [tag, time, record] triple the destination is chosen with
# pick_socket; records routed to the discard key are skipped. Otherwise the
# socket entry is (re)created when it is missing or nil, and the record is
# sent as a syslog packet.
#
# @param chunk [Object] buffer chunk responding to msgpack_each
def write(chunk)
  discard_key = form_socket_key(DISCARD_STRING, DISCARD_STRING)

  chunk.msgpack_each do |(tag, time, record)|
    destination = pick_socket(record)
    next if destination.eql?(discard_key)

    # recreate the socket if it's nil
    @sockets[destination] ||= create_socket(destination)
    send_to_papertrail(create_packet(tag, time, record), destination)
  end
end