Class: Optimizely::BatchEventProcessor
- Inherits: EventProcessor (Object > EventProcessor > Optimizely::BatchEventProcessor)
- Defined in: lib/optimizely/event/batch_event_processor.rb
Constant Summary
- DEFAULT_BATCH_SIZE = 10
- DEFAULT_BATCH_INTERVAL = 30_000 (interval in milliseconds)
- DEFAULT_QUEUE_CAPACITY = 1000
- FLUSH_SIGNAL = 'FLUSH_SIGNAL'
- SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
Instance Attribute Summary
- #batch_size ⇒ Object (readonly)
  BatchEventProcessor is a batched implementation of the EventProcessor interface.
- #current_batch ⇒ Object (readonly)
- #event_dispatcher ⇒ Object (readonly)
- #event_queue ⇒ Object (readonly)
- #flush_interval ⇒ Object (readonly)
- #started ⇒ Object (readonly)
Instance Method Summary
- #flush ⇒ Object
- #initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: Optimizely::EventDispatcher.new, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor (constructor)
  A new instance of BatchEventProcessor.
- #process(user_event) ⇒ Object
- #start! ⇒ Object
- #stop! ⇒ Object
Constructor Details
#initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: Optimizely::EventDispatcher.new, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor
Returns a new instance of BatchEventProcessor.
# File 'lib/optimizely/event/batch_event_processor.rb', line 37
def initialize(
  event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
  event_dispatcher: Optimizely::EventDispatcher.new,
  batch_size: DEFAULT_BATCH_SIZE,
  flush_interval: DEFAULT_BATCH_INTERVAL,
  logger: NoOpLogger.new,
  notification_center: nil
)
  @event_queue = event_queue
  @logger = logger
  @event_dispatcher = event_dispatcher
  @batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
                  batch_size
                else
                  @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
                  DEFAULT_BATCH_SIZE
                end
  @flush_interval = if positive_number?(flush_interval)
                      flush_interval
                    else
                      @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
                      DEFAULT_BATCH_INTERVAL
                    end
  @notification_center = notification_center
  @mutex = Mutex.new
  @received = ConditionVariable.new
  @current_batch = []
  @started = false
  start!
end
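The fallback behaviour of the constructor can be illustrated in isolation. The following is a minimal sketch, not the library's code: positive_number? is defined outside this page, so the version here is a hypothetical stand-in, and resolve_batch_size and resolve_flush_interval are illustrative names that do not exist in the library.

```ruby
DEFAULT_BATCH_SIZE = 10
DEFAULT_BATCH_INTERVAL = 30_000 # milliseconds

# Hypothetical stand-in for the positive_number? helper, whose real
# definition is not shown on this page.
def positive_number?(value)
  value.is_a?(Numeric) && value.real? && value > 0
end

# Mirrors the constructor's rule: only a positive Integer is kept.
def resolve_batch_size(batch_size)
  if batch_size.is_a?(Integer) && positive_number?(batch_size)
    batch_size
  else
    DEFAULT_BATCH_SIZE
  end
end

# Mirrors the constructor's rule: any positive number is kept.
def resolve_flush_interval(flush_interval)
  positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
end

p resolve_batch_size(25)          # => 25
p resolve_batch_size(2.5)         # => 10 (not an Integer)
p resolve_batch_size(0)           # => 10 (not positive)
p resolve_flush_interval(500.5)   # => 500.5
p resolve_flush_interval(-1)      # => 30000
p resolve_flush_interval('1000')  # => 30000 (not a number)
```

Under these assumptions an invalid value never raises; it is silently replaced by the default, and the constructor above only reports the substitution at DEBUG level.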
Instance Attribute Details
#batch_size ⇒ Object (readonly)
BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off the queue and buffers them until either the configured batch size is reached or a maximum duration has elapsed, at which point the resulting LogEvent is sent to the NotificationCenter.
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def batch_size
  @batch_size
end
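The size-or-duration batching described above can be sketched with the standard library alone. TinyBatcher below is a hypothetical stand-in written for illustration, not the library class: the consumer loop (run) of BatchEventProcessor is not shown on this page, so every name and detail here is an assumption.

```ruby
# Sketch only: a single consumer thread that flushes a batch when it
# reaches batch_size or when the flush interval has elapsed.
class TinyBatcher
  SHUTDOWN = :shutdown

  attr_reader :batches

  def initialize(batch_size:, flush_interval_ms:)
    @batch_size = batch_size
    @flush_interval = flush_interval_ms / 1000.0 # seconds
    @pending = []  # events waiting for the consumer thread
    @batches = []  # flushed batches, kept here for inspection
    @mutex = Mutex.new
    @received = ConditionVariable.new
    @thread = Thread.new { run }
  end

  # Producer side: enqueue and wake the consumer.
  def process(event)
    @mutex.synchronize do
      @pending << event
      @received.signal
    end
  end

  # Enqueue the shutdown marker and wait for the consumer to finish.
  def stop
    process(SHUTDOWN)
    @thread.join
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def run
    batch = []
    deadline = now + @flush_interval
    loop do
      item = @mutex.synchronize do
        remaining = deadline - now
        # Sleep until an event arrives or the deadline passes.
        @received.wait(@mutex, remaining) if @pending.empty? && remaining > 0
        @pending.shift
      end

      if item == SHUTDOWN
        flush(batch)
        break
      end

      batch << item unless item.nil?
      next unless batch.size >= @batch_size || now >= deadline

      flush(batch)
      batch = []
      deadline = now + @flush_interval
    end
  end

  def flush(batch)
    @batches << batch unless batch.empty?
  end
end

batcher = TinyBatcher.new(batch_size: 3, flush_interval_ms: 30_000)
7.times { |i| batcher.process("event-#{i}") }
batcher.stop
p batcher.batches.map(&:size) # => [3, 3, 1]
```

With a 30-second interval the deadline never fires in this run, so the first two batches are cut by size and the last event is flushed on shutdown.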
#current_batch ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def current_batch
  @current_batch
end
#event_dispatcher ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def event_dispatcher
  @event_dispatcher
end
#event_queue ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def event_queue
  @event_queue
end
#flush_interval ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def flush_interval
  @flush_interval
end
#started ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def started
  @started
end
Instance Method Details
#flush ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 78
def flush
  @mutex.synchronize do
    @event_queue << FLUSH_SIGNAL
    @received.signal
  end
end
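#flush does not dispatch anything itself; it only enqueues the FLUSH_SIGNAL marker for the consumer thread. How the consumer reacts is not shown on this page, so the loop below is an assumed illustration of the sentinel pattern using a plain Queue, with dispatched as a hypothetical stand-in for whatever the real consumer does with a finished batch.

```ruby
FLUSH_SIGNAL = 'FLUSH_SIGNAL'
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'

queue = Queue.new
dispatched = [] # stand-in for handing a batch to a dispatcher

consumer = Thread.new do
  batch = []
  loop do
    item = queue.pop # blocks until something is enqueued
    case item
    when SHUTDOWN_SIGNAL
      dispatched << batch unless batch.empty?
      break
    when FLUSH_SIGNAL
      dispatched << batch unless batch.empty?
      batch = []
    else
      batch << item
    end
  end
end

queue << 'a' << 'b'
queue << FLUSH_SIGNAL   # what #flush enqueues
queue << 'c'
queue << SHUTDOWN_SIGNAL # what #stop! enqueues
consumer.join

p dispatched # => [["a", "b"], ["c"]]
```

Because the markers travel through the same queue as the events, a flush request is handled in order: everything enqueued before it ends up in the flushed batch.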
#process(user_event) ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 85
def process(user_event)
  @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

  if !@started || !@thread.alive?
    @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
    return
  end

  @mutex.synchronize do
    begin
      @event_queue << user_event
      @received.signal
    rescue Exception
      @logger.log(Logger::WARN, 'Payload not accepted by the queue.')
      return
    end
  end
end
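The rescue branch in #process only runs if the push raises. With the default SizedQueue from the Ruby standard library, << on a full queue blocks the caller rather than raising; a non-blocking push raises ThreadError instead. The snippet below demonstrates only this standard-library behaviour and is not taken from the library.

```ruby
queue = SizedQueue.new(2)
queue << :first
queue << :second # the queue is now at capacity

accepted =
  begin
    queue.push(:third, true) # second argument: non_block = true
    true
  rescue ThreadError
    false # a full queue rejects a non-blocking push
  end

p accepted   # => false
p queue.size # => 2
```

A caller supplying a custom event_queue may want to keep this difference in mind when sizing the queue.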
#start! ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 68
def start!
  if @started == true
    @logger.log(Logger::WARN, 'Service already started.')
    return
  end
  @flushing_interval_deadline = Helpers::DateTimeUtils. + @flush_interval
  @thread = Thread.new { run }
  @started = true
end
#stop! ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 104
def stop!
  return unless @started

  @mutex.synchronize do
    @event_queue << SHUTDOWN_SIGNAL
    @received.signal
  end

  @started = false
  @logger.log(Logger::WARN, 'Stopping scheduler.')
  @thread.exit
end