Class: Libhoney::TransmissionClient Private
- Inherits:
-
Object
- Object
- Libhoney::TransmissionClient
- Defined in:
- lib/libhoney/transmission.rb
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Instance Method Summary collapse
- #add(event) ⇒ Object private
- #close(drain) ⇒ Object private
-
#initialize(max_batch_size: 0, send_frequency: 0, max_concurrent_batches: 0, pending_work_capacity: 0, responses: 0, block_on_send: 0, block_on_responses: 0) ⇒ TransmissionClient
constructor
private
A new instance of TransmissionClient.
- #send_loop ⇒ Object private
Constructor Details
#initialize(max_batch_size: 0, send_frequency: 0, max_concurrent_batches: 0, pending_work_capacity: 0, responses: 0, block_on_send: 0, block_on_responses: 0) ⇒ TransmissionClient
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of TransmissionClient.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/libhoney/transmission.rb', line 6 def initialize(max_batch_size: 0, send_frequency: 0, max_concurrent_batches: 0, pending_work_capacity: 0, responses: 0, block_on_send: 0, block_on_responses: 0) @responses = responses @block_on_send = block_on_send @block_on_responses = block_on_responses @max_batch_size = max_batch_size @send_frequency = send_frequency @max_concurrent_batches = max_concurrent_batches @pending_work_capacity = pending_work_capacity # use a SizedQueue so the producer will block on adding to the send_queue when @block_on_send is true @send_queue = SizedQueue.new(@pending_work_capacity) @threads = [] @lock = Mutex.new end |
Instance Method Details
#add(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/libhoney/transmission.rb', line 28 def add(event) begin @send_queue.enq(event, !@block_on_send) rescue ThreadError # happens if the queue was full and block_on_send = false. end @lock.synchronize { return if @threads.length > 0 while @threads.length < @max_concurrent_batches @threads << Thread.new { self.send_loop } end } end |
#close(drain) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/libhoney/transmission.rb', line 69 def close(drain) # if drain is false, clear the remaining unprocessed events from the queue @send_queue.clear if drain == false # send @threads.length number of nils so each thread will fall out of send_loop @threads.length.times { @send_queue << nil } @threads.each do |t| t.join end @threads = [] @responses.enq(nil) 0 end |
#send_loop ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/libhoney/transmission.rb', line 43 def send_loop # eat events until we run out loop { e = @send_queue.pop break if e == nil before = Time.now resp = HTTP.headers('User-Agent' => "libhoney-rb/#{VERSION}", 'Content-Type' => 'application/json', 'X-Honeycomb-Team' => e.writekey, 'X-Honeycomb-SampleRate' => e.sample_rate, 'X-Event-Time' => e..iso8601) .post(URI.join(e.api_host, '/1/events/', e.dataset), :json => e.data) after = Time.now response = Response.new(:duration => after - before, :status_code => resp.status, :metadata => e.) begin @responses.enq(response, !@block_on_responses) rescue ThreadError # happens if the queue was full and block_on_send = false. end } end |