Class: Libhoney::TransmissionClient Private

Inherits:
Object
  • Object
show all
Defined in:
lib/libhoney/transmission.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Instance Method Summary collapse

Constructor Details

#initialize(max_batch_size: 0, send_frequency: 0, max_concurrent_batches: 0, pending_work_capacity: 0, responses: 0, block_on_send: 0, block_on_responses: 0) ⇒ TransmissionClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of TransmissionClient.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/libhoney/transmission.rb', line 6

def initialize(max_batch_size: 0,
               send_frequency: 0,
               max_concurrent_batches: 0,
               pending_work_capacity: 0,
               responses: 0,
               block_on_send: 0,
               block_on_responses: 0)

  @responses = responses
  @block_on_send = block_on_send
  @block_on_responses = block_on_responses
  @max_batch_size = max_batch_size
  @send_frequency = send_frequency
  @max_concurrent_batches = max_concurrent_batches
  @pending_work_capacity = pending_work_capacity

  # use a SizedQueue so the producer will block on adding to the send_queue when @block_on_send is true
  @send_queue = SizedQueue.new(@pending_work_capacity)
  @threads = []
  @lock = Mutex.new
end

Instance Method Details

#add(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/libhoney/transmission.rb', line 28

def add(event)
  begin
    @send_queue.enq(event, !@block_on_send)
  rescue ThreadError
    # happens if the queue was full and block_on_send = false.
  end

  @lock.synchronize {
    return if @threads.length > 0
    while @threads.length < @max_concurrent_batches
      @threads << Thread.new { self.send_loop }
    end
  }
end

#close(drain) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/libhoney/transmission.rb', line 80

def close(drain)
  # if drain is false, clear the remaining unprocessed events from the queue
  @send_queue.clear if drain == false

  # send @threads.length number of nils so each thread will fall out of send_loop
  @threads.length.times { @send_queue << nil }

  @threads.each do |t|
    t.join
  end
  @threads = []

  @responses.enq(nil)

  0
end

#send_loopObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/libhoney/transmission.rb', line 43

def send_loop
  # eat events until we run out
  loop {
    e = @send_queue.pop
    break if e == nil

    before = Time.now

    begin
      resp = HTTP.headers(
        'User-Agent' => "libhoney-rb/#{VERSION}",
        'Content-Type' => 'application/json',
        'X-Honeycomb-Team' => e.writekey,
        'X-Honeycomb-SampleRate' => e.sample_rate,
        'X-Event-Time' => e.timestamp.iso8601
      ).post(URI.join(e.api_host, '/1/events/', e.dataset), :json => e.data)

      response = Response.new(:status_code => resp.status)
    rescue Exception => error
      # catch a broader swath of exceptions than is usually good practice,
      # because this is effectively the top-level exception handler for the
      # sender threads, and we don't want those threads to die (leaving
      # nothing consuming the queue).
      response = Response.new(:error => error)
    ensure
      response.duration = Time.now - before
      response. = e.
    end

    begin
      @responses.enq(response, !@block_on_responses)
    rescue ThreadError
      # happens if the queue was full and block_on_send = false.
    end
  }
end