Class: Datadog::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/ddtrace/writer.rb

Overview

Traces and services writer that periodically sends data to the trace-agent

Constant Summary collapse

HOSTNAME =
'localhost'.freeze
PORT =
'8126'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Writer

Returns a new instance of Writer.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/ddtrace/writer.rb', line 13

def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, 100)
  @flush_interval = options.fetch(:flush_interval, 1)
  transport_options = options.fetch(:transport_options, {})

  # priority sampling
  if options[:priority_sampler]
    @priority_sampler = options[:priority_sampler]
    transport_options[:api_version] ||= HTTPTransport::V4
    transport_options[:response_callback] ||= method(:sampling_updater)
  end

  # transport and buffers
  @transport = options.fetch(:transport) do
    HTTPTransport.new(HOSTNAME, PORT, transport_options)
  end

  @services = {}

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0
  @services_flushed = 0

  # one worker for both services and traces, each have their own queues
  @worker = nil
end

Instance Attribute Details

#priority_samplerObject (readonly)

Returns the value of attribute priority_sampler.



8
9
10
# File 'lib/ddtrace/writer.rb', line 8

def priority_sampler
  @priority_sampler
end

#transportObject (readonly)

Returns the value of attribute transport.



8
9
10
# File 'lib/ddtrace/writer.rb', line 8

def transport
  @transport
end

#workerObject (readonly)

Returns the value of attribute worker.



8
9
10
# File 'lib/ddtrace/writer.rb', line 8

def worker
  @worker
end

Instance Method Details

#send_services(services, transport) ⇒ Object

flush services to the trace-agent, handles services only



77
78
79
80
81
82
83
84
85
# File 'lib/ddtrace/writer.rb', line 77

def send_services(services, transport)
  return true if services.empty?

  code = transport.send(:services, services)
  status = !transport.server_error?(code)
  @services_flushed += 1 if status

  status
end

#send_spans(traces, transport) ⇒ Object

flush spans to the trace-agent, handles spans only



66
67
68
69
70
71
72
73
74
# File 'lib/ddtrace/writer.rb', line 66

def send_spans(traces, transport)
  return true if traces.empty?

  code = transport.send(:traces, traces)
  status = !transport.server_error?(code)
  @traces_flushed += traces.length if status

  status
end

#startObject

spawns two different workers for spans and services; they share the same transport which is thread-safe



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/ddtrace/writer.rb', line 46

def start
  @pid = Process.pid
  @trace_handler = ->(items, transport) { send_spans(items, transport) }
  @service_handler = ->(items, transport) { send_services(items, transport) }
  @worker = Datadog::Workers::AsyncTransport.new(@transport,
                                                 @buff_size,
                                                 @trace_handler,
                                                 @service_handler,
                                                 @flush_interval)

  @worker.start()
end

#statsObject

stats returns a dictionary of stats about the writer.



110
111
112
113
114
115
116
# File 'lib/ddtrace/writer.rb', line 110

def stats
  {
    traces_flushed: @traces_flushed,
    services_flushed: @services_flushed,
    transport: @transport.stats
  }
end

#stopObject

stops both workers for spans and services.



60
61
62
63
# File 'lib/ddtrace/writer.rb', line 60

def stop
  @worker.stop()
  @worker = nil
end

#write(trace, services) ⇒ Object

enqueue the trace for submission to the API



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/ddtrace/writer.rb', line 88

def write(trace, services)
  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  pid = Process.pid
  @mutex_after_fork.synchronize do
    if pid != @pid
      # we should start threads because the worker doesn't own this
      start()
    end
  end

  @worker.enqueue_trace(trace)
  @worker.enqueue_service(services)
end