Class: AnalyticsRuby::Consumer
- Inherits:
-
Object
- Object
- AnalyticsRuby::Consumer
- Defined in:
- lib/analytics-ruby/consumer.rb
Instance Method Summary collapse
-
#flush ⇒ Object
public: Flush some events from our queue.
-
#initialize(queue, secret, options = {}) ⇒ Consumer
constructor
public: Creates a new consumer.
-
#is_requesting? ⇒ Boolean
public: Check whether we have outstanding requests.
-
#run ⇒ Object
public: Continuously runs the loop to check for new events.
Constructor Details
#initialize(queue, secret, options = {}) ⇒ Consumer
public: Creates a new consumer
The consumer continuously takes messages off the queue and makes requests to the segment.io api
queue - Queue synchronized between client and consumer secret - String of the project’s secret options - Hash of consumer options
batch_size - Fixnum of how many items to send in a batch
on_error - Proc of what to do on an error
21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/analytics-ruby/consumer.rb', line 21 def initialize(queue, secret, = {}) Util.symbolize_keys! @queue = queue @secret = secret @batch_size = [:batch_size] || Defaults::Queue::BATCH_SIZE @on_error = [:on_error] || Proc.new { |status, error| } @current_batch = [] @mutex = Mutex.new end |
Instance Method Details
#flush ⇒ Object
public: Flush some events from our queue
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/analytics-ruby/consumer.rb', line 44 def flush # Block until we have something to send item = @queue.pop return if item.nil? # Synchronize on additions to the current batch @mutex.synchronize { @current_batch << item until @current_batch.length >= @batch_size || @queue.empty? @current_batch << @queue.pop end } req = Request.new res = req.post @secret, @current_batch @on_error.call res.status, res.error unless res.status == 200 @mutex.synchronize { @current_batch = [] } end |
#is_requesting? ⇒ Boolean
public: Check whether we have outstanding requests.
67 68 69 70 71 72 73 |
# File 'lib/analytics-ruby/consumer.rb', line 67 def is_requesting? requesting = nil @mutex.synchronize { requesting = !@current_batch.empty? } requesting end |
#run ⇒ Object
public: Continuously runs the loop to check for new events
36 37 38 39 40 |
# File 'lib/analytics-ruby/consumer.rb', line 36 def run until Thread.current[:should_exit] flush end end |