Class: Datadog::Statsd::Forwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/datadog/statsd/forwarder.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_cfg: nil, buffer_max_payload_size: nil, buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, buffer_flush_interval: nil, sender_queue_size: nil, telemetry_flush_interval: nil, global_tags: [], single_thread: false, logger: nil) ⇒ Forwarder

Returns a new instance of Forwarder.


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/datadog/statsd/forwarder.rb', line 9

def initialize(
  connection_cfg: nil,

  buffer_max_payload_size: nil,
  buffer_max_pool_size: nil,
  buffer_overflowing_stategy: :drop,
  buffer_flush_interval: nil,

  sender_queue_size: nil,

  telemetry_flush_interval: nil,
  global_tags: [],

  single_thread: false,

  logger: nil
)
  @transport_type = connection_cfg.transport_type

  if telemetry_flush_interval
    @telemetry = Telemetry.new(telemetry_flush_interval,
      global_tags: global_tags,
      transport_type: @transport_type
    )
  end

  @connection = connection_cfg.make_connection(logger: logger, telemetry: telemetry)

  # Initialize buffer
  buffer_max_payload_size ||= (@transport_type == :udp ?
                               UDP_DEFAULT_BUFFER_SIZE : UDS_DEFAULT_BUFFER_SIZE)

  if buffer_max_payload_size <= 0
    raise ArgumentError, 'buffer_max_payload_size cannot be <= 0'
  end

  unless telemetry.nil? || telemetry.would_fit_in?(buffer_max_payload_size)
    raise ArgumentError, "buffer_max_payload_size is not high enough to use telemetry (tags=(#{global_tags.inspect}))"
  end

  buffer = MessageBuffer.new(@connection,
    max_payload_size: buffer_max_payload_size,
    max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
    overflowing_stategy: buffer_overflowing_stategy,
  )

  sender_queue_size ||= (@transport_type == :udp ?
                         UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE)

  @sender = single_thread ?
    SingleThreadSender.new(
      buffer,
      logger: logger,
      flush_interval: buffer_flush_interval) :
    Sender.new(
      buffer,
      logger: logger,
      flush_interval: buffer_flush_interval,
      telemetry: @telemetry,
      queue_size: sender_queue_size)
  @sender.start
end

Instance Attribute Details

#telemetryObject (readonly)

Returns the value of attribute telemetry.


6
7
8
# File 'lib/datadog/statsd/forwarder.rb', line 6

def telemetry
  @telemetry
end

#transport_typeObject (readonly)

Returns the value of attribute transport_type.


7
8
9
# File 'lib/datadog/statsd/forwarder.rb', line 7

def transport_type
  @transport_type
end

Instance Method Details

#closeObject


106
107
108
109
# File 'lib/datadog/statsd/forwarder.rb', line 106

def close
  sender.stop
  connection.close
end

#flush(flush_telemetry: false, sync: false) ⇒ Object


82
83
84
85
86
# File 'lib/datadog/statsd/forwarder.rb', line 82

def flush(flush_telemetry: false, sync: false)
  do_flush_telemetry if telemetry && flush_telemetry

  sender.flush(sync: sync)
end

#hostObject


88
89
90
91
92
# File 'lib/datadog/statsd/forwarder.rb', line 88

def host
  return nil unless transport_type == :udp

  connection.host
end

#portObject


94
95
96
97
98
# File 'lib/datadog/statsd/forwarder.rb', line 94

def port
  return nil unless transport_type == :udp

  connection.port
end

#send_message(message) ⇒ Object


72
73
74
75
76
# File 'lib/datadog/statsd/forwarder.rb', line 72

def send_message(message)
  sender.add(message)

  tick_telemetry
end

#socket_pathObject


100
101
102
103
104
# File 'lib/datadog/statsd/forwarder.rb', line 100

def socket_path
  return nil unless transport_type == :uds

  connection.socket_path
end

#sync_with_outbound_ioObject


78
79
80
# File 'lib/datadog/statsd/forwarder.rb', line 78

def sync_with_outbound_io
  sender.rendez_vous
end