Class: Hackle::DefaultUserEventProcessor

Inherits:
Object
  • Object
show all
Includes:
UserEventProcessor
Defined in:
lib/hackle/internal/event/user_event_processor.rb

Overview

noinspection RubyTooManyInstanceVariablesInspection

Defined Under Namespace

Classes: Message

Instance Method Summary collapse

Constructor Details

#initialize(queue:, event_dispatcher:, event_dispatch_size:, flush_scheduler:, flush_interval_seconds:, shutdown_timeout_seconds:) ⇒ DefaultUserEventProcessor

Returns a new instance of DefaultUserEventProcessor.

Parameters:

  • queue (SizedQueue)
  • event_dispatcher (UserEventDispatcher)
  • event_dispatch_size (Integer)
  • flush_scheduler (Scheduler)
  • flush_interval_seconds (Float)
  • shutdown_timeout_seconds (Float)


29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/hackle/internal/event/user_event_processor.rb', line 29

# Creates a processor that is not yet started: no thread is spawned and no
# flush job is scheduled until the processor is started.
#
# @param queue [SizedQueue]
# @param event_dispatcher [UserEventDispatcher]
# @param event_dispatch_size [Integer]
# @param flush_scheduler [Scheduler]
# @param flush_interval_seconds [Float]
# @param shutdown_timeout_seconds [Float]
def initialize(
  queue:,
  event_dispatcher:,
  event_dispatch_size:,
  flush_scheduler:,
  flush_interval_seconds:,
  shutdown_timeout_seconds:
)
  # --- Collaborators handed in by the caller -------------------------------

  # @type [SizedQueue]
  @queue = queue

  # @type [UserEventDispatcher]
  @event_dispatcher = event_dispatcher

  # @type [Scheduler]
  @flush_scheduler = flush_scheduler

  # --- Tuning values handed in by the caller -------------------------------

  # @type [Integer]
  @event_dispatch_size = event_dispatch_size

  # @type [Float]
  @flush_interval_seconds = flush_interval_seconds

  # @type [Float]
  @shutdown_timeout_seconds = shutdown_timeout_seconds

  # --- Runtime state; everything begins in the "not started" configuration --

  # @type [boolean]
  @is_started = false

  # @type [Thread, nil]
  @consuming_task = nil

  # @type [ScheduledJob, nil]
  @flushing_job = nil

  # @type [Array<UserEvent>]
  @current_batch = []
end

Instance Method Details

#process(event) ⇒ Object

Parameters:



69
70
71
# File 'lib/hackle/internal/event/user_event_processor.rb', line 69

# Wraps the given event in a Message::Event and passes it to #produce.
#
# @param event the event to enqueue (presumably a UserEvent -- confirm against callers)
# @return whatever #produce returns
def process(event)
  wrapped_event = Message::Event.new(event)
  produce(message: wrapped_event)
end

#resume ⇒ Object



100
101
102
103
104
105
106
107
# File 'lib/hackle/internal/event/user_event_processor.rb', line 100

# Brings the processor back to a running state: makes sure a live consumer
# thread exists, swaps any existing flush job for a newly scheduled one, and
# marks the processor as started.
#
# @return [true]
def resume
  # Spawn a consumer only when there is none yet or the previous one has died.
  @consuming_task = Thread.new { consuming } unless @consuming_task&.alive?

  # Cancel the old periodic job (if any) before scheduling its replacement.
  previous_job = @flushing_job
  previous_job.cancel unless previous_job.nil?

  periodic_flush = -> { flush }
  @flushing_job = @flush_scheduler.schedule_periodically(@flush_interval_seconds, periodic_flush)

  @is_started = true
end

#start ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/hackle/internal/event/user_event_processor.rb', line 73

# Starts the processor: spawns the consumer thread, schedules the periodic
# flush job, and marks the processor as started. A second call while already
# started only logs a notice and changes nothing.
def start
  if @is_started
    Log.get.info { "#{UserEventProcessor} is already started." }
    return
  end

  @consuming_task = Thread.new { consuming }

  periodic_flush = -> { flush }
  @flushing_job = @flush_scheduler.schedule_periodically(@flush_interval_seconds, periodic_flush)

  @is_started = true

  Log.get.info { "#{UserEventProcessor} started. Flush event every #{@flush_interval_seconds} seconds." }
end

#stop ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/hackle/internal/event/user_event_processor.rb', line 85

# Shuts the processor down. Does nothing unless the processor is started.
#
# Sequence: cancel the periodic flush job, enqueue a Message::Shutdown via
# #produce with non_block: false, wait on the consumer thread for at most
# @shutdown_timeout_seconds, shut the event dispatcher down, and finally clear
# the started flag.
def stop
  return unless @is_started

  Log.get.info { "Shutting down #{UserEventProcessor}" }

  @flushing_job.cancel unless @flushing_job.nil?

  shutdown_signal = Message::Shutdown.new
  produce(message: shutdown_signal, non_block: false)

  # Bounded wait: join returns once the thread ends or the timeout elapses.
  consumer = @consuming_task
  consumer.join(@shutdown_timeout_seconds) unless consumer.nil?

  @event_dispatcher.shutdown

  @is_started = false
end