Class: Optimizely::BatchEventProcessor
- Inherits: EventProcessor
  - Object
  - EventProcessor
  - Optimizely::BatchEventProcessor
- Defined in: lib/optimizely/event/batch_event_processor.rb
Constant Summary
- DEFAULT_BATCH_SIZE = 10
- DEFAULT_BATCH_INTERVAL = 30_000 (interval in milliseconds)
- DEFAULT_QUEUE_CAPACITY = 1000
- DEFAULT_TIMEOUT_INTERVAL = 5 (interval in seconds)
- MAX_NIL_COUNT = 3
- FLUSH_SIGNAL = 'FLUSH_SIGNAL'
- SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
Instance Attribute Summary
- #batch_size ⇒ Object (readonly)
  BatchEventProcessor is a batched implementation of the Interface EventProcessor.
- #current_batch ⇒ Object (readonly)
- #event_dispatcher ⇒ Object (readonly)
- #event_queue ⇒ Object (readonly)
- #flush_interval ⇒ Object (readonly)
- #started ⇒ Object (readonly)
Instance Method Summary
- #flush ⇒ Object
- #initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor (constructor)
  A new instance of BatchEventProcessor.
- #process(user_event) ⇒ Object
- #start! ⇒ Object
- #stop! ⇒ Object
Constructor Details
#initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor
Returns a new instance of BatchEventProcessor.
# File 'lib/optimizely/event/batch_event_processor.rb', line 39
def initialize(
  event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
  event_dispatcher: nil,
  batch_size: DEFAULT_BATCH_SIZE,
  flush_interval: DEFAULT_BATCH_INTERVAL,
  logger: NoOpLogger.new,
  notification_center: nil
)
  @event_queue = event_queue
  @logger = logger
  @event_dispatcher = event_dispatcher || EventDispatcher.new(logger: @logger)
  @batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
                  batch_size
                else
                  @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
                  DEFAULT_BATCH_SIZE
                end
  @flush_interval = if positive_number?(flush_interval)
                      flush_interval
                    else
                      @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
                      DEFAULT_BATCH_INTERVAL
                    end
  @notification_center = notification_center
  @current_batch = []
  @started = false
  start!
end
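The fallback behaviour of the constructor can be tried outside the SDK. The following is a minimal sketch, not the library's code: the module name `BatchSettingsSketch` and both `resolve_*` methods are hypothetical, and `positive_number?` is an assumed stand-in for the SDK helper of the same name, whose real implementation is not shown on this page.

```ruby
# Stand-alone sketch of the constructor's fallback rules.
# Everything in this module is illustrative; only the two default
# values are taken from the Constant Summary.
module BatchSettingsSketch
  DEFAULT_BATCH_SIZE = 10
  DEFAULT_BATCH_INTERVAL = 30_000 # milliseconds

  module_function

  # Assumed behaviour of the SDK helper: any numeric value above zero.
  def positive_number?(value)
    value.is_a?(Numeric) && value.positive?
  end

  # batch_size must be a positive Integer, otherwise the default is used.
  def resolve_batch_size(batch_size)
    batch_size.is_a?(Integer) && positive_number?(batch_size) ? batch_size : DEFAULT_BATCH_SIZE
  end

  # flush_interval only has to be positive, so non-Integer values pass.
  def resolve_flush_interval(flush_interval)
    positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
  end
end

p BatchSettingsSketch.resolve_batch_size(25)        # valid Integer is kept
p BatchSettingsSketch.resolve_batch_size(2.5)       # non-Integer falls back
p BatchSettingsSketch.resolve_flush_interval(-1)    # non-positive falls back
p BatchSettingsSketch.resolve_flush_interval(500.5) # positive non-Integer is kept
```

Note the asymmetry this makes visible: `batch_size` is checked with `is_a? Integer`, while `flush_interval` is only checked for being positive.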
Instance Attribute Details
#batch_size ⇒ Object (readonly)
BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue. The BatchEventProcessor maintains a single consumer thread that pulls events off the queue and buffers them until either the configured batch size is reached or a maximum duration elapses, at which point the resulting LogEvent is sent to the NotificationCenter.
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def batch_size
  @batch_size
end
#current_batch ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def current_batch
  @current_batch
end
#event_dispatcher ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def event_dispatcher
  @event_dispatcher
end
#event_queue ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def event_queue
  @event_queue
end
#flush_interval ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def flush_interval
  @flush_interval
end
#started ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28
def started
  @started
end
Instance Method Details
#flush ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 79
def flush
  @event_queue << FLUSH_SIGNAL
end
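Since #flush only enqueues a sentinel, the actual flushing has to happen wherever the queue is consumed. The consumer loop is not shown on this page, so the following is a hypothetical sketch of how a size limit and a flush sentinel might interact. `batch_items` is an invented name, and the time-based flush is deliberately left out to keep the example deterministic.

```ruby
# Hypothetical batching step, NOT the SDK's consumer loop.
# Splits a stream of items into batches, closing a batch when it reaches
# batch_size or when the flush sentinel is encountered.
def batch_items(items, batch_size, flush_signal)
  batches = []
  current = []
  items.each do |item|
    if item == flush_signal
      # A flush closes the current batch early, if it holds anything.
      batches << current unless current.empty?
      current = []
      next
    end
    current << item
    next if current.length < batch_size

    # Size limit reached: close the batch and start a new one.
    batches << current
    current = []
  end
  [batches, current]
end

batches, pending = batch_items([1, 2, 'FLUSH_SIGNAL', 3, 4, 5, 6], 3, 'FLUSH_SIGNAL')
p batches # => [[1, 2], [3, 4, 5]]
p pending # => [6]
```

Because the sentinel travels through the same queue as the events, a flush in this sketch only affects items that were enqueued before it.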
#process(user_event) ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 83
def process(user_event)
  @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")
  if !@started || !@thread.alive?
    @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
    return
  end

  begin
    @event_queue.push(user_event, true)
  rescue Exception
    @logger.log(Logger::WARN, 'Payload not accepted by the queue.')
    return
  end
end
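The `push(user_event, true)` call above is a non-blocking push: with the default SizedQueue, a full queue raises instead of making the caller wait. The sketch below shows that behaviour using only Ruby's core SizedQueue. `try_enqueue` is a hypothetical helper, and it rescues `ThreadError` specifically, which is narrower than the `rescue Exception` in the listing.

```ruby
# Sketch only: demonstrates how a full SizedQueue rejects a non-blocking push.
def try_enqueue(queue, item)
  queue.push(item, true) # second argument true means "do not block"
  true
rescue ThreadError
  # SizedQueue#push raises ThreadError when the queue is full and non_block is true.
  false
end

queue = SizedQueue.new(2)
results = [1, 2, 3].map { |item| try_enqueue(queue, item) }
p results    # => [true, true, false]
p queue.size # => 2
```

The third item is dropped rather than queued, which corresponds to the 'Payload not accepted by the queue.' warning in #process.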
#start! ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 68
def start!
  if @started == true
    @logger.log(Logger::WARN, 'Service already started.')
    return
  end
  @flushing_interval_deadline = Helpers::DateTimeUtils. + @flush_interval
  @logger.log(Logger::INFO, 'Starting scheduler.')
  @thread = Thread.new { run }
  @started = true
end
#stop! ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 99
def stop!
  return unless @started

  @logger.log(Logger::INFO, 'Stopping scheduler.')
  @event_queue << SHUTDOWN_SIGNAL
  @thread.join(DEFAULT_TIMEOUT_INTERVAL)
  @started = false
end
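The shutdown pattern in #stop! (enqueue a sentinel, then join the thread with a timeout) can be illustrated with core Ruby alone. This is a sketch under assumptions: `run_worker` and `stop_worker` are hypothetical names, and the worker loop is a simplified stand-in for the SDK's consumer thread, which is not shown on this page.

```ruby
# Sketch of sentinel-based shutdown, not the SDK's implementation.
def run_worker(queue, processed, shutdown_signal)
  Thread.new do
    loop do
      item = queue.pop # blocks until an item is available
      break if item == shutdown_signal

      processed << item
    end
  end
end

def stop_worker(queue, worker, shutdown_signal, timeout_seconds)
  queue << shutdown_signal
  # Thread#join(limit) returns the thread if it finished within the limit,
  # or nil if the limit expired first.
  !worker.join(timeout_seconds).nil?
end

queue = SizedQueue.new(10)
processed = []
worker = run_worker(queue, processed, 'SHUTDOWN_SIGNAL')
queue << :event_a
queue << :event_b
finished = stop_worker(queue, worker, 'SHUTDOWN_SIGNAL', 5)
p finished  # true when the worker exited within the timeout
p processed # items that were queued ahead of the sentinel
```

Two details carry over to the listing above: the sentinel is queued behind any pending events, so those are consumed first, and #stop! ignores the return value of `join`, so it sets `@started = false` whether or not the thread actually finished within DEFAULT_TIMEOUT_INTERVAL seconds.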