Class: LogStash::Util::WrappedAckedQueue
- Inherits:
- Object
- Defined in:
- lib/logstash/util/wrapped_acked_queue.rb
Overview
Some specialized constructors. The calling code needs to know which kind of queue it creates, but not the internal implementation (e.g. LogStash::AckedMemoryQueue). Note the use of allocate: this is what new does before it calls initialize. The new method has been made private because there is no default queue implementation. It would be expensive to create a persistent queue in new only to throw it away in favor of a memory-based one directly afterwards, especially in terms of (mmap) memory allocation and proper close sequencing.
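The allocate-plus-private-new pattern described above can be sketched with a toy class. This is a minimal illustration only: ToyBackend, ToyWrappedQueue and create_toy are hypothetical names invented for this example and are not part of Logstash.

```ruby
# Toy stand-in for a queue backend; NOT the real LogStash::AckedMemoryQueue.
class ToyBackend
  attr_reader :opened

  def initialize(capacity)
    @capacity = capacity
    @opened = false
  end

  def open
    @opened = true
  end
end

class ToyWrappedQueue
  # There is no default backend, so direct construction is disallowed.
  private_class_method :new

  attr_reader :queue

  # Specialized constructor: allocate creates the instance without calling
  # initialize, then with_queue wires in the backend and returns self.
  def self.create_toy(capacity)
    allocate.with_queue(ToyBackend.new(capacity))
  end

  def with_queue(queue)
    @queue = queue
    @queue.open
    self
  end
end

wrapped = ToyWrappedQueue.create_toy(16)
```

With this shape, no backend is ever built and then discarded: each factory method constructs exactly the backend it needs, and calling ToyWrappedQueue.new raises NoMethodError.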
Defined Under Namespace
Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteClient
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
35
36
37
|
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 35
def queue
@queue
end
|
Class Method Details
.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 27
def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end
.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 21
def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end
Instance Method Details
#check_closed(action) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 77
def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end
#close ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 87
def close
  @queue.close
  @closed.make_true
end
#closed? ⇒ Boolean
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 44
def closed?
  @closed.true?
end
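How check_closed, close and closed? work together can be sketched as follows. This is a simplified stand-in, not the real class: ToyGuardedQueue and ToyQueueClosedError are hypothetical names, and a plain boolean replaces the Concurrent::AtomicBoolean used in the listing above so that the sketch needs no gems (and is therefore not thread-safe).

```ruby
class ToyQueueClosedError < StandardError; end

class ToyGuardedQueue
  def initialize
    @items = []
    @closed = false # the real class uses Concurrent::AtomicBoolean here
  end

  def closed?
    @closed
  end

  def close
    @closed = true
  end

  # Guard called at the top of every read or write operation.
  def check_closed(action)
    raise ToyQueueClosedError, "Attempted to #{action} on a closed AckedQueue" if closed?
  end

  def push(obj)
    check_closed("write")
    @items << obj
  end
end

guarded = ToyGuardedQueue.new
guarded.push(:event)        # accepted while the queue is open
guarded.close

# After close, the guard rejects further writes.
message =
  begin
    guarded.push(:late_event)
    nil
  rescue ToyQueueClosedError => e
    e.message
  end
```

The action string passed to check_closed only shapes the error message, which is why the wrapper passes "write", "read" or "read a batch" depending on the caller.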
#is_empty? ⇒ Boolean
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 83
def is_empty?
  @queue.is_empty?
end
#poll(millis) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 59
def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end
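The listing above shows that poll is a single-element read_batch followed by get_elements.first. The sketch below mimics that call chain with hypothetical stand-ins (ToyBatch, ToyPollBackend, toy_poll). It assumes that a batch with nothing in it exposes an empty element list, in which case first yields nil; whether the real backend behaves this way on timeout is not stated in this page.

```ruby
# Hypothetical batch object exposing get_elements, like the call chain in poll.
ToyBatch = Struct.new(:get_elements)

class ToyPollBackend
  def initialize(items)
    @items = items
  end

  # Toy behavior: ignores the timeout and returns up to `size` items at once.
  def read_batch(size, _millis)
    ToyBatch.new(@items.shift(size))
  end
end

# Same shape as WrappedAckedQueue#poll, minus the closed check.
def toy_poll(backend, millis)
  backend.read_batch(1, millis).get_elements.first
end

backend = ToyPollBackend.new([:a, :b])
first  = toy_poll(backend, 50)
second = toy_poll(backend, 50)
third  = toy_poll(backend, 50) # nothing left: empty list, so first is nil
```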
#push(obj) ⇒ Object
Also known as: <<
Push an object to the queue. If the queue is full, the call blocks until the object can be added.
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 52
def push(obj)
  check_closed("write")
  @queue.write(obj)
end
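The blocking behavior described for push can be illustrated with Ruby's built-in SizedQueue, which also blocks writers when full. This is only an analogy under that assumption: SizedQueue is not what WrappedAckedQueue uses internally, and the variable names are invented for the example.

```ruby
# A bounded queue with room for a single element.
bounded = SizedQueue.new(1)
bounded.push(:first)

# The queue is full, so this producer blocks inside push.
producer = Thread.new { bounded.push(:second) }

# join with a timeout returns nil while the thread is still running;
# the producer cannot finish until a consumer frees a slot.
still_blocked = producer.join(0.1).nil?

head = bounded.pop # frees a slot, which unblocks the producer
producer.join
tail = bounded.pop
```

A writer that must not stall indefinitely would need its own timeout or back-pressure strategy around push; nothing in the listing above provides one.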
#read_batch(size, wait) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 64
def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end
#read_client ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 73
def read_client()
  ReadClient.new(self)
end
#with_queue(queue) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 37
def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end
#write_client ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 69
def write_client
  WriteClient.new(self)
end