Class: LogStash::Util::WrappedAckedQueue
  
  
  
  
  
    - Inherits:
- 
      Object
      
        
          - Object
- LogStash::Util::WrappedAckedQueue
 show all
    - Defined in:
- lib/logstash/util/wrapped_acked_queue.rb
 
Overview
  
    
Some specialized constructors. The calling code does need to know what kind it creates but not the internal implementation e.g. LogStash::AckedMemoryQueue etc. Note the use of allocate - this is what new does before it calls initialize. Note that the new method has been made private this is because there is no default queue implementation. It would be expensive to create a persistent queue in the new method to then throw it away in favor of a memory based one directly after. Especially in terms of (mmap) memory allocation and proper close sequencing.
   
 
  
Defined Under Namespace
  
    
  
    
      Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient
    
  
  Instance Attribute Summary collapse
  
  
    
      Class Method Summary
      collapse
    
    
  
    
      Instance Method Summary
      collapse
    
    
  
  
    Instance Attribute Details
    
      
      
      
  
  
    #queue  ⇒ Object  
  
  
  
  
    
Returns the value of attribute queue.
   
 
  
  
    | 
37
38
39 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 37
def queue
  @queue
end | 
 
    
   
  
    Class Method Details
    
      
  
  
    .create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)  ⇒ Object 
  
  
  
  
    | 
29
30
31
32
33 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 29
def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end | 
 
    
      
  
  
    .create_memory_based(path, capacity, max_events, max_bytes)  ⇒ Object 
  
  
  
  
    | 
23
24
25
26
27 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 23
def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end | 
 
    
   
  
    Instance Method Details
    
      
  
  
    #check_closed(action)  ⇒ Object 
  
  
  
  
    | 
79
80
81
82
83 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 79
def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end | 
 
    
      
  
  
    #close  ⇒ Object 
  
  
  
  
    | 
89
90
91
92 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 89
def close
  @queue.close
  @closed.make_true
end | 
 
    
      
  
  
    #closed?  ⇒ Boolean 
  
  
  
  
    | 
46
47
48 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 46
def closed?
  @closed.true?
end | 
 
    
      
  
  
    #is_empty?  ⇒ Boolean 
  
  
  
  
    | 
85
86
87 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 85
def is_empty?
  @queue.is_empty?
end | 
 
    
      
  
  
    #poll(millis)  ⇒ Object 
  
  
  
  
  
    | 
61
62
63
64 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 61
def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end | 
 
    
      
  
  
    #push(obj)  ⇒ Object 
  
  
    Also known as:
    <<
    
  
  
  
    
Push an object to the queue if the queue is full it will block until the object can be added to the queue.
   
 
  
    | 
54
55
56
57 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 54
def push(obj)
  check_closed("write")
  @queue.write(obj)
end | 
 
    
      
  
  
    #read_batch(size, wait)  ⇒ Object 
  
  
  
  
    | 
66
67
68
69 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 66
def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end | 
 
    
      
  
  
    #read_client  ⇒ Object 
  
  
  
  
    | 
75
76
77 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 75
def read_client()
  ReadClient.new(self)
end | 
 
    
      
  
  
    #with_queue(queue)  ⇒ Object 
  
  
  
  
    | 
39
40
41
42
43
44 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 39
def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end | 
 
    
      
  
  
    #write_client  ⇒ Object 
  
  
  
  
    | 
71
72
73 | # File 'lib/logstash/util/wrapped_acked_queue.rb', line 71
def write_client
  WriteClient.new(self)
end |