Class: Skywalking::Reporter::BufferTrigger
- Inherits:
-
Object
- Object
- Skywalking::Reporter::BufferTrigger
- Extended by:
- Forwardable
- Includes:
- Enumerable
- Defined in:
- lib/skywalking/reporter/buffer_trigger.rb
Instance Method Summary collapse
- #<<(segment) ⇒ Object
- #clear_queue ⇒ Object
- #close_queue ⇒ Object
- #closed? ⇒ Boolean
- #empty? ⇒ Boolean
- #generate_segment(segment) ⇒ Object
- #initialize(config) ⇒ BufferTrigger (constructor): A new instance of BufferTrigger.
- #stream_data ⇒ Object
Constructor Details
#initialize(config) ⇒ BufferTrigger
Returns a new instance of BufferTrigger.
26 27 28 29 30 31 32 |
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 26
#
# Sets up an empty, open buffer.
#
# @param config [#[]] configuration; `config[:max_queue_size]` is stored as
#   the size at which `<<` clears the buffer
def initialize(config)
  @config = config
  @max_size = config[:max_queue_size]
  @mutex = Mutex.new
  @buffer = Queue.new
  @closed = false
end
Instance Method Details
#<<(segment) ⇒ Object
38 39 40 41 42 43 |
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 38
#
# Adds a segment to the buffer while holding @mutex.
#
# If the buffer already holds @max_size (or more) entries, everything
# currently buffered is discarded via #clear_queue before the new segment is
# pushed. A segment that arrives after the buffer has been closed is dropped
# instead of being pushed.
#
# @param segment [Object] the segment to enqueue
# @return [Thread::Queue, nil] the underlying queue after the push, or nil
#   when the segment was dropped because the buffer is closed
def <<(segment)
  @mutex.synchronize do
    # Queue#push raises ClosedQueueError on a closed queue; skip the push so
    # that error never reaches the caller.
    next if @buffer.closed?

    clear_queue if @buffer.size >= @max_size
    @buffer.push(segment)
  end
end
#clear_queue ⇒ Object
45 46 47 |
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 45
#
# Discards every entry currently held in the buffer.
#
# @return [Object] whatever `@buffer.clear` returns
def clear_queue
  @buffer.clear
end
#close_queue ⇒ Object
49 50 51 52 53 54 |
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 49
#
# Closes the underlying buffer and records the closed state, both while
# holding @mutex.
#
# @return [true]
def close_queue
  @mutex.synchronize do
    @buffer.close
    @closed = true
  end
end
#closed? ⇒ Boolean
56 57 58 |
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 56
#
# Reports the value of the @closed flag.
#
# @return [Boolean] the current value of @closed
def closed?
  @closed
end
#empty? ⇒ Boolean
34 35 36 |
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 34
#
# Tells whether the buffer currently holds no entries.
#
# @return [Boolean] true when the buffer is empty
def empty?
  @buffer.empty?
end
#generate_segment(segment) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# Builds a SegmentObject from the given segment.
#
# The segment id is stringified, and the service and instance names are read
# from @config[:service_name] and @config[:instance_name]. Each entry of
# segment.spans becomes a SpanObject. Span refs that have a truthy trace_id
# become SegmentReference objects, with refType 0 when ref.ref_type is
# 'CrossProcess' and 1 otherwise.
#
# NOTE(review): this listing lost identifiers during extraction. The receiver
# in `segment.[0].to_s` (traceId) and the method name in `tags: span.` are
# missing, so the line below does not parse as shown. Restore both from
# lib/skywalking/reporter/buffer_trigger.rb before relying on it.
#
# @param segment [Object] responds to segment_id and spans, as used below
# @return [SegmentObject]
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 72 def generate_segment(segment) SegmentObject.new( traceId: segment.[0].to_s, traceSegmentId: segment.segment_id.to_s, service: @config[:service_name], serviceInstance: @config[:instance_name], spans: segment.spans.map do |span| SpanObject.new( spanId: span.span_id, parentSpanId: span.parent_id, startTime: span.start_time, endTime: span.end_time, operationName: span.operation, peer: span.peer, spanType: span.kind, spanLayer: span.layer, componentId: span.component, refs: span.refs.select { |ref| ref.trace_id }.map do |ref| SegmentReference.new( refType: ref.ref_type == 'CrossProcess' ? 0 : 1, traceId: ref.trace_id, parentTraceSegmentId: ref.segment_id, parentSpanId: ref.span_id, parentService: ref.service, parentServiceInstance: ref.service_instance, parentEndpoint: ref.endpoint, networkAddressUsedAtPeer: ref.peer ) end, tags: span. ) end ) end |
#stream_data ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/skywalking/reporter/buffer_trigger.rb', line 60
#
# Takes one segment from the buffer and wraps it in an Enumerator.
#
# The pop is called with non_block = false. When the pop yields nil, the
# returned Enumerator produces nothing. The conversion through
# #generate_segment happens lazily, when the Enumerator is iterated.
#
# @return [Enumerator, nil] an enumerator over at most one generated
#   segment, or nil if the pop raised ThreadError
def stream_data
  popped =
    begin
      @buffer.pop(false)
    rescue ThreadError
      return nil
    end

  Enumerator.new do |yielder|
    next if popped.nil?

    yielder << generate_segment(popped)
  end
end