Class: Datadog::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/ddtrace/writer.rb

Overview

Processor that sends traces and metadata to the agent

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Writer

Returns a new instance of Writer.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/ddtrace/writer.rb', line 20

def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
  @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)
  transport_options = options.fetch(:transport_options, {})

  # priority sampling
  if options[:priority_sampler]
    @priority_sampler = options[:priority_sampler]
    transport_options[:api_version] ||= Transport::HTTP::API::V4
  end

  # transport and buffers
  @transport = options.fetch(:transport) do
    Transport::HTTP.default(transport_options)
  end

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0

  # one worker for traces
  @worker = nil
end

Instance Attribute Details

#priority_samplerObject (readonly)

Returns the value of attribute priority_sampler.



15
16
17
# File 'lib/ddtrace/writer.rb', line 15

def priority_sampler
  @priority_sampler
end

#transportObject (readonly)

Returns the value of attribute transport.



15
16
17
# File 'lib/ddtrace/writer.rb', line 15

def transport
  @transport
end

#workerObject (readonly)

Returns the value of attribute worker.



15
16
17
# File 'lib/ddtrace/writer.rb', line 15

def worker
  @worker
end

Instance Method Details

#send_spans(traces, transport) ⇒ Object

flush spans to the trace-agent, handles spans only



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/ddtrace/writer.rb', line 84

def send_spans(traces, transport)
  return true if traces.empty?

  # Inject hostname if configured to do so
  inject_hostname!(traces) if Datadog.configuration.report_hostname

  # Send traces and get responses
  responses = transport.send_traces(traces)

  # Tally up successful flushes
  responses.reject { |x| x.internal_error? || x.server_error? }.each do |response|
    @traces_flushed += response.trace_count
  end

  # Update priority sampler
  update_priority_sampler(responses.last)

  record_environment_information!(responses)

  # Return if server error occurred.
  !responses.find(&:server_error?)
end

#startObject



47
48
49
50
51
52
53
54
55
# File 'lib/ddtrace/writer.rb', line 47

def start
  @mutex_after_fork.synchronize do
    pid = Process.pid
    return if @worker && pid == @pid
    @pid = pid
    start_worker
    true
  end
end

#statsObject

stats returns a dictionary of stats about the writer.



145
146
147
148
149
150
# File 'lib/ddtrace/writer.rb', line 145

def stats
  {
    traces_flushed: @traces_flushed,
    transport: @transport.stats
  }
end

#stopObject



70
71
72
# File 'lib/ddtrace/writer.rb', line 70

def stop
  @mutex_after_fork.synchronize { stop_worker }
end

#write(trace, services = nil) ⇒ Object

enqueue the trace for submission to the API



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/ddtrace/writer.rb', line 108

def write(trace, services = nil)
  unless services.nil?
    Datadog::Patcher.do_once('Writer#write') do
      Datadog.logger.warn(%(
        write: Writing services has been deprecated and no longer need to be provided.
        write(traces, services) can be updated to write(traces)
      ))
    end
  end

  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  start if @worker.nil? || @pid != Process.pid

  # TODO: Remove this, and have the tracer pump traces directly to runtime metrics
  #       instead of working through the trace writer.
  # Associate root span with runtime metrics
  if Datadog.configuration.runtime_metrics.enabled && !trace.empty?
    Datadog.runtime_metrics.associate_with_span(trace.first)
  end

  worker_local = @worker

  if worker_local
    worker_local.enqueue_trace(trace)
  else
    Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete')
  end
end