Class: LogStash::Util::WrappedAckedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_acked_queue.rb

Overview

Some specialized constructors. The calling code does need to know what kind it creates but not the internal implementation e.g. LogStash::AckedMemoryQueue etc. Note the use of allocate - this is what new does before it calls initialize. Note that the new method has been made private this is because there is no default queue implementation. It would be expensive to create a persistent queue in the new method to then throw it away in favor of a memory based one directly after. Especially in terms of (mmap) memory allocation and proper close sequencing.

Defined Under Namespace

Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



37
38
39
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 37

def queue
  @queue
end

Class Method Details

.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object



29
30
31
32
33
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 29

def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end

.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object



23
24
25
26
27
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 23

def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end

Instance Method Details

#check_closed(action) ⇒ Object



79
80
81
82
83
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 79

def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end

#closeObject



89
90
91
92
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 89

def close
  @queue.close
  @closed.make_true
end

#closed?Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 46

def closed?
  @closed.true?
end

#is_empty?Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 85

def is_empty?
  @queue.is_empty?
end

#poll(millis) ⇒ Object

Block for X millis



61
62
63
64
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 61

def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end

#push(obj) ⇒ Object Also known as: <<

Push an object to the queue if the queue is full it will block until the object can be added to the queue.

Parameters:

  • Object (Object)

    to add to the queue



54
55
56
57
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 54

def push(obj)
  check_closed("write")
  @queue.write(obj)
end

#read_batch(size, wait) ⇒ Object



66
67
68
69
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 66

def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end

#read_clientObject



75
76
77
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 75

def read_client()
  ReadClient.new(self)
end

#with_queue(queue) ⇒ Object



39
40
41
42
43
44
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 39

def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end

#write_clientObject



71
72
73
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 71

def write_client
  WriteClient.new(self)
end