Class: Optimizely::BatchEventProcessor
- Inherits: EventProcessor
  - Object
    - EventProcessor
      - Optimizely::BatchEventProcessor
- Defined in: lib/optimizely/event/batch_event_processor.rb
Constant Summary
- DEFAULT_BATCH_SIZE = 10
- DEFAULT_BATCH_INTERVAL = 30_000 (interval in milliseconds)
- DEFAULT_QUEUE_CAPACITY = 1000
- DEFAULT_TIMEOUT_INTERVAL = 5 (interval in seconds)
- FLUSH_SIGNAL = 'FLUSH_SIGNAL'
- SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
Instance Attribute Summary
- #batch_size ⇒ Object (readonly)
  BatchEventProcessor is a batched implementation of the EventProcessor interface.
- #current_batch ⇒ Object (readonly)
- #event_dispatcher ⇒ Object (readonly)
- #event_queue ⇒ Object (readonly)
- #flush_interval ⇒ Object (readonly)
- #started ⇒ Object (readonly)
Instance Method Summary
- #flush ⇒ Object
- #initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor (constructor)
  A new instance of BatchEventProcessor.
- #process(user_event) ⇒ Object
- #start! ⇒ Object
- #stop! ⇒ Object
Constructor Details
#initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor
    # File 'lib/optimizely/event/batch_event_processor.rb', line 38
    def initialize(
      event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
      event_dispatcher: nil,
      batch_size: DEFAULT_BATCH_SIZE,
      flush_interval: DEFAULT_BATCH_INTERVAL,
      logger: NoOpLogger.new,
      notification_center: nil
    )
      @event_queue = event_queue
      @logger = logger
      @event_dispatcher = event_dispatcher || EventDispatcher.new(logger: @logger)
      @batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
                      batch_size
                    else
                      @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
                      DEFAULT_BATCH_SIZE
                    end
      @flush_interval = if positive_number?(flush_interval)
                          flush_interval
                        else
                          @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
                          DEFAULT_BATCH_INTERVAL
                        end
      @notification_center = notification_center
      @current_batch = []
      @started = false
      @stopped = false
    end
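The constructor falls back to the defaults when batch_size is not a positive Integer or when flush_interval is not a positive number. A minimal sketch of that fallback follows. The SDK's positive_number? helper is not shown on this page, so the definition below is a hypothetical stand-in, and the resolve_* method names are illustrative only.

```ruby
DEFAULT_BATCH_SIZE = 10          # same value as the documented constant
DEFAULT_BATCH_INTERVAL = 30_000  # milliseconds, as documented

# Hypothetical stand-in for the SDK helper of the same name.
def positive_number?(value)
  value.is_a?(Numeric) && value.positive?
end

# Mirrors the constructor: only positive Integers are accepted as a batch size.
def resolve_batch_size(batch_size)
  batch_size.is_a?(Integer) && positive_number?(batch_size) ? batch_size : DEFAULT_BATCH_SIZE
end

# Mirrors the constructor: any positive number is accepted as a flush interval.
def resolve_flush_interval(flush_interval)
  positive_number?(flush_interval) ? flush_interval : DEFAULT_BATCH_INTERVAL
end

puts resolve_batch_size(25)      # prints 25
puts resolve_batch_size(2.5)     # prints 10 (not an Integer, so the default applies)
puts resolve_flush_interval(-1)  # prints 30000 (not positive, so the default applies)
```

Under these assumptions an invalid argument never raises; it is silently replaced, which matches the DEBUG-level log messages in the constructor.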
Instance Attribute Details
#batch_size ⇒ Object (readonly)
BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to its event queue (a SizedQueue by default). A single consumer thread pulls events off that queue and buffers them until either the configured batch size is reached or the flush interval elapses, at which point the resulting LogEvent is sent to the NotificationCenter.
    # File 'lib/optimizely/event/batch_event_processor.rb', line 28
    def batch_size
      @batch_size
    end
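The batching behaviour described above (flush on batch size, on elapsed time, or on shutdown) can be sketched with the standard library alone. This is not the SDK's run_queue, which is not shown on this page: the method name consume_in_batches, the SHUTDOWN sentinel and the polling sleep are all assumptions made for illustration.

```ruby
SHUTDOWN = :shutdown # hypothetical sentinel, standing in for SHUTDOWN_SIGNAL

def monotonic_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end

# Buffers items from +queue+ and hands a batch to +dispatch+ when it reaches
# +batch_size+, when +flush_interval_ms+ has elapsed, or when SHUTDOWN arrives.
def consume_in_batches(queue, batch_size:, flush_interval_ms:, &dispatch)
  batch = []
  deadline = monotonic_ms + flush_interval_ms
  flush = lambda do
    dispatch.call(batch) unless batch.empty?
    batch = []
    deadline = monotonic_ms + flush_interval_ms
  end

  loop do
    flush.call if monotonic_ms >= deadline
    item = begin
      queue.pop(true) # non-blocking pop; raises ThreadError when empty
    rescue ThreadError
      sleep 0.005     # simple polling; a condition variable would avoid this
      next
    end
    if item == SHUTDOWN
      flush.call      # dispatch whatever is still buffered, then stop
      break
    end
    batch << item
    flush.call if batch.size >= batch_size
  end
end

queue = Queue.new
batches = []
consumer = Thread.new do
  consume_in_batches(queue, batch_size: 2, flush_interval_ms: 60_000) { |b| batches << b }
end
[1, 2, 3, 4, 5].each { |event| queue << event }
queue << SHUTDOWN
consumer.join
p batches # prints [[1, 2], [3, 4], [5]]
```

The long flush interval keeps the example deterministic; with a short interval, partially filled batches would also be dispatched whenever the deadline passes.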
#current_batch ⇒ Object (readonly)
    # File 'lib/optimizely/event/batch_event_processor.rb', line 28
    def current_batch
      @current_batch
    end
#event_dispatcher ⇒ Object (readonly)
    # File 'lib/optimizely/event/batch_event_processor.rb', line 28
    def event_dispatcher
      @event_dispatcher
    end
#event_queue ⇒ Object (readonly)
    # File 'lib/optimizely/event/batch_event_processor.rb', line 28
    def event_queue
      @event_queue
    end
#flush_interval ⇒ Object (readonly)
    # File 'lib/optimizely/event/batch_event_processor.rb', line 28
    def flush_interval
      @flush_interval
    end
#started ⇒ Object (readonly)
    # File 'lib/optimizely/event/batch_event_processor.rb', line 28
    def started
      @started
    end
Instance Method Details
#flush ⇒ Object
    # File 'lib/optimizely/event/batch_event_processor.rb', line 83
    def flush
      @event_queue << FLUSH_SIGNAL
      @wait_mutex.synchronize { @resource.signal }
    end
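Because #flush enqueues FLUSH_SIGNAL on the same queue as the events, the marker is seen only after every event queued before it. How the consumer thread reacts to the marker is not shown on this page; the drain method below is a hypothetical consumer step that illustrates the ordering, not the SDK's implementation.

```ruby
FLUSH_SIGNAL = 'FLUSH_SIGNAL' # same value as the documented constant

# Hypothetical consumer step: drains the queue, closing the current batch
# whenever the flush marker is seen. Returns [flushed_batches, pending_batch].
def drain(queue)
  flushed = []
  batch = []
  until queue.empty?
    item = queue.pop
    if item == FLUSH_SIGNAL
      flushed << batch unless batch.empty?
      batch = []
    else
      batch << item
    end
  end
  [flushed, batch]
end

queue = Queue.new
queue << 'event-1'
queue << 'event-2'
queue << FLUSH_SIGNAL # what #flush enqueues
queue << 'event-3'

flushed, pending = drain(queue)
p flushed # prints [["event-1", "event-2"]]
p pending # prints ["event-3"]
```

Events queued after the marker stay buffered until the next flush, batch-size or interval trigger.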
#process(user_event) ⇒ Object
    # File 'lib/optimizely/event/batch_event_processor.rb', line 88
    def process(user_event)
      @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

      # Do not accept events if the processor has been explicitly stopped.
      if @stopped
        @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
        return
      end

      # Start the processor if it has not been started yet.
      start! unless @started

      begin
        @event_queue.push(user_event, true)
        @wait_mutex.synchronize { @resource.signal }
      rescue => e
        @logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + e.message)
        return
      end
    end
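#process passes true as the second argument to push, which makes the push non-blocking: when a SizedQueue is full it raises ThreadError instead of waiting for room. A small standalone sketch of that behaviour follows; the offer method name is hypothetical.

```ruby
# Tries to enqueue +event+ without blocking. Returns true on success and
# false when the queue is full (SizedQueue#push raises ThreadError).
def offer(queue, event)
  queue.push(event, true) # second argument: non_block = true
  true
rescue ThreadError
  false
end

queue = SizedQueue.new(2) # capacity of 2 instead of DEFAULT_QUEUE_CAPACITY
puts offer(queue, 'event-1') # prints true
puts offer(queue, 'event-2') # prints true
puts offer(queue, 'event-3') # prints false (queue is full, event is dropped)
```

The consequence for callers is that events arriving while the queue is full are dropped with a warning rather than delaying the calling thread.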
#start! ⇒ Object
    # File 'lib/optimizely/event/batch_event_processor.rb', line 67
    def start!
      if @started == true
        @logger.log(Logger::WARN, 'Service already started.')
        return
      end
      @flushing_interval_deadline = Helpers::DateTimeUtils. + @flush_interval
      @logger.log(Logger::INFO, 'Starting scheduler.')
      if @wait_mutex.nil?
        @wait_mutex = Mutex.new
        @resource = ConditionVariable.new
      end
      @thread = Thread.new { run_queue }
      @started = true
      @stopped = false
    end
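#start! creates a Mutex and a ConditionVariable, which #flush, #process and #stop! later use to wake the consumer thread. The consumer side is not shown on this page, so the sketch below is only an assumption about how such a pair is typically used: a waiter sleeps with a timeout and is woken early by a signal. The WakeUp class and its method names are hypothetical.

```ruby
# Hypothetical helper pairing a Mutex with a ConditionVariable.
class WakeUp
  def initialize
    @mutex = Mutex.new
    @resource = ConditionVariable.new
    @pending = false
  end

  # Blocks until notify is called or roughly +timeout_s+ seconds elapse.
  # Returns true if a notification was pending, false otherwise.
  def wait(timeout_s)
    @mutex.synchronize do
      @resource.wait(@mutex, timeout_s) unless @pending
      signalled = @pending
      @pending = false
      signalled
    end
  end

  # Records the notification and wakes one waiting thread, if any.
  def notify
    @mutex.synchronize do
      @pending = true
      @resource.signal
    end
  end
end

wake_up = WakeUp.new
worker = Thread.new { wake_up.wait(5) }
wake_up.notify
puts worker.value             # prints true (woken by notify, not by the timeout)
puts WakeUp.new.wait(0.01)    # prints false (nothing was signalled)
```

The @pending flag is an addition of this sketch: it keeps a notification from being lost when notify runs before the waiter has started waiting.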
#stop! ⇒ Object
    # File 'lib/optimizely/event/batch_event_processor.rb', line 109
    def stop!
      return unless @started

      @logger.log(Logger::INFO, 'Stopping scheduler.')
      @event_queue << SHUTDOWN_SIGNAL
      @wait_mutex.synchronize { @resource.signal }
      @thread.join(DEFAULT_TIMEOUT_INTERVAL)
      @started = false
      @stopped = true
    end
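#stop! waits for the consumer thread with a bounded join. Thread#join with a timeout returns the thread when it finished in time and nil when the timeout expired. The sketch below shows both outcomes; the stop_worker method name is hypothetical.

```ruby
# Waits up to +timeout_s+ seconds for +thread+ to finish.
def stop_worker(thread, timeout_s)
  thread.join(timeout_s) ? :finished : :timed_out
end

fast = Thread.new { :done }
slow = Thread.new { sleep 10 }

puts stop_worker(fast, 5)    # prints finished
puts stop_worker(slow, 0.05) # prints timed_out
slow.kill                    # clean up the still-running thread
```

In the #stop! source above the return value of join is not inspected, so the processor is marked as stopped whether or not the consumer thread finished within DEFAULT_TIMEOUT_INTERVAL seconds.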