Class: AnalyticsRuby::Consumer

Inherits:
Object
Defined in:
lib/analytics-ruby/consumer.rb

Constructor Details

#initialize(queue, secret, options = {}) ⇒ Consumer

public: Creates a new consumer

The consumer continuously takes messages off the queue and sends them to the Segment.io API in batches.

queue   - Queue synchronized between client and consumer
secret  - String of the project’s secret
options - Hash of consumer options
          batch_size - Fixnum of how many items to send in a batch
          on_error   - Proc of what to do on an error


# File 'lib/analytics-ruby/consumer.rb', line 21

def initialize(queue, secret, options = {})
  Util.symbolize_keys! options

  @queue = queue
  @secret = secret
  @batch_size = options[:batch_size] || Defaults::Queue::BATCH_SIZE
  @on_error = options[:on_error] || Proc.new { |status, error| }

  @current_batch = []

  @mutex = Mutex.new
end
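The option handling in the constructor can be sketched in isolation. This is a minimal illustration, not the library's code: `SketchConsumer` is a hypothetical stand-in, `DEFAULT_BATCH_SIZE = 100` is a placeholder (the actual value of `Defaults::Queue::BATCH_SIZE` is not shown on this page), and the key conversion is only an assumption about what `Util.symbolize_keys!` does.

```ruby
require 'thread'

# Hypothetical stand-in for the constructor's option handling.
# DEFAULT_BATCH_SIZE is a placeholder, not the library's real default.
class SketchConsumer
  DEFAULT_BATCH_SIZE = 100

  attr_reader :batch_size, :on_error

  def initialize(queue, secret, options = {})
    # Assumed behavior of Util.symbolize_keys!: string keys become symbols.
    options = options.each_with_object({}) { |(k, v), h| h[k.to_sym] = v }

    @queue = queue
    @secret = secret
    @batch_size = options[:batch_size] || DEFAULT_BATCH_SIZE
    # Fall back to a no-op handler so callers may omit on_error.
    @on_error = options[:on_error] || proc { |status, error| }
  end
end

errors = []
handler = proc { |status, error| errors << [status, error] }

# String and symbol keys are both accepted after normalization.
consumer = SketchConsumer.new(Queue.new, 'secret', 'batch_size' => 10, :on_error => handler)
consumer.on_error.call(500, 'boom')

# With no options, the defaults apply.
default_consumer = SketchConsumer.new(Queue.new, 'secret')
```

Defaulting `on_error` to an empty proc means the rest of the class can call it unconditionally instead of checking for nil.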

Instance Method Details

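#flush ⇒ Object

public: Flushes one batch of events from the queue. Blocks until an item is available, collects up to batch_size items, posts them in a single request, and calls on_error with the response status and error if the status is not 200. The batch is cleared afterward regardless of the outcome.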



# File 'lib/analytics-ruby/consumer.rb', line 44

def flush
  # Block until we have something to send
  item = @queue.pop
  return if item.nil?

  # Synchronize on additions to the current batch
  @mutex.synchronize {
    @current_batch << item
    until @current_batch.length >= @batch_size || @queue.empty?
      @current_batch << @queue.pop
    end
  }

  req = Request.new
  res = req.post @secret, @current_batch
  @on_error.call res.status, res.error unless res.status == 200
  @mutex.synchronize {
    @current_batch = []
  }
end
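The batching step in `#flush` can be tried on its own with Ruby's standard `Queue`. The sketch below is an illustration under assumptions: `drain_batch` is a hypothetical helper name, and the network request is left out entirely.

```ruby
require 'thread'

# Hypothetical helper mirroring the batching loop in #flush:
# block for the first item, then keep popping until the batch is
# full or the queue is momentarily empty.
def drain_batch(queue, batch_size)
  batch = [queue.pop] # blocks until at least one item is available
  until batch.length >= batch_size || queue.empty?
    batch << queue.pop
  end
  batch
end

queue = Queue.new
5.times { |i| queue << { event: "event-#{i}" } }

first = drain_batch(queue, 3)  # fills up to the batch size
second = drain_batch(queue, 3) # takes whatever is left
```

With five queued items and a batch size of three, the first call returns three items and the second returns the remaining two. A batch can therefore be smaller than `batch_size` whenever the queue runs dry first.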

#is_requesting? ⇒ Boolean

public: Check whether we have outstanding requests.

Returns:

  • (Boolean)


# File 'lib/analytics-ruby/consumer.rb', line 67

def is_requesting?
  requesting = nil
  @mutex.synchronize {
    requesting = !@current_batch.empty?
  }
  requesting
end
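The check above reads `@current_batch` under the same mutex that guards writes to it. A minimal sketch of that pattern follows, using a hypothetical `SketchWorker` class with `begin_batch` and `finish_batch` standing in for the start and end of a request. Because `Mutex#synchronize` returns the value of its block, the temporary variable used in the original is optional.

```ruby
require 'thread'

# Hypothetical stand-in showing the mutex-guarded batch state.
class SketchWorker
  def initialize
    @mutex = Mutex.new
    @current_batch = []
  end

  # Stand-in for the moment #flush has filled a batch.
  def begin_batch(items)
    @mutex.synchronize { @current_batch = items.dup }
  end

  # Stand-in for the moment #flush has finished its request.
  def finish_batch
    @mutex.synchronize { @current_batch = [] }
  end

  # Mutex#synchronize returns the block's value, so no temp is needed.
  def is_requesting?
    @mutex.synchronize { !@current_batch.empty? }
  end
end

worker = SketchWorker.new
before = worker.is_requesting?
worker.begin_batch([:a, :b])
during = worker.is_requesting?
worker.finish_batch
after = worker.is_requesting?
```

A caller that wants to wait for pending events would likely need to check both the queue and this flag, since an item already popped into the batch no longer counts toward the queue's length.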

#run ⇒ Object

public: Runs the flush loop continuously until the current thread's :should_exit flag is set



# File 'lib/analytics-ruby/consumer.rb', line 36

def run
  until Thread.current[:should_exit]
    flush
  end
end
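A sketch of how a loop like this might be driven from another thread. Everything here is an assumption for illustration: how the library starts and stops its consumer thread is not shown on this page, and pushing `nil` as a wake-up sentinel is only one possible approach (the `return if item.nil?` guard in `#flush` suggests nil items are tolerated). The flag is checked only between iterations, so a worker blocked in `pop` will not notice it until another item arrives.

```ruby
require 'thread'

queue = Queue.new
processed = []

worker = Thread.new do
  until Thread.current[:should_exit]
    item = queue.pop   # blocks, as #flush does
    next if item.nil?  # nil serves as a wake-up sentinel in this sketch
    processed << item
  end
end

queue << :a
queue << :b

# Wait for both items to be handled before asking the worker to stop.
sleep 0.01 until processed.length == 2

# Set the thread-local flag from outside, then wake the blocked pop.
worker[:should_exit] = true
queue << nil
worker.join(5)
```

Without the sentinel push, `join` could wait indefinitely, because the worker would stay parked inside `queue.pop` and never re-evaluate the loop condition.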