Class: Libhoney::TransmissionClient Private

Inherits:
Object
  • Object
show all
Defined in:
lib/libhoney/transmission.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Instance Method Summary collapse

Constructor Details

#initialize(max_batch_size: 50, send_frequency: 100, max_concurrent_batches: 10, pending_work_capacity: 1000, send_timeout: 10, responses: nil, block_on_send: false, block_on_responses: false, user_agent_addition: nil, proxy_config: nil) ⇒ TransmissionClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of TransmissionClient.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/libhoney/transmission.rb', line 8

def initialize(max_batch_size: 50,
               send_frequency: 100,
               max_concurrent_batches: 10,
               pending_work_capacity: 1000,
               send_timeout: 10,
               responses: nil,
               block_on_send: false,
               block_on_responses: false,
               user_agent_addition: nil,
               proxy_config: nil)

  @responses              = responses || SizedQueue.new(pending_work_capacity * 2)
  @block_on_send          = block_on_send
  @block_on_responses     = block_on_responses
  @max_batch_size         = max_batch_size
  # convert to seconds
  @send_frequency         = send_frequency.fdiv(1000)
  @max_concurrent_batches = max_concurrent_batches
  @pending_work_capacity  = pending_work_capacity
  @send_timeout           = send_timeout
  @user_agent             = build_user_agent(user_agent_addition).freeze
  @proxy_config           = proxy_config

  @send_queue   = Queue.new
  @threads      = []
  @lock         = Mutex.new
  # use a SizedQueue so the producer will block on adding to the batch_queue when @block_on_send is true
  @batch_queue  = SizedQueue.new(@pending_work_capacity)
  @batch_thread = nil
end

Instance Method Details

#add(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raises:

  • (ArgumentError)


39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/libhoney/transmission.rb', line 39

def add(event)
  raise ArgumentError, "No APIHost for Honeycomb. Can't send to the Great Unknown." if event.api_host == ''
  raise ArgumentError, "No WriteKey specified. Can't send event."                   if event.writekey == ''
  raise ArgumentError, "No Dataset for Honeycomb. Can't send datasetless."          if event.dataset  == ''

  begin
    @batch_queue.enq(event, !@block_on_send)
  rescue ThreadError
    # happens if the queue was full and block_on_send = false.
  end

  ensure_threads_running
end

#batch_loopObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/libhoney/transmission.rb', line 130

def batch_loop
  next_send_time = Time.now + @send_frequency
  batched_events = Hash.new do |h, key|
    h[key] = []
  end

  loop do
    begin
      while (event = Timeout.timeout(@send_frequency) { @batch_queue.pop })
        key = [event.api_host, event.writekey, event.dataset]
        batched_events[key] << event
      end

      break
    rescue Exception
    ensure
      next_send_time = flush_batched_events(batched_events) if Time.now > next_send_time
    end
  end

  flush_batched_events(batched_events)
end

#close(drain) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/libhoney/transmission.rb', line 109

def close(drain)
  # if drain is false, clear the remaining unprocessed events from the queue
  unless drain
    @batch_queue.clear
    @send_queue.clear
  end

  @batch_queue.enq(nil)
  @batch_thread.join

  # send @threads.length number of nils so each thread will fall out of send_loop
  @threads.length.times { @send_queue << nil }

  @threads.each(&:join)
  @threads = []

  enqueue_response(nil)

  0
end

#send_loopObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/libhoney/transmission.rb', line 53

def send_loop
  http_clients = build_http_clients

  # eat events until we run out
  loop do
    api_host, writekey, dataset, batch = @send_queue.pop
    break if batch.nil?

    before = Time.now

    begin
      http = http_clients[api_host]
      body = serialize_batch(batch)
      next if body.nil?

      headers = {
        'Content-Type' => 'application/json',
        'X-Honeycomb-Team' => writekey
      }

      response = http.post(
        "/1/batch/#{Addressable::URI.escape(dataset)}",
        body: body,
        headers: headers
      )
      process_response(response, before, batch)
    rescue Exception => e
      # catch a broader swath of exceptions than is usually good practice,
      # because this is effectively the top-level exception handler for the
      # sender threads, and we don't want those threads to die (leaving
      # nothing consuming the queue).
      begin
        batch.each do |event|
          # nil events in the batch should already have had an error
          # response enqueued in #serialize_batch
          next if event.nil?

          Response.new(error: e).tap do |error_response|
            error_response. = event.
            enqueue_response(error_response)
          end
        end
      rescue ThreadError
      end
    end
  end
ensure
  http_clients.each do |_, http|
    begin
      http.close
    rescue StandardError
      nil
    end
  end
end