Class: LogStash::Util::WrappedAckedQueue
- Inherits:
- Object
- Defined in:
- lib/logstash/util/wrapped_acked_queue.rb
Overview
This class provides specialized constructors instead of a public new. The calling code does need to know which kind of queue it creates, but not the internal implementation behind it (e.g. LogStash::AckedMemoryQueue). The constructors use allocate, which is what new does before it calls initialize. The new method has been made private because there is no default queue implementation: it would be expensive to create a persistent queue in new only to throw it away in favor of a memory-based one directly afterwards, especially in terms of (mmap) memory allocation and proper close sequencing.
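The constructor pattern described in the overview can be sketched without Logstash itself. This is a minimal illustration, not the class's actual source: WrappedQueueSketch and FakeMemoryQueue are hypothetical stand-ins, and only allocate, private_class_method, and the with_queue idea are taken from the text above.

```ruby
# Hypothetical stand-in for a real queue implementation such as
# LogStash::AckedMemoryQueue; it only records that open was called.
class FakeMemoryQueue
  attr_reader :opened

  def open
    @opened = true
  end
end

# Hypothetical wrapper that mirrors the "no public new" design.
class WrappedQueueSketch
  # There is no default queue implementation, so callers cannot use new.
  private_class_method :new

  attr_reader :queue

  # Specialized constructor: allocate creates the instance without
  # running initialize, and with_queue performs the setup instead.
  def self.create_memory_based
    allocate.with_queue(FakeMemoryQueue.new)
  end

  def with_queue(queue)
    @queue = queue
    @queue.open
    self # returning self lets the constructor hand back the wrapper
  end
end

wrapped = WrappedQueueSketch.create_memory_based
```

Because new is private, the only way to obtain an instance in this sketch is through a named constructor, so no queue is ever built just to be discarded.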
Defined Under Namespace
Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 37
def queue
  @queue
end
Class Method Details
.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 29
def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end
.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 23
def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end
Instance Method Details
#check_closed(action) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 97
def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end
#close ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 103
def close
  @queue.close
  @closed.make_true
end
#closed? ⇒ Boolean
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 46
def closed?
  @closed.true?
end
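The interplay of #check_closed, #close and #closed? can be sketched as a small closed-state guard. This is an illustrative sketch under stated assumptions: FlagSketch is a hypothetical stdlib-only stand-in for Concurrent::AtomicBoolean (only make_true and true? are modeled), ClosableSketch is a hypothetical class, and deriving QueueClosedError from StandardError is an assumption, since the listing does not show its superclass.

```ruby
# Hypothetical stand-in for Concurrent::AtomicBoolean, using a Mutex so
# the example runs without the concurrent-ruby gem.
class FlagSketch
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def make_true
    @lock.synchronize { @value = true }
  end

  def true?
    @lock.synchronize { @value }
  end
end

# Hypothetical class showing the guard pattern only; no real queue is involved.
class ClosableSketch
  # Assumption: the superclass of the real QueueClosedError is not shown.
  class QueueClosedError < StandardError; end

  def initialize
    @closed = FlagSketch.new(false)
  end

  def closed?
    @closed.true?
  end

  def close
    @closed.make_true
  end

  # Raises if the wrapper has already been closed; action names the attempt.
  def check_closed(action)
    raise QueueClosedError, "Attempted to #{action} on a closed AckedQueue" if closed?
  end

  # Every operation checks the flag before touching the queue.
  def push(obj)
    check_closed("write")
    obj
  end
end

q = ClosableSketch.new
q.push(:event) # allowed while open
q.close
message = begin
  q.push(:event)
rescue ClosableSketch::QueueClosedError => e
  e.message
end
```

After close, any guarded operation fails fast with a message naming the attempted action instead of reaching the underlying queue.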
#offer(obj, timeout_ms) ⇒ Boolean
Not implemented. This method always raises NotImplementedError because there is no non-blocking write operation yet. The documented contract (offer an object to the queue, wait up to the specified time, and return true if the object was added, false otherwise) is therefore not yet honored.
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 67
def offer(obj, timeout_ms)
  raise NotImplementedError.new("The offer method is not implemented. There is no non blocking write operation yet.")
end
#poll(millis) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 79
def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end
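As the listing shows, #poll reads a batch of size 1 and returns its first element. The sketch below illustrates that shape with hypothetical stand-ins (BatchSketch, FakeBatchQueue, poll_sketch); it assumes get_elements returns an array-like collection and ignores the wait time, so it says nothing about the real queue's blocking behavior.

```ruby
# Hypothetical batch object exposing get_elements, as used in the listing.
BatchSketch = Struct.new(:get_elements)

# Hypothetical in-memory queue; the real one is provided by Logstash.
class FakeBatchQueue
  def initialize(items)
    @items = items
  end

  # Returns up to size elements; the wait argument is ignored in this sketch.
  def read_batch(size, _wait_ms)
    BatchSketch.new(@items.shift(size))
  end
end

# Same shape as #poll: read a batch of one and take its first element.
def poll_sketch(queue, millis)
  queue.read_batch(1, millis).get_elements.first
end

queue = FakeBatchQueue.new([:a, :b])
first = poll_sketch(queue, 50)
second = poll_sketch(queue, 50)
third = poll_sketch(queue, 50) # the queue is empty by now
```

Under these assumptions an empty batch yields nil, since first on an empty array returns nil, so callers of a method shaped like this should be prepared for a nil result.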
#push(obj) ⇒ Object
Also known as: <<
Push an object to the queue. If the queue is full, this call blocks until the object can be added.
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 54
def push(obj)
  check_closed("write")
  @queue.write(obj)
end
#read_batch(size, wait) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 84
def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end
#read_client ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 93
def read_client()
  ReadClient.new(self)
end
#take ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 72
def take
  check_closed("read a batch")
  @queue.read_batch(1, 200).get_elements.first
end
#with_queue(queue) ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 39
def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end
#write_client ⇒ Object
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 89
def write_client
  WriteClient.new(self)
end