Class: Aws::Embedded::Metrics::Sinks::Tcp

Inherits:
Object
  • Object
show all
Defined in:
lib/aws-embedded-metrics-customink/sinks/tcp.rb

Overview

Create a sink that will communicate to a CloudWatch Log Agent over a TCP connection.

See docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html for configuration information

Constant Summary collapse

AWS_EMF_AGENT_ENDPOINT_ENV_VAR =
'AWS_EMF_AGENT_ENDPOINT'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil), conn_timeout_secs: 10, write_timeout_secs: 10, logger: nil) ⇒ Tcp

Create a new TCP sink. It will use the AWS_EMF_AGENT_ENDPOINT environment variable by default to connect to a CloudWatch Metric Agent.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 29

def initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil),
               conn_timeout_secs: 10,
               write_timeout_secs: 10,
               logger: nil)
  if conn_str.nil?
    raise Sinks::Error, "Must specify a connection string or set environment variable #{AWS_EMF_AGENT_ENDPOINT_ENV_VAR}"
  end

  @logger = logger
  @cw_agent_uri = URI.parse(conn_str)
  if @cw_agent_uri.scheme != 'tcp' || !@cw_agent_uri.host || !@cw_agent_uri.port
    raise Sinks::Error, "Expected connection string to be in format tcp://<host>:<port>, got '#{conn_str}'"
  end

  @client_opts = TCPClient::Configuration.create(
    buffered: true,
    keep_alive: true,
    reverse_lookup: true,
    connect_timeout: conn_timeout_secs,
    write_timeout: write_timeout_secs
  )
  @conn = nil
end

Instance Attribute Details

#client_optsObject (readonly)

Returns the value of attribute client_opts.



18
19
20
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 18

def client_opts
  @client_opts
end

#queueObject (readonly)

Returns the value of attribute queue.



17
18
19
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 17

def queue
  @queue
end

Instance Method Details

#accept(message) ⇒ Object



86
87
88
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 86

def accept(message)
  send_message("#{JSON.dump(message)}\n")
end

#connectionObject



65
66
67
68
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 65

def connection
  @conn = create_conn(@cw_agent_uri.host, @cw_agent_uri.port, @client_opts) if @conn.nil? || @conn.closed?
  @conn
end

#create_conn(host, port, opts) ⇒ Object



61
62
63
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 61

def create_conn(host, port, opts)
  TCPClient.open("#{host}:#{port}", opts)
end

#log_err(msg) ⇒ Object



57
58
59
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 57

def log_err(msg)
  @logger&.error(msg)
end

#log_warn(msg) ⇒ Object



53
54
55
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 53

def log_warn(msg)
  @logger&.warn(msg)
end

#send_message(message) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 70

def send_message(message)
  retries = 2
  conn = nil
  begin
    conn = connection
    conn.write(message)
  rescue Errno::ECONNREFUSED
    conn.close unless conn.nil? || conn.closed?
    log_warn("Could not connect to CloudWatch Agent at #{@cw_agent_uri.scheme}://#{@cw_agent_uri.host}:#{@cw_agent_uri.port}")
    retries -= 1
    retry if retries >= 0
  rescue StandardError => e
    log_err("#{e.class}: #{e.message}: #{e.backtrace.join("\n")}")
  end
end