Class: Datadog::Workers::AsyncTransport
- Inherits:
-
Object
- Object
- Datadog::Workers::AsyncTransport
- Defined in:
- lib/ddtrace/workers.rb
Overview
Asynchronous worker that executes a Send() operation after given seconds. Under the hood, it uses Concurrent::TimerTask so that the thread will perform a task at regular intervals. The thread can be stopped with the stop() method and can start with the start() method.
Constant Summary collapse
- DEFAULT_TIMEOUT =
5- BACK_OFF_RATIO =
1.2- BACK_OFF_MAX =
5
Instance Attribute Summary collapse
-
#service_buffer ⇒ Object
readonly
Returns the value of attribute service_buffer.
-
#shutting_down ⇒ Object
readonly
Returns the value of attribute shutting_down.
-
#trace_buffer ⇒ Object
readonly
Returns the value of attribute trace_buffer.
Instance Method Summary collapse
-
#callback_services ⇒ Object
Callback function that process traces and executes the send_services() method.
-
#callback_traces ⇒ Object
Callback function that process traces and executes the send_traces() method.
-
#enqueue_service(service) ⇒ Object
Enqueue an item in the service internal buffer.
-
#enqueue_trace(trace) ⇒ Object
Enqueue an item in the trace internal buffer.
-
#initialize(transport, buff_size, trace_task, service_task, interval) ⇒ AsyncTransport
constructor
A new instance of AsyncTransport.
-
#join ⇒ Object
Block until executor shutdown is complete or until timeout seconds have passed.
-
#shutdown! ⇒ Object
Closes all available queues and waits for the trace and service buffer to flush.
-
#start ⇒ Object
Start the timer execution.
-
#stop ⇒ Object
Stop the timer execution.
Constructor Details
#initialize(transport, buff_size, trace_task, service_task, interval) ⇒ AsyncTransport
Returns a new instance of AsyncTransport.
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/ddtrace/workers.rb', line 18 def initialize(transport, buff_size, trace_task, service_task, interval) @trace_task = trace_task @service_task = service_task @flush_interval = interval @back_off = interval @trace_buffer = TraceBuffer.new(buff_size) @service_buffer = TraceBuffer.new(buff_size) @transport = transport @shutting_down = false @worker = nil @run = false end |
Instance Attribute Details
#service_buffer ⇒ Object (readonly)
Returns the value of attribute service_buffer.
16 17 18 |
# File 'lib/ddtrace/workers.rb', line 16 def service_buffer @service_buffer end |
#shutting_down ⇒ Object (readonly)
Returns the value of attribute shutting_down.
16 17 18 |
# File 'lib/ddtrace/workers.rb', line 16 def shutting_down @shutting_down end |
#trace_buffer ⇒ Object (readonly)
Returns the value of attribute trace_buffer.
16 17 18 |
# File 'lib/ddtrace/workers.rb', line 16 def trace_buffer @trace_buffer end |
Instance Method Details
#callback_services ⇒ Object
Callback function that process traces and executes the send_services() method.
49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/ddtrace/workers.rb', line 49 def callback_services return true if @service_buffer.empty? begin services = @service_buffer.pop() @service_task.call(services[0], @transport) rescue StandardError => e # ensures that the thread will not die because of an exception. # TODO[manu]: findout the reason and reschedule the send if it's not # a fatal exception Datadog::Tracer.log.error("Error during services flush: dropped #{items.length} items. Cause: #{e}") end end |
#callback_traces ⇒ Object
Callback function that process traces and executes the send_traces() method.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/ddtrace/workers.rb', line 33 def callback_traces return true if @trace_buffer.empty? begin traces = @trace_buffer.pop() traces = Pipeline.process!(traces) @trace_task.call(traces, @transport) rescue StandardError => e # ensures that the thread will not die because of an exception. # TODO[manu]: findout the reason and reschedule the send if it's not # a fatal exception Datadog::Tracer.log.error("Error during traces flush: dropped #{items.length} items. Cause: #{e}") end end |
#enqueue_service(service) ⇒ Object
Enqueue an item in the service internal buffer. This operation is thread-safe.
115 116 117 118 |
# File 'lib/ddtrace/workers.rb', line 115 def enqueue_service(service) return if service == {} # no use to send this, not worth it @service_buffer.push(service) end |
#enqueue_trace(trace) ⇒ Object
Enqueue an item in the trace internal buffer. This operation is thread-safe because uses the TraceBuffer data structure.
110 111 112 |
# File 'lib/ddtrace/workers.rb', line 110 def enqueue_trace(trace) @trace_buffer.push(trace) end |
#join ⇒ Object
Block until executor shutdown is complete or until timeout seconds have passed.
104 105 106 |
# File 'lib/ddtrace/workers.rb', line 104 def join @worker.join(5) end |
#shutdown! ⇒ Object
Closes all available queues and waits for the trace and service buffer to flush
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/ddtrace/workers.rb', line 86 def shutdown! return false if @shutting_down @shutting_down = true @trace_buffer.close @service_buffer.close sleep(0.1) timeout_time = Time.now + DEFAULT_TIMEOUT while (!@trace_buffer.empty? || !@service_buffer.empty?) && Time.now <= timeout_time sleep(0.05) Datadog::Tracer.log.debug('Waiting for the buffers to clear before exiting') end stop join @shutting_down = false true end |
#start ⇒ Object
Start the timer execution.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/ddtrace/workers.rb', line 64 def start return if @run @run = true @worker = Thread.new() do Datadog::Tracer.log.debug("Starting thread in the process: #{Process.pid}") while @run @back_off = callback_traces ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min callback_services sleep(@back_off) if @run end end end |
#stop ⇒ Object
Stop the timer execution. Tasks already in the queue will be executed.
81 82 83 |
# File 'lib/ddtrace/workers.rb', line 81 def stop @run = false end |