Class: Datadog::Statsd::Forwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/datadog/statsd/forwarder.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: nil, port: nil, socket_path: nil, buffer_max_payload_size: nil, buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, telemetry_flush_interval: nil, global_tags: [], single_thread: false, logger: nil) ⇒ Forwarder

Returns a new instance of Forwarder.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/datadog/statsd/forwarder.rb', line 9

# Wires up a forwarder: selects the transport, optionally creates telemetry,
# builds the matching connection, wraps it in a message buffer and starts a
# sender on top of that buffer.
#
# @param host [Object, nil] passed to UDPConnection (UDP transport only)
# @param port [Object, nil] passed to UDPConnection (UDP transport only)
# @param socket_path [Object, nil] when non-nil, selects the :uds transport
#   and is passed to UDSConnection
# @param buffer_max_payload_size [Numeric, nil] defaults to
#   UDP_DEFAULT_BUFFER_SIZE or UDS_DEFAULT_BUFFER_SIZE depending on transport
# @param buffer_max_pool_size [Object, nil] defaults to DEFAULT_BUFFER_POOL_SIZE
# @param buffer_overflowing_stategy [Symbol] passed to MessageBuffer
#   (keyword spelling is part of the public interface)
# @param telemetry_flush_interval [Object, nil] telemetry is created only
#   when this is truthy
# @param global_tags [Array] passed to Telemetry
# @param single_thread [Boolean] selects SingleThreadSender instead of Sender
# @param logger [Object, nil] passed to the connection and to the sender
# @raise [ArgumentError] when the payload size is <= 0, or when telemetry
#   reports that it would not fit in the payload size
def initialize(
  host: nil,
  port: nil,
  socket_path: nil,
  buffer_max_payload_size: nil,
  buffer_max_pool_size: nil,
  buffer_overflowing_stategy: :drop,
  telemetry_flush_interval: nil,
  global_tags: [],
  single_thread: false,
  logger: nil
)
  # A socket path selects the Unix domain socket transport; otherwise UDP.
  @transport_type = socket_path.nil? ? :udp : :uds

  # Telemetry is opt-in: without a flush interval @telemetry stays unset.
  if telemetry_flush_interval
    telemetry_options = { global_tags: global_tags, transport_type: transport_type }
    @telemetry = Telemetry.new(telemetry_flush_interval, **telemetry_options)
  end

  connection_options = { logger: logger, telemetry: telemetry }
  @connection = if transport_type == :udp
                  UDPConnection.new(host, port, **connection_options)
                elsif transport_type == :uds
                  UDSConnection.new(socket_path, **connection_options)
                end

  # Pick the transport-specific default only when the caller gave no size.
  buffer_max_payload_size ||= if transport_type == :udp
                                UDP_DEFAULT_BUFFER_SIZE
                              else
                                UDS_DEFAULT_BUFFER_SIZE
                              end

  raise ArgumentError, 'buffer_max_payload_size cannot be <= 0' if buffer_max_payload_size <= 0

  if !telemetry.nil? && !telemetry.would_fit_in?(buffer_max_payload_size)
    raise ArgumentError, "buffer_max_payload_size is not high enough to use telemetry (tags=(#{global_tags.inspect}))"
  end

  message_buffer = MessageBuffer.new(
    @connection,
    max_payload_size: buffer_max_payload_size,
    max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
    overflowing_stategy: buffer_overflowing_stategy
  )

  sender_class = single_thread ? SingleThreadSender : Sender
  @sender = sender_class.new(message_buffer, logger: logger)
  @sender.start
end

Instance Attribute Details

#telemetry ⇒ Object (readonly)

Returns the value of attribute telemetry.



6
7
8
# File 'lib/datadog/statsd/forwarder.rb', line 6

# Reader for the telemetry collaborator.
#
# @return [Telemetry, nil] the object the constructor stored in @telemetry;
#   nil when no telemetry_flush_interval was supplied
def telemetry
  @telemetry
end

#transport_type ⇒ Object (readonly)

Returns the value of attribute transport_type.



7
8
9
# File 'lib/datadog/statsd/forwarder.rb', line 7

# Reader for the transport chosen at construction time.
#
# @return [Symbol] :uds when the constructor received a non-nil socket_path,
#   :udp otherwise
def transport_type
  @transport_type
end

Instance Method Details

#close ⇒ Object



95
96
97
98
# File 'lib/datadog/statsd/forwarder.rb', line 95

# Stops the sender and then closes the connection.
#
# The connection is closed even when stopping the sender raises, so a failing
# sender cannot leak the underlying connection. The error from sender.stop
# still propagates to the caller.
#
# @return [Object] whatever connection.close returns
def close
  begin
    sender.stop
  ensure
    # Runs on both the normal and the error path.
    close_result = connection.close
  end

  close_result
end

#flush(flush_telemetry: false, sync: false) ⇒ Object



71
72
73
74
75
# File 'lib/datadog/statsd/forwarder.rb', line 71

# Asks the sender to flush, optionally flushing telemetry first.
#
# @param flush_telemetry [Boolean] when truthy and telemetry is present,
#   do_flush_telemetry is called before the sender flush
# @param sync [Boolean] passed unchanged to sender.flush
# @return [Object] whatever sender.flush returns
def flush(flush_telemetry: false, sync: false)
  # Telemetry is only flushed when it exists and the caller asked for it.
  telemetry_requested = telemetry && flush_telemetry
  do_flush_telemetry if telemetry_requested

  sender.flush(sync: sync)
end

#host ⇒ Object



77
78
79
80
81
# File 'lib/datadog/statsd/forwarder.rb', line 77

# Host of the underlying connection, available on the UDP transport only.
#
# @return [Object, nil] connection.host when transport_type is :udp, else nil
def host
  connection.host if transport_type == :udp
end

#port ⇒ Object



83
84
85
86
87
# File 'lib/datadog/statsd/forwarder.rb', line 83

# Port of the underlying connection, available on the UDP transport only.
#
# @return [Object, nil] connection.port when transport_type is :udp, else nil
def port
  connection.port if transport_type == :udp
end

#send_message(message) ⇒ Object



61
62
63
64
65
# File 'lib/datadog/statsd/forwarder.rb', line 61

# Hands a message to the sender, then calls tick_telemetry.
#
# @param message [Object] passed unchanged to sender.add
# @return [Object] the result of tick_telemetry (helper defined outside this
#   view — NOTE(review): confirm whether callers rely on this value)
def send_message(message)
  sender.add(message)

  tick_telemetry
end

#socket_path ⇒ Object



89
90
91
92
93
# File 'lib/datadog/statsd/forwarder.rb', line 89

# Socket path of the underlying connection, available on the UDS transport only.
#
# @return [Object, nil] connection.socket_path when transport_type is :uds,
#   else nil
def socket_path
  connection.socket_path if transport_type == :uds
end

#sync_with_outbound_io ⇒ Object



67
68
69
# File 'lib/datadog/statsd/forwarder.rb', line 67

# Delegates to sender.rendez_vous.
# NOTE(review): presumably blocks until the sender has caught up with what
# was handed to it so far — confirm against the sender implementation.
#
# @return [Object] whatever sender.rendez_vous returns
def sync_with_outbound_io
  sender.rendez_vous
end