Class: Segment::Analytics::Worker
- Inherits:
- Object
  - Object
    - Segment::Analytics::Worker
- Defined in:
- lib/segment/analytics/worker.rb
Constant Summary
Constants included from Utils
Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON
Instance Method Summary
- #initialize(queue, write_key, options = {}) ⇒ Worker (constructor)
  public: Creates a new worker.
- #is_requesting? ⇒ Boolean
  public: Check whether we have outstanding requests.
- #run ⇒ Object
  public: Continuously runs the loop to check for new events.
Methods included from Utils
#date_in_iso8601, #datetime_in_iso8601, #formatted_offset, #isoify_dates, #isoify_dates!, #seconds_to_utc_offset, #stringify_keys, #symbolize_keys, #symbolize_keys!, #time_in_iso8601, #uid
Constructor Details
#initialize(queue, write_key, options = {}) ⇒ Worker
public: Creates a new worker.
The worker continuously takes messages off the queue and makes requests to the segment.io API.
queue     - Queue synchronized between client and worker
write_key - String of the project’s Write key
options   - Hash of worker options
            batch_size - Fixnum of how many items to send in a batch
            on_error   - Proc of what to do on an error
    # File 'lib/segment/analytics/worker.rb', line 24

    def initialize(queue, write_key, options = {})
      symbolize_keys! options
      @queue = queue
      @write_key = write_key
      @on_error = options[:on_error] || proc { |status, error| }
      batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
      @batch = MessageBatch.new(batch_size)
      @lock = Mutex.new
      @request = Request.new
    end
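The option handling in the constructor can be illustrated with a minimal, self-contained sketch. `SketchWorker` and `DEFAULT_BATCH_SIZE` are hypothetical stand-ins, not part of the library; the actual default comes from `Defaults::MessageBatch::MAX_SIZE`, whose value is not shown on this page. The sketch only mirrors the key normalization and the fallbacks for `on_error` and `batch_size`.

```ruby
require 'thread'

# Hypothetical stand-in for Segment::Analytics::Worker, reduced to option handling.
class SketchWorker
  # Assumed value for illustration; the real default is Defaults::MessageBatch::MAX_SIZE.
  DEFAULT_BATCH_SIZE = 100

  attr_reader :batch_size, :on_error

  def initialize(queue, write_key, options = {})
    # Normalize string keys to symbols, as symbolize_keys! does in the listing above.
    options = options.each_with_object({}) { |(key, value), h| h[key.to_sym] = value }
    @queue = queue
    @write_key = write_key
    # Fall back to a no-op proc so callers never need to check for nil.
    @on_error = options[:on_error] || proc { |status, error| }
    @batch_size = options[:batch_size] || DEFAULT_BATCH_SIZE
  end
end

errors = []
worker = SketchWorker.new(Queue.new, 'WRITE_KEY',
                          'batch_size' => 10,
                          on_error: proc { |status, error| errors << [status, error] })
worker.on_error.call(500, 'server error')

# With no options, both defaults apply.
default_worker = SketchWorker.new(Queue.new, 'WRITE_KEY')
```

Because the fallback is a proc that accepts two arguments and does nothing, the rest of the worker can call `on_error` unconditionally.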
Instance Method Details
#is_requesting? ⇒ Boolean
public: Check whether we have outstanding requests.
    # File 'lib/segment/analytics/worker.rb', line 56

    def is_requesting?
      @lock.synchronize { !@batch.empty? }
    end
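One plausible use of `is_requesting?` is to block a caller until the worker is idle. The sketch below assumes a hypothetical `wait_until_idle` helper and a stripped-down `SketchWorker` with `fill` and `clear` methods that simulate the run loop; none of these names come from the library.

```ruby
require 'thread'

# Hypothetical stand-in that keeps only the lock and the in-flight batch.
class SketchWorker
  def initialize
    @lock = Mutex.new
    @batch = []
  end

  # Same check as the listing above: a non-empty batch means a request is outstanding.
  def is_requesting?
    @lock.synchronize { !@batch.empty? }
  end

  # Hypothetical helpers that simulate the run loop filling and clearing the batch.
  def fill(items)
    @lock.synchronize { @batch.concat(items) }
  end

  def clear
    @lock.synchronize { @batch.clear }
  end
end

# Hypothetical helper: poll until nothing is queued and nothing is in flight.
def wait_until_idle(queue, worker, interval = 0.01)
  sleep(interval) while !queue.empty? || worker.is_requesting?
end

queue = Queue.new
worker = SketchWorker.new

before = worker.is_requesting?
worker.fill([:message])
during = worker.is_requesting?

# Simulate the request finishing on another thread.
clearer = Thread.new do
  sleep 0.05
  worker.clear
end
wait_until_idle(queue, worker)
clearer.join
after = worker.is_requesting?
```

Reading the batch under the same mutex that guards writes means the caller never observes a half-filled batch.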
#run ⇒ Object
public: Continuously runs the loop to check for new events.
    # File 'lib/segment/analytics/worker.rb', line 37

    def run
      until Thread.current[:should_exit]
        return if @queue.empty?

        @lock.synchronize do
          until @batch.full? || @queue.empty?
            @batch << Message.new(@queue.pop)
          end
        end

        res = @request.post(@write_key, @batch)

        @on_error.call(res.status, res.error) unless res.status == 200

        @lock.synchronize { @batch.clear }
      end
    end
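The batching behaviour of `run` can be traced with a small simulation. This is a sketch under stated assumptions: `FakeRequest`, `FakeResponse` and `SketchWorker` are hypothetical stand-ins, a plain Array replaces `MessageBatch`, and no HTTP request is made. The loop body follows the listing above.

```ruby
require 'thread'

# Hypothetical response object exposing the two fields that run reads.
FakeResponse = Struct.new(:status, :error)

# Hypothetical transport that records each batch instead of sending it.
class FakeRequest
  attr_reader :posted

  def initialize(status)
    @status = status
    @posted = []
  end

  def post(_write_key, batch)
    @posted << batch.dup # copy, because the worker clears the batch afterwards
    FakeResponse.new(@status, @status == 200 ? nil : 'rejected')
  end
end

# Hypothetical worker; an Array with a size limit stands in for MessageBatch.
class SketchWorker
  def initialize(queue, write_key, request, batch_size, on_error)
    @queue = queue
    @write_key = write_key
    @request = request
    @batch_size = batch_size
    @on_error = on_error
    @batch = []
    @lock = Mutex.new
  end

  def run
    until Thread.current[:should_exit]
      return if @queue.empty?

      # Drain the queue into the batch until it is full or the queue is empty.
      @lock.synchronize do
        @batch << @queue.pop until @batch.size >= @batch_size || @queue.empty?
      end

      res = @request.post(@write_key, @batch)
      @on_error.call(res.status, res.error) unless res.status == 200

      # The batch is cleared whether or not the request succeeded.
      @lock.synchronize { @batch.clear }
    end
  end
end

queue = Queue.new
5.times { |i| queue << { event: "event-#{i}" } }
request = FakeRequest.new(200)
SketchWorker.new(queue, 'WRITE_KEY', request, 2, proc { |status, error| }).run
batch_sizes = request.posted.map(&:size)

# A non-200 status triggers the on_error callback.
errors = []
queue << { event: 'late' }
failing = FakeRequest.new(503)
SketchWorker.new(queue, 'WRITE_KEY', failing, 2,
                 proc { |status, error| errors << [status, error] }).run
```

With five queued events and a batch size of 2, the simulated transport receives batches of 2, 2 and 1. Note that in the listing above the batch is cleared after every post, so a failed batch is reported through `on_error` and is not retried by this method.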