Class: Libhoney::TransmissionClient Private

Inherits:
Object
  • Object
show all
Includes:
Cleaner
Defined in:
lib/libhoney/transmission.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Constant Summary

Constants included from Cleaner

Cleaner::ENCODING_OPTIONS, Cleaner::RAISED, Cleaner::RECURSION

Instance Method Summary collapse

Methods included from Cleaner

#clean_data, #clean_string

Constructor Details

#initialize(max_batch_size: 50, send_frequency: 100, max_concurrent_batches: 10, pending_work_capacity: 1000, send_timeout: 10, responses: nil, block_on_send: false, block_on_responses: false, user_agent_addition: nil, proxy_config: nil) ⇒ TransmissionClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of TransmissionClient.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/libhoney/transmission.rb', line 11

# Sets up the client's configuration and its queues; thread bookkeeping
# starts out empty.
#
# @param max_batch_size [Integer] stored as @max_batch_size
# @param send_frequency [Numeric] given in milliseconds, stored in seconds
# @param max_concurrent_batches [Integer] stored as @max_concurrent_batches
# @param pending_work_capacity [Integer] size of the bounded batch queue; the
#   default response queue is twice this size
# @param send_timeout [Numeric] stored as @send_timeout
# @param responses [Object, nil] response queue to use instead of the default
# @param block_on_send [Boolean] stored as @block_on_send
# @param block_on_responses [Boolean] stored as @block_on_responses
# @param user_agent_addition [Object, nil] passed to build_user_agent
# @param proxy_config [Object, nil] stored as @proxy_config
def initialize(max_batch_size: 50,
               send_frequency: 100,
               max_concurrent_batches: 10,
               pending_work_capacity: 1000,
               send_timeout: 10,
               responses: nil,
               block_on_send: false,
               block_on_responses: false,
               user_agent_addition: nil,
               proxy_config: nil)
  # Caller-supplied response queue wins; otherwise use a bounded queue that
  # holds twice the pending-work capacity.
  @responses = responses || SizedQueue.new(pending_work_capacity * 2)

  # The argument is in milliseconds; keep the value in (float) seconds.
  @send_frequency = send_frequency.fdiv(1000)
  @user_agent     = build_user_agent(user_agent_addition).freeze

  # Bounded on purpose: with @block_on_send set, a producer blocks while
  # enqueueing onto a full batch queue.
  @batch_queue = SizedQueue.new(pending_work_capacity)
  @send_queue  = Queue.new

  # Settings kept exactly as given.
  @max_batch_size         = max_batch_size
  @max_concurrent_batches = max_concurrent_batches
  @pending_work_capacity  = pending_work_capacity
  @send_timeout           = send_timeout
  @block_on_send          = block_on_send
  @block_on_responses     = block_on_responses
  @proxy_config           = proxy_config

  # Thread bookkeeping: nothing is recorded as running yet.
  @lock         = Mutex.new
  @threads      = []
  @batch_thread = nil
end

Instance Method Details

#add(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/libhoney/transmission.rb', line 42

# Queues an event for batching.
#
# Events rejected by event_valid are ignored and nil is returned. Otherwise
# the event is pushed onto @batch_queue and the result of
# ensure_threads_running is returned.
#
# @param event [Object] the event to enqueue
def add(event)
  if event_valid(event)
    non_blocking = !@block_on_send

    begin
      @batch_queue.push(event, non_blocking)
    rescue ThreadError
      # Raised by a non-blocking push onto a full queue (block_on_send is
      # false); the event is dropped.
    end

    ensure_threads_running
  end
end

#batch_loop ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/libhoney/transmission.rb', line 152

# Consumer loop for @batch_queue: pops events, groups them by
# [api_host, writekey, dataset], and passes the groups to
# flush_batched_events.
#
# The loop ends when a falsy value is popped (#close enqueues nil for this);
# whatever is still grouped at that point is flushed one final time.
def batch_loop
  next_send_time = Time.now + @send_frequency
  # Unknown keys start out as an empty array, so events can be appended directly.
  batched_events = Hash.new do |h, key|
    h[key] = []
  end

  loop do
    begin
      # Let an asynchronous Timeout::Error interrupt this thread only while it
      # is in a blocking operation (such as the pop below).
      Thread.handle_interrupt(Timeout::Error => :on_blocking) do
        # Wait at most @send_frequency seconds per event. A falsy pop ends the
        # while loop normally; a timeout raises out of it instead.
        while (event = Timeout.timeout(@send_frequency) { @batch_queue.pop })
          key = [event.api_host, event.writekey, event.dataset]
          batched_events[key] << event
        end
      end

      # Reached only when the while loop finished without raising.
      break
    rescue Exception
      # Every exception is swallowed here (including the timeout raised when
      # no event arrived in time), so the outer loop goes around again.
    ensure
      # Runs on every pass, including the one that breaks. The value returned
      # by flush_batched_events becomes the next send time.
      next_send_time = flush_batched_events(batched_events) if Time.now > next_send_time
    end
  end

  # Final flush of anything still grouped when the loop exits.
  flush_batched_events(batched_events)
end

#close(drain) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/libhoney/transmission.rb', line 131

# Shuts the client down: signals and joins the batch thread, signals and
# joins every sender thread, then enqueues a nil response.
#
# @param drain [Boolean] when falsy, events still waiting in @batch_queue and
#   @send_queue are discarded before shutdown begins
# @return [Integer] always 0
def close(drain)
  # if drain is false, clear the remaining unprocessed events from the queue
  unless drain
    @batch_queue.clear
    @send_queue.clear
  end

  # nil is the value that makes batch_loop stop popping.
  @batch_queue.enq(nil)
  # @batch_thread is nil until a batch thread has been created (the
  # constructor sets it to nil), so closing an unused client must not call
  # #join on nil.
  @batch_thread.join if @batch_thread

  # send @threads.length number of nils so each thread will fall out of send_loop
  @threads.length.times { @send_queue << nil }

  @threads.each(&:join)
  @threads = []

  enqueue_response(nil)

  0
end

#event_valid(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/libhoney/transmission.rb', line 54

# Checks that an event carries the fields needed to send it.
#
# When api_host, writekey or dataset is nil or empty, an error Response
# carrying the event's metadata is passed to enqueue_response and the event
# is rejected.
#
# @param event [Object] must respond to api_host, writekey, dataset and metadata
# @return [Boolean] true when all required fields are present, false otherwise
def event_valid(event)
  invalid = []
  invalid.push('api host') if event.api_host.nil? || event.api_host.empty?
  invalid.push('write key') if event.writekey.nil? || event.writekey.empty?
  invalid.push('dataset') if event.dataset.nil? || event.dataset.empty?

  return true if invalid.empty?

  e = StandardError.new("#{self.class.name}: nil or empty required fields (#{invalid.join(', ')})"\
    '. Will not attempt to send.')
  Response.new(error: e).tap do |error_response|
    # Copy the event's metadata onto the error response before enqueueing it.
    error_response.metadata = event.metadata
    enqueue_response(error_response)
  end

  false
end

#send_loop ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/libhoney/transmission.rb', line 74

# Sender loop: pops [api_host, writekey, dataset, batch] entries off
# @send_queue and POSTs each serialized batch to "/1/batch/<dataset>" using
# the HTTP client for that host.
#
# The loop ends when an entry with a nil batch is popped (#close pushes nil
# for each sender thread). Every HTTP client is closed on the way out.
#
# @return [nil]
def send_loop
  http_clients = build_http_clients

  # eat events until we run out
  loop do
    api_host, writekey, dataset, batch = @send_queue.pop
    break if batch.nil?

    before = Time.now

    begin
      http = http_clients[api_host]
      body = serialize_batch(batch)

      # Nothing to send for this batch.
      next if body.nil?

      headers = {
        'Content-Type' => 'application/json',
        'X-Honeycomb-Team' => writekey
      }

      response = http.post(
        "/1/batch/#{Addressable::URI.escape(dataset)}",
        body: body,
        headers: headers
      )
      process_response(response, before, batch)
    rescue Exception => e
      # catch a broader swath of exceptions than is usually good practice,
      # because this is effectively the top-level exception handler for the
      # sender threads, and we don't want those threads to die (leaving
      # nothing consuming the queue).
      begin
        batch.each do |event|
          # nil events in the batch should already have had an error
          # response enqueued in #serialize_batch
          next if event.nil?

          Response.new(error: e).tap do |error_response|
            # Copy the event's metadata onto the error response.
            error_response.metadata = event.metadata
            enqueue_response(error_response)
          end
        end
      rescue ThreadError
        # NOTE(review): presumably raised by enqueue_response when the
        # response queue is full in non-blocking mode; the remaining error
        # responses for this batch are dropped. Confirm against
        # enqueue_response.
      end
    end
  end
ensure
  # http_clients is still nil if build_http_clients itself raised; skip the
  # cleanup then so the original error is not masked by a NoMethodError.
  if http_clients
    http_clients.each do |_, http|
      begin
        http.close
      rescue StandardError
        # Best-effort close: a client failing to close must not stop the rest.
        nil
      end
    end
  end
end