Class: PostHog::SendWorker

Inherits:
Object
Includes:
Defaults, Logging, Utils
Defined in:
lib/posthog/send_worker.rb

Constant Summary

Constants included from Defaults

Defaults::MAX_HASH_SIZE

Constants included from Utils

Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON

Instance Method Summary

Methods included from Logging

included, #logger

Methods included from Utils

convert_to_datetime, date_in_iso8601, datetime_in_iso8601, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601, uid

Constructor Details

#initialize(queue, api_key, options = {}) ⇒ SendWorker

public: Creates a new worker.

The worker continuously takes messages off the queue and sends them to the PostHog API.

queue   - Queue synchronized between client and worker
api_key - String of the project's API key
options - Hash of worker options (String or Symbol keys)
          batch_size            - Integer of how many items to send in a batch
          on_error              - Proc of what to do on an error; called with the status and the error
          host                  - API host passed to the transport
          skip_ssl_verification - passed to the transport


# File 'lib/posthog/send_worker.rb', line 25

def initialize(queue, api_key, options = {})
  symbolize_keys! options
  @queue = queue
  @api_key = api_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
  @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification]
end
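
The option handling in the constructor can be sketched in isolation. This is a minimal illustration, not the library's code: `resolve_worker_options` and `DEFAULT_BATCH_SIZE` are hypothetical names, and the value 100 is an assumption standing in for `Defaults::MessageBatch::MAX_SIZE`, whose actual value is not shown in this listing.

```ruby
# Hypothetical stand-in for the option handling in SendWorker#initialize.
# DEFAULT_BATCH_SIZE is an assumed value, not the library's real default.
DEFAULT_BATCH_SIZE = 100

def resolve_worker_options(options = {})
  # Mirrors symbolize_keys!: String and Symbol keys are treated alike.
  opts = options.transform_keys(&:to_sym)
  {
    # With no callback supplied, errors fall through to a no-op proc.
    on_error: opts[:on_error] || proc { |status, error| },
    batch_size: opts[:batch_size] || DEFAULT_BATCH_SIZE
  }
end

errors = []
resolved = resolve_worker_options(
  'batch_size' => 10,
  'on_error' => proc { |status, error| errors << [status, error] }
)
resolved[:on_error].call(500, 'server error')
p resolved[:batch_size]
p errors
```

Because the default `on_error` is a no-op, a caller who wants to see failed requests would need to pass a callback explicitly.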

Instance Method Details

#is_requesting? ⇒ Boolean

public: Check whether we have outstanding requests.

TODO: Rename to `requesting?` in a future version

Returns:

  • (Boolean)


# File 'lib/posthog/send_worker.rb', line 58

def is_requesting? # rubocop:disable Naming/PredicateName
  @lock.synchronize { !@batch.empty? }
end
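
One way a caller might use this predicate is to poll it until the current batch has been cleared. The sketch below is an assumption about usage, not code from the library: `FakeWorker` and `wait_until_idle` are hypothetical, and `FakeWorker` only reproduces the lock-guarded check shown above.

```ruby
# Hypothetical stand-in exposing the same lock-guarded predicate.
class FakeWorker
  def initialize
    @lock = Mutex.new
    @batch = []
  end

  def add(message)
    @lock.synchronize { @batch << message }
  end

  def clear
    @lock.synchronize { @batch.clear }
  end

  def is_requesting? # rubocop:disable Naming/PredicateName
    @lock.synchronize { !@batch.empty? }
  end
end

# Hypothetical helper: poll until the worker holds no batch, or give up
# once the timeout (in seconds) has elapsed.
def wait_until_idle(worker, interval: 0.01, timeout: 1.0)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  while worker.is_requesting?
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline

    sleep interval
  end
  true
end

worker = FakeWorker.new
worker.add(event: 'signup')
finisher = Thread.new do
  sleep 0.05
  worker.clear
end
puts wait_until_idle(worker)
finisher.join
```

The predicate takes the same lock that guards the batch, so a reader never observes a batch that is half filled or half cleared.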

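#run ⇒ Object

public: Runs the send loop. On each pass the worker fills a batch from the queue, sends it, and clears it. The method returns once the queue is empty or the thread's :should_exit flag is set, and shuts the transport down on the way out.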



# File 'lib/posthog/send_worker.rb', line 38

def run
  until Thread.current[:should_exit]
    return if @queue.empty?

    @lock.synchronize do
      consume_message_from_queue! until @batch.full? || @queue.empty?
    end

    res = @transport.send @api_key, @batch
    @on_error.call(res.status, res.error) unless res.status == 200

    @lock.synchronize { @batch.clear }
  end
ensure
  @transport.shutdown
end
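
The batching behaviour of the loop above can be tried without a network connection. The following is a simplified sketch under stated assumptions: `FakeTransport`, `Response`, and `drain` are hypothetical stand-ins, the batch is a plain Array rather than a `MessageBatch`, and the locking and the `:should_exit` check are left out.

```ruby
# Hypothetical response object with the two fields the loop reads.
Response = Struct.new(:status, :error)

# Hypothetical transport that records batches instead of sending them.
class FakeTransport
  attr_reader :sent, :shut_down

  def initialize(statuses)
    @statuses = statuses # statuses to return, in order; 200 afterwards
    @sent = []
    @shut_down = false
  end

  def send(_api_key, batch)
    @sent << batch.dup
    status = @statuses.shift || 200
    Response.new(status, status == 200 ? nil : 'request failed')
  end

  def shutdown
    @shut_down = true
  end
end

# Simplified version of the loop: fill a batch, send it, report a
# non-200 status, clear the batch, repeat until the queue is empty.
def drain(queue, transport, batch_size:, on_error:)
  batch = []
  until queue.empty?
    batch << queue.pop until batch.size >= batch_size || queue.empty?
    res = transport.send('api-key', batch)
    on_error.call(res.status, res.error) unless res.status == 200
    batch.clear
  end
ensure
  transport.shutdown
end

queue = Queue.new
5.times { |i| queue << { event: "event-#{i}" } }
failures = []
transport = FakeTransport.new([200, 500])
drain(queue, transport, batch_size: 2,
                        on_error: proc { |status, error| failures << [status, error] })
p transport.sent.map(&:size) # => [2, 2, 1]
p failures                   # => [[500, "request failed"]]
```

As in the listing above, the batch is cleared after every request regardless of the status, so a failed batch is reported through `on_error` but is not sent again by this loop.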