Class: Datadog::Workers::AsyncTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/ddtrace/workers.rb

Overview

Asynchronous worker that executes a Send() operation after given seconds. Under the hood, it uses Concurrent::TimerTask so that the thread will perform a task at regular intervals. The thread can be stopped with the stop() method and can start with the start() method.

Constant Summary collapse

DEFAULT_TIMEOUT =
5
BACK_OFF_RATIO =
1.2
BACK_OFF_MAX =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transport, buff_size, trace_task, service_task, interval) ⇒ AsyncTransport

Returns a new instance of AsyncTransport.



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/ddtrace/workers.rb', line 18

def initialize(transport, buff_size, trace_task, service_task, interval)
  @trace_task = trace_task
  @service_task = service_task
  @flush_interval = interval
  @back_off = interval
  @trace_buffer = TraceBuffer.new(buff_size)
  @service_buffer = TraceBuffer.new(buff_size)
  @transport = transport
  @shutting_down = false

  @worker = nil
  @run = false
end

Instance Attribute Details

#service_bufferObject (readonly)

Returns the value of attribute service_buffer.



16
17
18
# File 'lib/ddtrace/workers.rb', line 16

def service_buffer
  @service_buffer
end

#shutting_downObject (readonly)

Returns the value of attribute shutting_down.



16
17
18
# File 'lib/ddtrace/workers.rb', line 16

def shutting_down
  @shutting_down
end

#trace_bufferObject (readonly)

Returns the value of attribute trace_buffer.



16
17
18
# File 'lib/ddtrace/workers.rb', line 16

def trace_buffer
  @trace_buffer
end

Instance Method Details

#callback_servicesObject

Callback function that process traces and executes the send_services() method.



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/ddtrace/workers.rb', line 49

def callback_services
  return true if @service_buffer.empty?

  begin
    services = @service_buffer.pop()
    @service_task.call(services[0], @transport)
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog::Tracer.log.error("Error during services flush: dropped #{items.length} items. Cause: #{e}")
  end
end

#callback_tracesObject

Callback function that process traces and executes the send_traces() method.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/ddtrace/workers.rb', line 33

def callback_traces
  return true if @trace_buffer.empty?

  begin
    traces = @trace_buffer.pop()
    traces = Pipeline.process!(traces)
    @trace_task.call(traces, @transport)
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog::Tracer.log.error("Error during traces flush: dropped #{items.length} items. Cause: #{e}")
  end
end

#enqueue_service(service) ⇒ Object

Enqueue an item in the service internal buffer. This operation is thread-safe.



115
116
117
118
# File 'lib/ddtrace/workers.rb', line 115

def enqueue_service(service)
  return if service == {} # no use to send this, not worth it
  @service_buffer.push(service)
end

#enqueue_trace(trace) ⇒ Object

Enqueue an item in the trace internal buffer. This operation is thread-safe because uses the TraceBuffer data structure.



110
111
112
# File 'lib/ddtrace/workers.rb', line 110

def enqueue_trace(trace)
  @trace_buffer.push(trace)
end

#joinObject

Block until executor shutdown is complete or until timeout seconds have passed.



104
105
106
# File 'lib/ddtrace/workers.rb', line 104

def join
  @worker.join(5)
end

#shutdown!Object

Closes all available queues and waits for the trace and service buffer to flush



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/ddtrace/workers.rb', line 86

def shutdown!
  return false if @shutting_down
  @shutting_down = true
  @trace_buffer.close
  @service_buffer.close
  sleep(0.1)
  timeout_time = Time.now + DEFAULT_TIMEOUT
  while (!@trace_buffer.empty? || !@service_buffer.empty?) && Time.now <= timeout_time
    sleep(0.05)
    Datadog::Tracer.log.debug('Waiting for the buffers to clear before exiting')
  end
  stop
  join
  @shutting_down = false
  true
end

#startObject

Start the timer execution.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/ddtrace/workers.rb', line 64

def start
  return if @run
  @run = true
  @worker = Thread.new() do
    Datadog::Tracer.log.debug("Starting thread in the process: #{Process.pid}")

    while @run
      @back_off = callback_traces ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min

      callback_services

      sleep(@back_off) if @run
    end
  end
end

#stopObject

Stop the timer execution. Tasks already in the queue will be executed.



81
82
83
# File 'lib/ddtrace/workers.rb', line 81

def stop
  @run = false
end