Class: GetaroundUtils::Utils::AsyncQueue
- Inherits:
-
Object
- Object
- GetaroundUtils::Utils::AsyncQueue
show all
- Includes:
- Mixins::Loggable
- Defined in:
- lib/getaround_utils/utils/async_queue.rb
Constant Summary
collapse
- MAX_QUEUE_SIZE =
1000
- BUFFER_SIZE =
50
Instance Method Summary
collapse
Methods included from Mixins::Loggable: #base_append_infos_to_loggable, #base_loggable_logger, #class_name, #loggable, #loggable_log
Constructor Details
Returns a new instance of AsyncQueue.
10
11
12
13
14
15
16
|
# File 'lib/getaround_utils/utils/async_queue.rb', line 10
# Builds an empty queue guarded by a mutex, starts the background worker
# thread that runs #thread_run, and registers an at_exit hook that calls
# #terminate.
def initialize
  @closed = false
  @mutex = Mutex.new
  @queue = []
  # The state above must exist before the worker starts reading it.
  @worker = Thread.new { thread_run }
  at_exit { terminate }
end
|
Instance Method Details
#perform ⇒ Object
18
19
20
|
# File 'lib/getaround_utils/utils/async_queue.rb', line 18
# Processes one batch of queued payloads. This base implementation is a
# placeholder that always raises; it is invoked by #thread_run as
# `perform(buffer)`, so it must accept the batch argument to fail with the
# intended error rather than an ArgumentError about arity.
#
# NOTE: NotImplementedError descends from ScriptError, not StandardError, so
# a `rescue StandardError` clause does not swallow it.
#
# @param _buffer [Array] the batch of payloads (ignored here)
# @raise [NotImplementedError] always
def perform(*_buffer)
  raise NotImplementedError, "#{self.class} must implement #perform(buffer)"
end
|
#push(payload) ⇒ Object
22
23
24
25
26
27
28
29
30
|
# File 'lib/getaround_utils/utils/async_queue.rb', line 22
# Appends +payload+ to the queue while holding the mutex. When the queue
# already holds MAX_QUEUE_SIZE entries the payload is dropped and an error
# is logged instead.
#
# @param payload [Object] item to enqueue
# @return [Object] the queue after appending, or the result of the log call
#   when the payload was dropped
def push(payload)
  @mutex.synchronize do
    # Guard clause: refuse new work once the cap is reached.
    next loggable_log(:error, 'queue overflow') unless @queue.size < MAX_QUEUE_SIZE

    @queue.push(payload)
  end
end
|
#terminate ⇒ Object
45
46
47
48
|
# File 'lib/getaround_utils/utils/async_queue.rb', line 45
# Marks the queue as closed (under the mutex) and then waits for the worker
# thread, if there is one, to finish.
#
# @return [Thread, nil] the joined worker thread, or nil when no worker is set
def terminate
  @mutex.synchronize do
    @closed = true
  end
  return if @worker.nil?

  @worker.join
end
|
#thread_run ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/getaround_utils/utils/async_queue.rb', line 32
# Worker loop: repeatedly takes up to BUFFER_SIZE payloads off the queue and
# hands them to #perform, until the queue has been closed and drained.
#
# - The closed flag is sampled under the mutex together with the shift, so a
#   payload pushed before the queue was closed is never left behind.
# - Idleness is decided with `empty?` (not `any?`, which tests element
#   truthiness), so queued nil/false payloads do not trigger a sleep.
# - No sleep happens once the queue is closed, so shutdown is not delayed.
# - A StandardError raised while processing is logged and the loop continues;
#   the batch being processed at that time is not retried.
#
# @return [nil] when the queue is closed and an empty batch was taken
def thread_run
  loop do
    buffer, closed = @mutex.synchronize { [@queue.shift(BUFFER_SIZE), @closed] }
    loggable_log(:debug, 'thread_run', buffer_size: buffer.size)
    return if closed && buffer.empty?

    perform(buffer) unless buffer.empty?

    # Only back off while still open and with nothing left to process.
    idle = @mutex.synchronize { @queue.empty? && !@closed }
    sleep(1) if idle
  rescue StandardError => e
    loggable_log(:error, e.message, class: e.class.to_s, backtrace: e.backtrace)
  end
end
|