Class: Core::Jobs::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/core/jobs/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(scheduler:, max_ingestion_queue_size:, max_ingestion_queue_bytes:, poll_interval: 3, termination_timeout: 60, min_threads: 1, max_threads: 5, max_queue: 100, idle_time: 5) ⇒ Consumer

Returns a new instance of Consumer.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/core/jobs/consumer.rb', line 18

# Builds a consumer by recording its collaborators and tuning options.
# Only instance variables are assigned here; nothing is started.
#
# @param scheduler [Object] stored as @scheduler
# @param max_ingestion_queue_size [Object] stored as @max_ingestion_queue_size
# @param max_ingestion_queue_bytes [Object] stored as @max_ingestion_queue_bytes
# @param poll_interval [Numeric] stored as @poll_interval (presumably seconds — confirm)
# @param termination_timeout [Numeric] stored as @termination_timeout; shutdown! hands
#   it to pool.wait_for_termination
# @param min_threads [Integer] stored as @min_threads
# @param max_threads [Integer] stored as @max_threads
# @param max_queue [Integer] stored as @max_queue
# @param idle_time [Numeric] stored as @idle_time (presumably seconds — confirm)
def initialize(scheduler:,
               max_ingestion_queue_size:,
               max_ingestion_queue_bytes:,
               poll_interval: 3,
               termination_timeout: 60,
               min_threads: 1,
               max_threads: 5,
               max_queue: 100,
               idle_time: 5)
  # Required arguments.
  @scheduler = scheduler
  @max_ingestion_queue_size = max_ingestion_queue_size
  @max_ingestion_queue_bytes = max_ingestion_queue_bytes

  # Timing options.
  @poll_interval = poll_interval
  @termination_timeout = termination_timeout

  # NOTE(review): these look like thread pool sizing options — confirm where the pool is built.
  @min_threads = min_threads
  @max_threads = max_threads
  @max_queue = max_queue
  @idle_time = idle_time
end

Instance Method Details

#running? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/core/jobs/consumer.rb', line 47

# Reports whether both the pool and the timer task are running.
#
# Either collaborator may be nil; in that case the safe-navigation result
# (nil) is returned as-is, so callers get a falsy value, not strictly false.
#
# @return [Boolean, nil] truthy only when pool.running? and timer_task.running? are both truthy
def running?
  pool_running = pool&.running?
  # Short-circuits exactly like the one-line form: a nil/false pool result is returned unchanged.
  pool_running && timer_task&.running?
end

#shutdown! ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/core/jobs/consumer.rb', line 51

# Stops the timer task and the pool, waits up to @termination_timeout for the
# pool to terminate, then calls reset_pool!.
#
# Nil-safe: if timer_task or pool is nil (for example when shutdown! is
# called before subscribe!), the corresponding steps are skipped instead of
# raising NoMethodError. reset_pool! is still invoked in every case.
#
# @return [Object] whatever reset_pool! returns
def shutdown!
  Utility::Logger.info("Shutting down consumer for #{@index_name} index")

  # Stop the timer task before the pool, as in the original ordering.
  timer_task&.shutdown

  pool&.shutdown
  # The result of wait_for_termination is intentionally not inspected here.
  pool&.wait_for_termination(@termination_timeout)
  reset_pool!
end

#subscribe!(index_name:) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/core/jobs/consumer.rb', line 39

# Starts consuming for the given index: logs the start, remembers the index
# name in @index_name, then starts the timer task followed by the thread pool.
#
# @param index_name [String] name interpolated into the log line and stored as @index_name
# @return [Object] whatever start_thread_pool! returns
def subscribe!(index_name:)
  Utility::Logger.info("Starting a new consumer for #{index_name} index")
  @index_name = index_name

  # Order kept as-is: timer task first, thread pool second.
  start_timer_task!
  start_thread_pool!
end