Class: Libhoney::TransmissionClient Private

Inherits:
Object
  • Object
show all
Includes:
Cleaner
Defined in:
lib/libhoney/transmission.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Direct Known Subclasses

ExperimentalTransmissionClient

Constant Summary

Constants included from Cleaner

Cleaner::ENCODING_OPTIONS, Cleaner::RAISED, Cleaner::RECURSION

Instance Method Summary collapse

Methods included from Cleaner

#clean_data, #clean_string

Constructor Details

#initialize(max_batch_size: 50, send_frequency: 100, max_concurrent_batches: 10, pending_work_capacity: 1000, send_timeout: 10, responses: nil, block_on_send: false, block_on_responses: false, user_agent_addition: nil, proxy_config: nil) ⇒ TransmissionClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of TransmissionClient.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/libhoney/transmission.rb', line 13

def initialize(max_batch_size: 50,
               send_frequency: 100,
               max_concurrent_batches: 10,
               pending_work_capacity: 1000,
               send_timeout: 10,
               responses: nil,
               block_on_send: false,
               block_on_responses: false,
               user_agent_addition: nil,
               proxy_config: nil)

  @responses              = responses || SizedQueue.new(pending_work_capacity * 2)
  @block_on_send          = block_on_send
  @block_on_responses     = block_on_responses
  @max_batch_size         = max_batch_size
  # convert to seconds
  @send_frequency         = send_frequency.fdiv(1000)
  @max_concurrent_batches = max_concurrent_batches
  @pending_work_capacity  = pending_work_capacity
  @send_timeout           = send_timeout
  @user_agent             = build_user_agent(user_agent_addition).freeze
  @proxy_config           = proxy_config

  @send_queue   = Queue.new
  @threads      = []
  @lock         = Mutex.new
  @batch_thread = nil

  setup_batch_queue
end

Instance Method Details

#add(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/libhoney/transmission.rb', line 44

def add(event)
  return unless event_valid(event)

  begin
    @batch_queue.enq(event, !@block_on_send)
  rescue ThreadError
    # happens if the queue was full and block_on_send = false.
  end

  ensure_threads_running
end

#batch_loopObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/libhoney/transmission.rb', line 137

def batch_loop
  next_send_time = Time.now + @send_frequency
  batched_events = Hash.new do |h, key|
    h[key] = []
  end

  loop do
    begin
      # a timeout expiration waiting for an event
      #   1. interrupts only when thread is in a blocking state (waiting for pop)
      #   2. exception skips the break and is rescued
      #   3. triggers the ensure to flush the current batch
      #   3. begins the loop again with an updated next_send_time
      Thread.handle_interrupt(Timeout::Error => :on_blocking) do
        # an event on the batch_queue
        #   1. pops out and is truthy
        #   2. gets included in the current batch
        #   3. while waits for another event
        while (event = Timeout.timeout(@send_frequency) { @batch_queue.pop })
          key = [event.api_host, event.writekey, event.dataset]
          batched_events[key] << event
        end
      end

      # a nil on the batch_queue
      #   1. pops out and is falsy
      #   2. ends the event-popping while do..end
      #   3. breaks the loop
      #   4. flushes the current batch
      #   5. ends the batch_loop
      break
    rescue Timeout::Error
      # Timeout::Error happens when there is nothing to pop from the batch_queue.
      # We rescue it here to avoid spamming the logs with "execution expired" errors.
    rescue Exception => e
      warn "#{self.class.name}: 💥 " + e.message if %w[debug trace].include?(ENV['LOG_LEVEL'])
      warn e.backtrace.join("\n").to_s if ['trace'].include?(ENV['LOG_LEVEL'])

    # regardless of the exception, figure out whether enough time has passed to
    # send the current batched events, if so, send them and figure out the next send time
    # before going back to the top of the loop
    ensure
      next_send_time = flush_batched_events(batched_events) if Time.now > next_send_time
    end
  end

  # don't need to capture the next_send_time here because the batch_loop is exiting
  # for some reason (probably transmission.close)
  flush_batched_events(batched_events)
end

#close(drain) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/libhoney/transmission.rb', line 111

def close(drain)
  @lock.synchronize do
    # if drain is false, clear the remaining unprocessed events from the queue
    if drain
      warn "#{self.class.name} - close: draining events" if %w[debug trace].include?(ENV['LOG_LEVEL'])
    else
      warn "#{self.class.name} - close: deleting unsent events" if %w[debug trace].include?(ENV['LOG_LEVEL'])
      @batch_queue.clear
      @send_queue.clear
    end

    @batch_queue.enq(nil)
    @batch_thread&.join(1.0) # limit the amount of time we'll wait for the thread to end

    # send @threads.length number of nils so each thread will fall out of send_loop
    @threads.length.times { @send_queue << nil }

    @threads.each(&:join)
    @threads = []
  end

  enqueue_response(nil)
  warn "#{self.class.name} - close: close complete" if %w[debug trace].include?(ENV['LOG_LEVEL'])
  0
end

#send_loopObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/libhoney/transmission.rb', line 56

def send_loop
  http_clients = build_http_clients

  # eat events until we run out
  loop do
    api_host, writekey, dataset, batch = @send_queue.pop
    break if batch.nil?

    before = Time.now

    begin
      http = http_clients[api_host]
      body = serialize_batch(batch)

      next if body.nil?

      headers = {
        'Content-Type' => 'application/json',
        'X-Honeycomb-Team' => writekey
      }

      response = send_with_retry(http, dataset, body, headers)
      process_response(response, before, batch)
    rescue Exception => e
      # catch a broader swath of exceptions than is usually good practice,
      # because this is effectively the top-level exception handler for the
      # sender threads, and we don't want those threads to die (leaving
      # nothing consuming the queue).
      warn "#{self.class.name}: 💥 " + e.message if %w[debug trace].include?(ENV['LOG_LEVEL'])
      warn e.backtrace.join("\n").to_s if ['trace'].include?(ENV['LOG_LEVEL'])
      begin
        batch.each do |event|
          # nil events in the batch should already have had an error
          # response enqueued in #serialize_batch
          next if event.nil?

          Response.new(error: e).tap do |error_response|
            error_response. = event.
            enqueue_response(error_response)
          end
        end
      rescue ThreadError
      end
    end
  end
ensure
  http_clients.each do |_, http|
    begin
      http.close
    rescue StandardError
      nil
    end
  end
end