Class: Hackle::DefaultUserEventProcessor
- Inherits:
-
Object
- Object
- Hackle::DefaultUserEventProcessor
- Includes:
- UserEventProcessor
- Defined in:
- lib/hackle/internal/event/user_event_processor.rb
Overview
noinspection RubyTooManyInstanceVariablesInspection
Defined Under Namespace
Classes: Message
Instance Method Summary collapse
-
#initialize(queue:, event_dispatcher:, event_dispatch_size:, flush_scheduler:, flush_interval_seconds:, shutdown_timeout_seconds:) ⇒ DefaultUserEventProcessor
constructor
A new instance of DefaultUserEventProcessor.
- #process(event) ⇒ Object
- #resume ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(queue:, event_dispatcher:, event_dispatch_size:, flush_scheduler:, flush_interval_seconds:, shutdown_timeout_seconds:) ⇒ DefaultUserEventProcessor
Returns a new instance of DefaultUserEventProcessor.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/hackle/internal/event/user_event_processor.rb', line 29 def initialize( queue:, event_dispatcher:, event_dispatch_size:, flush_scheduler:, flush_interval_seconds:, shutdown_timeout_seconds: ) # @type [SizedQueue] @queue = queue # @type [UserEventDispatcher] @event_dispatcher = event_dispatcher # @type [Integer] @event_dispatch_size = event_dispatch_size # @type [Scheduler] @flush_scheduler = flush_scheduler # @type [Float] @flush_interval_seconds = flush_interval_seconds # @type [Float] @shutdown_timeout_seconds = shutdown_timeout_seconds # @type [ScheduledJob, nil] @flushing_job = nil # @type [Thread, nil] @consuming_task = nil # @type [boolean] @is_started = false # @type [Array<UserEvent>] @current_batch = [] end |
Instance Method Details
#process(event) ⇒ Object
69 70 71 |
# File 'lib/hackle/internal/event/user_event_processor.rb', line 69 def process(event) produce(message: Message::Event.new(event)) end |
#resume ⇒ Object
100 101 102 103 104 105 106 107 |
# File 'lib/hackle/internal/event/user_event_processor.rb', line 100 def resume @consuming_task = Thread.new { consuming } if @consuming_task.nil? || !@consuming_task.alive? @flushing_job&.cancel @flushing_job = @flush_scheduler.schedule_periodically(@flush_interval_seconds, -> { flush }) @is_started = true end |
#start ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/hackle/internal/event/user_event_processor.rb', line 73 def start if @is_started Log.get.info { "#{UserEventProcessor} is already started." } return end @consuming_task = Thread.new { consuming } @flushing_job = @flush_scheduler.schedule_periodically(@flush_interval_seconds, -> { flush }) @is_started = true Log.get.info { "#{UserEventProcessor} started. Flush event every #{@flush_interval_seconds} seconds." } end |
#stop ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/hackle/internal/event/user_event_processor.rb', line 85 def stop return unless @is_started Log.get.info { "Shutting down #{UserEventProcessor}" } @flushing_job&.cancel produce(message: Message::Shutdown.new, non_block: false) @consuming_task&.join(@shutdown_timeout_seconds) @event_dispatcher.shutdown @is_started = false end |