Class: LogStash::Util::WrappedAckedQueue

Inherits:
Object
Defined in:
lib/logstash/util/wrapped_acked_queue.rb

Overview

Provides specialized constructors. The calling code does need to know what kind of queue it creates, but not the internal implementation (e.g. LogStash::AckedMemoryQueue). Note the use of allocate: this is what new does before it calls initialize. The new method has been made private because there is no default queue implementation. Creating a persistent queue in new, only to throw it away in favor of a memory-based one directly afterwards, would be expensive, especially in terms of (mmap) memory allocation and proper close sequencing.
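The allocate-based constructor pattern described above can be illustrated with a minimal sketch. The class name ExampleWrappedQueue, the create_array_based constructor, and the Array used as a backing queue are all hypothetical stand-ins, not part of Logstash.

```ruby
# Hypothetical stand-in class; not the Logstash implementation.
class ExampleWrappedQueue
  # There is no default backing queue, so direct construction is disallowed.
  private_class_method :new

  attr_reader :queue

  # Named constructor: allocate builds the instance without calling
  # initialize, then with_queue injects the backing queue and returns self.
  def self.create_array_based
    allocate.with_queue([])
  end

  def with_queue(queue)
    @queue = queue
    self
  end
end

wrapped = ExampleWrappedQueue.create_array_based

# Calling the private constructor from outside the class raises NoMethodError.
begin
  ExampleWrappedQueue.new
rescue NoMethodError => e
  new_error = e
end
```

Because each named constructor decides which backing queue to build, no throwaway queue is ever created just to satisfy a default constructor.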

Defined Under Namespace

Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 31

def queue
  @queue
end

Class Method Details

.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 23

def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end

Instance Method Details

#check_closed(action) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 73

def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end
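The guard above works together with #close and #closed?. A minimal sketch of the combined behavior follows, assuming a hypothetical ExampleGuardedQueue class backed by an Array; a plain boolean replaces Concurrent::AtomicBoolean so the sketch needs no gems, which also means it is not thread-safe.

```ruby
# Hypothetical stand-in for the closed-queue guard pattern.
class ExampleGuardedQueue
  class QueueClosedError < StandardError; end

  def initialize
    @items = []
    @closed = false # plain flag; the real class uses an atomic boolean
  end

  def closed?
    @closed
  end

  def close
    @closed = true
  end

  # Every operation calls the guard first, naming the attempted action.
  def push(obj)
    check_closed("write")
    @items << obj
  end

  def check_closed(action)
    raise QueueClosedError, "Attempted to #{action} on a closed queue" if closed?
  end
end

q = ExampleGuardedQueue.new
q.push(:event)
q.close

begin
  q.push(:late_event)
rescue ExampleGuardedQueue::QueueClosedError => e
  message = e.message
end
```

Passing the action name into the guard keeps the error message specific to the operation that failed.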

#close ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 83

def close
  @queue.close
  @closed.make_true
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/util/wrapped_acked_queue.rb', line 40

def closed?
  @closed.true?
end

#is_empty? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/util/wrapped_acked_queue.rb', line 79

def is_empty?
  @queue.is_empty?
end

#poll(millis) ⇒ Object

Blocks for up to millis milliseconds while waiting to read a single element.



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 55

def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end
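One consequence of the implementation above is worth illustrating: if get_elements returns an Array-like collection (an assumption; its return type is not shown here), then first on an empty batch yields nil. The sketch below uses hypothetical stand-ins, ExampleBatch and ExampleQueue, and does not actually wait for the timeout.

```ruby
# Hypothetical stand-ins: ExampleBatch mimics an object that responds to
# get_elements; ExampleQueue ignores the timeout and returns immediately.
ExampleBatch = Struct.new(:get_elements)

class ExampleQueue
  def initialize(items)
    @items = items
  end

  # Returns up to `size` items; a real queue would wait up to `millis` first.
  def read_batch(size, millis)
    ExampleBatch.new(@items.shift(size))
  end
end

# Same shape as the poll method above, minus the closed check.
def poll(queue, millis)
  queue.read_batch(1, millis).get_elements.first
end

queue = ExampleQueue.new([:a])
first = poll(queue, 50)   # :a
second = poll(queue, 50)  # nil, because the batch is empty
```

Under that assumption, callers should be prepared to handle nil when nothing arrives within the timeout.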

#push(obj) ⇒ Object Also known as: <<

Pushes an object to the queue. If the queue is full, the call blocks until the object can be added.

Parameters:

  • obj (Object)

    to add to the queue



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 48

def push(obj)
  check_closed("write")
  @queue.write(obj)
end
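The blocking behavior described for push can be seen in isolation with Ruby's built-in SizedQueue. This is only an analogy: SizedQueue stands in for the acked queue, and the capacity of 1 is chosen purely for illustration.

```ruby
# SizedQueue is Ruby's built-in bounded queue; it is a stand-in here and
# is not the queue that Logstash uses.
bounded = SizedQueue.new(1)
bounded << :first                # fills the queue to capacity

producer = Thread.new do
  bounded << :second             # blocks until a slot frees up
  :pushed
end

sleep 0.1                        # give the producer time to reach the push
still_waiting = producer.alive?  # the producer cannot finish while the queue is full

drained = bounded.pop            # frees a slot, unblocking the producer
result = producer.value          # joins the thread and returns its value
remaining = bounded.pop
```

A full queue applies back pressure to the writer rather than dropping or rejecting the object.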

#read_batch(size, wait) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 60

def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end

#read_client ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 69

def read_client
  ReadClient.new(self)
end

#with_queue(queue) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 33

def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end

#write_client ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 65

def write_client
  WriteClient.new(self)
end