Class: Libhoney::TransmissionClient Private

Inherits:
Object
Defined in:
lib/libhoney/transmission.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Instance Method Summary

Constructor Details

#initialize(max_batch_size: 0, send_frequency: 0, max_concurrent_batches: 0, pending_work_capacity: 0, responses: 0, block_on_send: 0, block_on_responses: 0) ⇒ TransmissionClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of TransmissionClient.



# File 'lib/libhoney/transmission.rb', line 6

def initialize(max_batch_size: 0,
               send_frequency: 0,
               max_concurrent_batches: 0,
               pending_work_capacity: 0,
               responses: 0,
               block_on_send: 0,
               block_on_responses: 0)

  @responses = responses
  @block_on_send = block_on_send
  @block_on_responses = block_on_responses
  @max_batch_size = max_batch_size
  @send_frequency = send_frequency
  @max_concurrent_batches = max_concurrent_batches
  @pending_work_capacity = pending_work_capacity

  # use a SizedQueue so the producer will block on adding to the send_queue when @block_on_send is true
  @send_queue = SizedQueue.new(@pending_work_capacity)
  @threads = []
  @lock = Mutex.new
end
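
The comment in the constructor relies on how a SizedQueue behaves when it is full: a plain push blocks the producer until a consumer makes room. The following is a minimal standalone sketch of that behavior using only the Ruby standard library. It does not use TransmissionClient itself, and the capacity of 1 and the symbols pushed are illustrative assumptions.

```ruby
# Standalone sketch: a full SizedQueue blocks the producer until space frees up.
queue = SizedQueue.new(1)      # capacity stands in for pending_work_capacity
queue.push(:first)             # the queue is now full

producer = Thread.new do
  queue.push(:second)          # blocks here because the queue is full
  :done
end

# Wait until the producer thread is parked on the full queue.
Thread.pass until producer.stop?
was_blocked = producer.alive?  # alive but stopped, so it is waiting, not finished

drained = [queue.pop]          # popping frees a slot and wakes the producer
producer.join
drained << queue.pop

puts "blocked: #{was_blocked}, drained: #{drained.inspect}"
```

A bounded queue like this applies back-pressure to callers instead of letting pending work grow without limit.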

Instance Method Details

#add(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/libhoney/transmission.rb', line 28

def add(event)
  begin
    @send_queue.enq(event, !@block_on_send)
  rescue ThreadError
    # happens if the queue was full and block_on_send = false.
  end

  @lock.synchronize {
    return if @threads.length > 0
    while @threads.length < @max_concurrent_batches
      @threads << Thread.new { self.send_loop }
    end
  }
end
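
The rescue in #add depends on the second argument to enq: when non_block is true and the queue is full, the call raises ThreadError instead of waiting, and the event is silently dropped. A minimal sketch of that path, again using only the standard library; the queue capacity and the event symbols are assumptions made for illustration.

```ruby
# Standalone sketch: non-blocking enq on a full SizedQueue raises ThreadError.
queue = SizedQueue.new(2)

accepted = []
dropped = []

%i[event_a event_b event_c].each do |event|
  begin
    queue.enq(event, true)     # non_block = true, like !@block_on_send when it is false
    accepted << event
  rescue ThreadError
    dropped << event           # queue was full, so the event is discarded
  end
end

puts "accepted: #{accepted.inspect}, dropped: #{dropped.inspect}"
```

Dropping on overflow keeps the caller from stalling, at the cost of losing events when the consumers fall behind.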

#close(drain) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/libhoney/transmission.rb', line 69

def close(drain)
  # if drain is false, clear the remaining unprocessed events from the queue
  @send_queue.clear if drain == false

  # send @threads.length number of nils so each thread will fall out of send_loop
  @threads.length.times { @send_queue << nil }

  @threads.each do |t|
    t.join
  end
  @threads = []

  @responses.enq(nil)

  0
end
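
The shutdown in #close uses a sentinel pattern: one nil is pushed per worker thread, and each worker exits when it pops a nil. Because the queue is first-in first-out, the sentinels are only reached after all earlier items have been taken. A minimal sketch of the pattern with a plain Queue; the worker count, the items, and the doubling step are hypothetical stand-ins for the real send logic.

```ruby
# Standalone sketch: stop N workers by pushing N nil sentinels, then join them.
work = Queue.new
processed = Queue.new

workers = Array.new(3) do
  Thread.new do
    loop do
      item = work.pop
      break if item.nil?       # sentinel: this worker is done
      processed << item * 2    # placeholder for real per-item work
    end
  end
end

5.times { |i| work << i }
workers.length.times { work << nil }   # one sentinel per worker
workers.each(&:join)

results = []
results << processed.pop until processed.empty?
puts results.sort.inspect
```

Each worker consumes exactly one sentinel, so the number of nils must match the number of threads or some workers will never exit.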

#send_loop ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/libhoney/transmission.rb', line 43

def send_loop
  # eat events until we run out
  loop {
    e = @send_queue.pop
    break if e == nil

    before = Time.now
    resp = HTTP.headers('User-Agent' => "libhoney-rb/#{VERSION}",
                        'Content-Type' => 'application/json',
                        'X-Honeycomb-Team' => e.writekey,
                        'X-Honeycomb-SampleRate' => e.sample_rate,
                        'X-Event-Time' => e.timestamp.iso8601)
           .post(URI.join(e.api_host, '/1/events/', e.dataset), :json => e.data)
    after = Time.now

    response = Response.new(:duration => after - before,
                            :status_code => resp.status,
                            :metadata => e.)
    begin
      @responses.enq(response, !@block_on_responses)
    rescue ThreadError
      # happens if the responses queue was full and block_on_responses = false.
    end
  }
end
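
Since #send_loop pushes one response object per event onto the responses queue and #close pushes a final nil, a reader can drain the queue until it sees that nil. The sketch below shows one way such a reader might look. FakeResponse is a hypothetical stand-in for the library's Response class, and the durations, status codes, and metadata values are made up for illustration.

```ruby
# Standalone sketch: drain a responses queue until the nil end-of-stream marker.
# FakeResponse is a hypothetical stand-in, not the library's Response class.
FakeResponse = Struct.new(:duration, :status_code, :metadata, keyword_init: true)

responses = SizedQueue.new(10)
responses << FakeResponse.new(duration: 0.012, status_code: 200, metadata: { id: 1 })
responses << FakeResponse.new(duration: 0.250, status_code: 500, metadata: { id: 2 })
responses << nil               # what close enqueues once the workers have stopped

failed_ids = []
while (resp = responses.pop)   # pop returns nil at the marker, ending the loop
  failed_ids << resp.metadata[:id] unless resp.status_code == 200
end

puts "failed events: #{failed_ids.inspect}"
```

Carrying caller-supplied metadata on each response is what lets the reader match a result back to the event that produced it.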