Class: GetaroundUtils::Utils::AsyncQueue

Inherits:
Object
  • Object
show all
Includes:
Mixins::Loggable
Defined in:
lib/getaround_utils/utils/async_queue.rb

Direct Known Subclasses

CapturReporter

Constant Summary collapse

MAX_QUEUE_SIZE =
1000
BUFFER_SIZE =
50

Instance Method Summary collapse

Methods included from Mixins::Loggable

#base_append_infos_to_loggable, #base_loggable_logger, #class_name, #loggable, #loggable_log

Constructor Details

#initialize ⇒ AsyncQueue

Returns a new instance of AsyncQueue.



10
11
12
13
14
15
16
# File 'lib/getaround_utils/utils/async_queue.rb', line 10

# Builds an empty, open queue, starts the background worker thread running
# #thread_run, and registers an at_exit hook that calls #terminate.
def initialize
  # State must exist before the worker starts, since #thread_run reads it.
  @closed = false
  @mutex = Mutex.new
  @queue = []
  @worker = Thread.new { thread_run }
  at_exit do
    terminate
  end
end

Instance Method Details

#perform ⇒ Object

Raises:

  • (NotImplementedError)


18
19
20
# File 'lib/getaround_utils/utils/async_queue.rb', line 18

# Handles one batch of payloads. This base implementation is abstract and
# always raises; subclasses are expected to override it.
#
# The batch argument is accepted (and ignored) because #thread_run invokes
# `perform(buffer)`: without it the call fails with ArgumentError instead of
# the intended NotImplementedError. It is optional so that zero-argument
# calls keep working.
#
# Note: NotImplementedError is not a StandardError, so a plain
# `rescue StandardError` does not catch it.
#
# @param _buffer [Array, nil] the batch of payloads (unused here)
# @raise [NotImplementedError] always
def perform(_buffer = nil)
  raise NotImplementedError
end

#push(payload) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/getaround_utils/utils/async_queue.rb', line 22

# Enqueues a payload while holding the queue lock.
#
# When the queue already holds MAX_QUEUE_SIZE entries the payload is not
# stored; an error is logged instead.
#
# @param payload [Object] the item to enqueue
# @return [Object] the queue array when the payload was stored, otherwise
#   whatever #loggable_log returned
def push(payload)
  @mutex.synchronize do
    # Guard: refuse new entries once the queue is full.
    next loggable_log(:error, 'queue overflow') unless @queue.size < MAX_QUEUE_SIZE

    @queue << payload
  end
end

#terminate ⇒ Object



45
46
47
48
# File 'lib/getaround_utils/utils/async_queue.rb', line 45

# Marks the queue as closed (under the lock) and then waits for the worker
# thread, if there is one, to finish.
#
# @return [Thread, nil] the joined worker thread, or nil when no worker is set
def terminate
  @mutex.synchronize do
    @closed = true
  end
  return if @worker.nil?

  @worker.join
end

#thread_run ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/getaround_utils/utils/async_queue.rb', line 32

# Worker loop: repeatedly takes up to BUFFER_SIZE payloads off the queue and
# passes each non-empty batch to #perform, until the queue is closed and empty.
#
# The batch and the closed flag are read under the same lock. Checking
# @closed separately, after the batch was taken, lets a payload that is
# pushed in between (followed by closing the queue) be left behind when the
# loop exits; with a single snapshot the loop only exits when the queue was
# closed and empty at the same moment.
#
# A StandardError raised during an iteration is logged and the loop moves on;
# the batch that was being handled is not retried.
#
# @return [void]
def thread_run
  loop do
    buffer, closed = @mutex.synchronize { [@queue.shift(BUFFER_SIZE), @closed] }
    loggable_log(:debug, 'thread_run', buffer_size: buffer.size)
    return if closed && buffer.empty?

    perform(buffer) unless buffer.empty?
    # Idle for a second only while the queue is open with nothing pending;
    # once closed, loop straight back so shutdown is not delayed by the sleep.
    idle = @mutex.synchronize { @queue.empty? && !@closed }
    sleep(1) if idle
  rescue StandardError => e
    loggable_log(:error, e.message, class: e.class.to_s, backtrace: e.backtrace)
  end
end