Class: Datadog::Workers::AsyncTransport

Inherits:
Object
Defined in:
lib/ddtrace/workers.rb

Overview

Asynchronous worker that periodically flushes buffered traces and services through the given transport. Under the hood, it runs a plain Ruby Thread that invokes the trace callback every span_interval seconds and the service callback at most once every service_interval seconds. The thread is started with the start() method and stopped with the stop() method.

Constructor Details

#initialize(span_interval, service_interval, transport, buff_size, trace_task, service_task) ⇒ AsyncTransport

Returns a new instance of AsyncTransport.



# File 'lib/ddtrace/workers.rb', line 12

def initialize(span_interval, service_interval, transport, buff_size, trace_task, service_task)
  @trace_task = trace_task
  @service_task = service_task
  @span_interval = span_interval
  @service_interval = service_interval
  @trace_buffer = TraceBuffer.new(buff_size)
  @service_buffer = TraceBuffer.new(buff_size)
  @transport = transport

  @worker = nil
  @run = false
end
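
As an illustration of how the constructor arguments fit together, the sketch below builds two task callables with the `task.call(items, transport)` shape that the callbacks use. `FakeTransport` and its `deliver` method are hypothetical stand-ins and not part of ddtrace, and the interval and buffer values in the commented-out constructor call are only examples.

```ruby
# Hypothetical stand-in for a transport. The real transport is whatever
# object the two tasks below know how to talk to.
FakeTransport = Struct.new(:sent) do
  def deliver(kind, payload)
    sent << [kind, payload]
    true
  end
end

transport = FakeTransport.new([])

# Both tasks are invoked by the worker as task.call(items, transport).
trace_task = lambda { |traces, t| t.deliver(:traces, traces) }
# The service task should return a truthy value on success, because the
# worker only records the flushed services when the call succeeds.
service_task = lambda { |services, t| t.deliver(:services, services) }

# With ddtrace loaded, the worker could be built roughly like this
# (intervals in seconds, buffer size in items; values are illustrative):
#   worker = Datadog::Workers::AsyncTransport.new(
#     1, 120, transport, 100, trace_task, service_task
#   )
#   worker.start

# Calling the tasks directly shows what the worker would hand to them.
trace_task.call([[:span_a, :span_b]], transport)
service_task.call({ 'web' => { 'app' => 'rails' } }, transport)
```

Passing the tasks in as callables keeps the worker independent of how the payload is actually sent, which also makes it easy to substitute a stub in tests.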

Instance Method Details

#callback_services ⇒ Object

Callback function that processes services and executes the send_services() method. The flush is skipped when none of the buffered service hashes differs from the last one sent.



# File 'lib/ddtrace/workers.rb', line 41

def callback_services
  return if @service_buffer.empty?

  begin
    services = @service_buffer.pop()
    # pick up the latest services hash (this is a FIFO list)
    # that is different from what we sent before.
    different = services.inject(false) { |acc, elem| elem != @last_flushed_services ? elem : acc }
    if different
      if @service_task.call(different, @transport)
        @last_flushed_services = different.clone
      end
    else
      Datadog::Tracer.log.debug('No new different services, skipping flush.')
    end
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog::Tracer.log.error("Error during services flush: dropped #{services ? services.length : 0} items. Cause: #{e}")
  end
end
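
The `inject` expression in the listing above is compact, so here is a small standalone sketch of the same selection logic. The helper name `latest_different` and the sample hashes are made up for illustration; only the block passed to `inject` is taken from the listing.

```ruby
# Returns the last element of `services` that differs from `last_flushed`,
# or false when every element equals it.
def latest_different(services, last_flushed)
  services.inject(false) { |acc, elem| elem != last_flushed ? elem : acc }
end

web = { 'web' => { 'app' => 'rails' } }
db  = { 'db' => { 'app' => 'postgres' } }

# Nothing flushed yet: the newest entry is selected.
first_flush = latest_different([web, db], nil)

# The only entry equals what was already flushed: nothing to send.
no_change = latest_different([db], db)

# The newest entry equals the last flush, so an older differing entry
# is selected instead.
older_wins = latest_different([web, db], db)
```

The last case suggests that the selected hash is not always the most recent one in the buffer: it is the most recent one that differs from the previous flush.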

#callback_traces ⇒ Object

Callback function that processes traces and executes the send_traces() method.



# File 'lib/ddtrace/workers.rb', line 26

def callback_traces
  return if @trace_buffer.empty?

  begin
    traces = @trace_buffer.pop()
    @trace_task.call(traces, @transport)
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog::Tracer.log.error("Error during traces flush: dropped #{traces ? traces.length : 0} items. Cause: #{e}")
  end
end

#enqueue_service(service) ⇒ Object

Enqueue an item in the internal service buffer. Empty hashes are skipped. This operation is thread-safe.



# File 'lib/ddtrace/workers.rb', line 102

def enqueue_service(service)
  return if service == {} # no use to send this, not worth it
  @service_buffer.push(service)
end

#enqueue_trace(trace) ⇒ Object

Enqueue an item in the internal trace buffer. This operation is thread-safe because it uses the TraceBuffer data structure.



# File 'lib/ddtrace/workers.rb', line 97

def enqueue_trace(trace)
  @trace_buffer.push(trace)
end
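
TraceBuffer itself is not shown on this page. To illustrate what a thread-safe buffer with the `push`, `pop`, and `empty?` methods used above might look like, here is a minimal sketch. `SketchBuffer` is a hypothetical class, not the ddtrace implementation, and its drop-the-oldest overflow policy is an assumption.

```ruby
# Hypothetical bounded buffer guarded by a Mutex. Not the real TraceBuffer.
class SketchBuffer
  def initialize(max_size)
    @max_size = max_size
    @items = []
    @mutex = Mutex.new
  end

  # Adds an item. When the buffer is full, the oldest item is dropped
  # (an assumed policy; the real buffer may behave differently).
  def push(item)
    @mutex.synchronize do
      @items.shift if @max_size > 0 && @items.length >= @max_size
      @items << item
    end
  end

  def empty?
    @mutex.synchronize { @items.empty? }
  end

  # Drains the buffer and returns everything collected so far.
  def pop
    @mutex.synchronize do
      drained = @items
      @items = []
      drained
    end
  end
end

buffer = SketchBuffer.new(2)
[1, 2, 3].each { |n| buffer.push(n) }
drained = buffer.pop
```

Draining the whole buffer in one `pop` matches how the callbacks above treat the result as a list of items to send in a single call.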

#join ⇒ Object

Block until the worker thread has finished or until the timeout (10 seconds) has passed.



# File 'lib/ddtrace/workers.rb', line 91

def join
  @worker.join(10)
end

#start ⇒ Object

Start the worker thread. Calling it again while the worker is already running has no effect.



# File 'lib/ddtrace/workers.rb', line 65

def start
  return if @run
  @run = true
  @worker = Thread.new() do
    Datadog::Tracer.log.debug("Starting thread in the process: #{Process.pid}")
    @last_flushed_services = nil
    next_send_services = Time.now

    # this loop assumes spans are flushed more often than services
    while @run
      callback_traces
      if Time.now >= next_send_services
        next_send_services = Time.now + @service_interval
        callback_services
      end
      sleep(@span_interval)
    end
  end
end
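
To make the two-interval scheduling in the loop above easier to follow, the sketch below replays the same decision logic against a simulated clock instead of `Time.now` and `sleep`. The `simulate` helper and its event tuples are illustrative only.

```ruby
# Replays the loop's scheduling decisions with a fake clock.
# Each tick stands for one pass through the `while @run` loop.
def simulate(span_interval, service_interval, ticks)
  now = 0
  next_send_services = now
  events = []
  ticks.times do
    events << [now, :traces]                 # callback_traces runs every pass
    if now >= next_send_services
      next_send_services = now + service_interval
      events << [now, :services]             # callback_services runs when due
    end
    now += span_interval                     # stands in for sleep(@span_interval)
  end
  events
end

events = simulate(1, 3, 5)
service_times = events.select { |_, kind| kind == :services }.map(&:first)
trace_count = events.count { |_, kind| kind == :traces }
```

In the real loop the time spent inside the callbacks is added on top of each sleep, so the actual intervals will be somewhat longer than the configured values.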

#stop ⇒ Object

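Stop the worker loop. The thread exits the next time it checks the run flag, after the current iteration and sleep complete; items still in the buffers at that point are not flushed by this method.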



# File 'lib/ddtrace/workers.rb', line 86

def stop
  @run = false
end
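
The start, stop, and join methods together form a simple lifecycle. The following sketch reproduces that pattern with a tiny self-contained class so it can run without ddtrace. `TinyWorker` and its tick counter are hypothetical, and unlike the listing above its `join` guards against being called before `start`.

```ruby
# Hypothetical miniature of the flag-controlled worker thread pattern.
class TinyWorker
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @run = false
    @worker = nil
    @ticks = 0
  end

  def start
    return if @run
    @run = true
    @worker = Thread.new do
      while @run
        @ticks += 1          # stands in for the flush callbacks
        sleep(@interval)
      end
    end
  end

  # Only flips the flag; the thread notices on its next loop check.
  def stop
    @run = false
  end

  # Returns the thread when it finished in time, nil on timeout
  # or when the worker was never started.
  def join(timeout = 10)
    @worker.join(timeout) if @worker
  end
end

worker = TinyWorker.new(0.01)
worker.start
sleep(0.05)
worker.stop
finished = worker.join(1)
```

Because `stop` only clears the flag, callers that need to know the thread has actually exited should follow it with `join`, as the last two lines do.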