Class: Optimizely::BatchEventProcessor

Inherits:
EventProcessor show all
Defined in:
lib/optimizely/event/batch_event_processor.rb

Constant Summary collapse

DEFAULT_BATCH_SIZE =
10
DEFAULT_BATCH_INTERVAL =

interval in milliseconds

30_000
DEFAULT_QUEUE_CAPACITY =
1000
FLUSH_SIGNAL =
'FLUSH_SIGNAL'
SHUTDOWN_SIGNAL =
'SHUTDOWN_SIGNAL'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: Optimizely::EventDispatcher.new, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor

Returns a new instance of BatchEventProcessor.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/optimizely/event/batch_event_processor.rb', line 37

def initialize(
  event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
  event_dispatcher: Optimizely::EventDispatcher.new,
  batch_size: DEFAULT_BATCH_SIZE,
  flush_interval: DEFAULT_BATCH_INTERVAL,
  logger: NoOpLogger.new,
  notification_center: nil
)
  @event_queue = event_queue
  @logger = logger
  @event_dispatcher = event_dispatcher
  @batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
                  batch_size
                else
                  @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
                  DEFAULT_BATCH_SIZE
                end
  @flush_interval = if positive_number?(flush_interval)
                      flush_interval
                    else
                      @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
                      DEFAULT_BATCH_INTERVAL
                    end
  @notification_center = notification_center
  @mutex = Mutex.new
  @received = ConditionVariable.new
  @current_batch = []
  @started = false
  start!
end

Instance Attribute Details

#batch_sizeObject (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to a EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def batch_size
  @batch_size
end

#current_batchObject (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to a EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def current_batch
  @current_batch
end

#event_dispatcherObject (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to a EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def event_dispatcher
  @event_dispatcher
end

#event_queueObject (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to a EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def event_queue
  @event_queue
end

#flush_intervalObject (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to a EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def flush_interval
  @flush_interval
end

#startedObject (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to a EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def started
  @started
end

Instance Method Details

#flushObject



78
79
80
81
82
83
# File 'lib/optimizely/event/batch_event_processor.rb', line 78

def flush
  @mutex.synchronize do
    @event_queue << FLUSH_SIGNAL
    @received.signal
  end
end

#process(user_event) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/optimizely/event/batch_event_processor.rb', line 85

def process(user_event)
  @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

  if !@started || !@thread.alive?
    @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
    return
  end

  @mutex.synchronize do
    begin
      @event_queue << user_event
      @received.signal
    rescue Exception
      @logger.log(Logger::WARN, 'Payload not accepted by the queue.')
      return
    end
  end
end

#start!Object



68
69
70
71
72
73
74
75
76
# File 'lib/optimizely/event/batch_event_processor.rb', line 68

def start!
  if @started == true
    @logger.log(Logger::WARN, 'Service already started.')
    return
  end
  @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
  @thread = Thread.new { run }
  @started = true
end

#stop!Object



104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/optimizely/event/batch_event_processor.rb', line 104

def stop!
  return unless @started

  @mutex.synchronize do
    @event_queue << SHUTDOWN_SIGNAL
    @received.signal
  end

  @started = false
  @logger.log(Logger::WARN, 'Stopping scheduler.')
  @thread.exit
end