Class: Datadog::Tracing::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/datadog/tracing/writer.rb

Overview

Processor that sends traces and metadata to the agent

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Writer

Returns a new instance of Writer.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/datadog/tracing/writer.rb', line 18

def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
  @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)
  transport_options = options.fetch(:transport_options, {})

  transport_options[:agent_settings] = options[:agent_settings] if options.key?(:agent_settings)

  # transport and buffers
  @transport = options.fetch(:transport) do
    Transport::HTTP.default(**transport_options)
  end

  @shutdown_timeout = options.fetch(:shutdown_timeout, Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT)

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0

  # one worker for traces
  @worker = nil

  # Once stopped, this writer instance cannot be restarted.
  # This allow for graceful shutdown, while preventing
  # the host application from inadvertently start new
  # threads during shutdown.
  @stopped = false

  # Callback handler
  @events = Events.new
end

Instance Attribute Details

#eventsObject (readonly)

Returns the value of attribute events.



13
14
15
# File 'lib/datadog/tracing/writer.rb', line 13

def events
  @events
end

#transportObject (readonly)

Returns the value of attribute transport.



13
14
15
# File 'lib/datadog/tracing/writer.rb', line 13

def transport
  @transport
end

#workerObject (readonly)

Returns the value of attribute worker.



13
14
15
# File 'lib/datadog/tracing/writer.rb', line 13

def worker
  @worker
end

Instance Method Details

#startObject

Explicitly starts the Datadog::Tracing::Writer's internal worker.

The Datadog::Tracing::Writer is also automatically started when necessary during calls to #write.



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/datadog/tracing/writer.rb', line 55

def start
  @mutex_after_fork.synchronize do
    return false if @stopped

    pid = Process.pid
    return if @worker && pid == @pid

    @pid = pid

    start_worker
    true
  end
end

#statsObject

stats returns a dictionary of stats about the writer.



153
154
155
156
157
158
# File 'lib/datadog/tracing/writer.rb', line 153

def stats
  {
    traces_flushed: @traces_flushed,
    transport: @transport.stats
  }
end

#stopObject

Gracefully shuts down this writer.

Once stopped methods calls won't fail, but no internal work will be performed.

It is not possible to restart a stopped writer instance.



90
91
92
# File 'lib/datadog/tracing/writer.rb', line 90

def stop
  @mutex_after_fork.synchronize { stop_worker }
end

#write(trace) ⇒ Object

enqueue the trace for submission to the API



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/datadog/tracing/writer.rb', line 127

def write(trace)
  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  start if @worker.nil? || @pid != Process.pid

  # TODO: Remove this, and have the tracer pump traces directly to runtime metrics
  #       instead of working through the trace writer.
  # Associate trace with runtime metrics
  Runtime::Metrics.associate_trace(trace)

  worker_local = @worker

  if worker_local
    worker_local.enqueue_trace(trace)
  elsif !@stopped
    Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete')
  end
end