Class: LogStash::Util::WrappedAckedQueue

Inherits:
Object
Defined in:
lib/logstash/util/wrapped_acked_queue.rb

Overview

Provides specialized constructors. The calling code needs to know which kind of queue it creates, but not the internal implementation (for example, LogStash::AckedMemoryQueue). Note the use of allocate: this is what new does before it calls initialize. The new method has been made private because there is no default queue implementation. It would be expensive to create a persistent queue in new only to throw it away immediately in favor of a memory-based one, especially in terms of (mmap) memory allocation and proper close sequencing.
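The construction pattern described in this overview can be sketched in isolation. This is a minimal illustration and not Logstash code: SketchBackend and SketchWrapper are hypothetical stand-ins, and only allocate, private_class_method, and the general shape of with_queue are taken from Ruby itself and from the source shown on this page.

```ruby
# Hypothetical stand-in for a queue implementation (not a Logstash class).
class SketchBackend
  attr_reader :opened

  def initialize
    @opened = false
  end

  def open
    @opened = true
  end
end

# Hypothetical wrapper that mirrors the factory pattern from the overview.
class SketchWrapper
  # There is no default backend, so direct construction is disallowed.
  private_class_method :new

  attr_reader :queue

  # Named constructor: allocate skips initialize, with_queue finishes setup.
  def self.create_with(backend)
    allocate.with_queue(backend)
  end

  def with_queue(queue)
    @queue = queue
    @queue.open
    self # returned so the factory can hand back the wrapper
  end
end

wrapper = SketchWrapper.create_with(SketchBackend.new)

# Calling the private constructor directly raises NoMethodError.
new_is_private =
  begin
    SketchWrapper.new
    false
  rescue NoMethodError
    true
  end
```

Because the backend is passed in already built, no throwaway queue is ever created; the wrapper only opens the one it is given.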

Defined Under Namespace

Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteClient

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 35

def queue
  @queue
end

Class Method Details

.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 27

def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end

.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 21

def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end

Instance Method Details

#check_closed(action) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 77

def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end

#close ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 87

def close
  @queue.close
  @closed.make_true
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/util/wrapped_acked_queue.rb', line 44

def closed?
  @closed.true?
end
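The interplay of check_closed, close, and closed? can be sketched with a small stand-in class. The names SketchQueue and SketchClosedError are hypothetical, and a plain boolean replaces Concurrent::AtomicBoolean so that the example needs no gems; as a result this sketch is not thread-safe, unlike the atomic flag used in the source above.

```ruby
# Hypothetical error class standing in for QueueClosedError.
class SketchClosedError < StandardError; end

# Hypothetical queue with the same guard structure as the methods above.
class SketchQueue
  def initialize
    @items = []
    @closed = false # plain boolean; the real class uses an atomic flag
  end

  def closed?
    @closed
  end

  # Raises if the queue has been closed; action only shapes the message.
  def check_closed(action)
    raise SketchClosedError, "Attempted to #{action} on a closed queue" if closed?
  end

  def push(obj)
    check_closed("write")
    @items << obj
  end

  def close
    @closed = true
  end
end

queue = SketchQueue.new
queue.push("event-1") # succeeds while the queue is open
queue.close

# A write after close is rejected by the guard.
error_message =
  begin
    queue.push("event-2")
    nil
  rescue SketchClosedError => e
    e.message
  end
```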

#is_empty? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/util/wrapped_acked_queue.rb', line 83

def is_empty?
  @queue.is_empty?
end

#poll(millis) ⇒ Object

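Blocks for up to millis milliseconds while reading a batch of one element, then returns the first element of that batch.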



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 59

def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end
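The shape of poll, a timed batch read of size one followed by taking the first element, can be sketched with stand-in classes. TimedBackend, TimedBatch, and sketch_poll are hypothetical names, and the timeout behavior shown here (an empty batch, so the result is nil) is an assumption of this sketch; the behavior of the real queue depends on what its read_batch and get_elements return.

```ruby
# Hypothetical batch type exposing get_elements like the call above.
TimedBatch = Struct.new(:elements) do
  def get_elements
    elements
  end
end

# Hypothetical in-memory backend with a timed read_batch.
class TimedBackend
  def initialize
    @items = []
    @lock = Mutex.new
    @not_empty = ConditionVariable.new
  end

  def write(obj)
    @lock.synchronize do
      @items << obj
      @not_empty.signal
    end
  end

  # Waits up to wait_ms milliseconds for data, then returns up to size items.
  def read_batch(size, wait_ms)
    @lock.synchronize do
      @not_empty.wait(@lock, wait_ms / 1000.0) if @items.empty?
      TimedBatch.new(@items.shift(size))
    end
  end
end

# Same structure as poll above, with the backend passed in explicitly.
def sketch_poll(backend, millis)
  backend.read_batch(1, millis).get_elements.first
end

backend = TimedBackend.new
backend.write("event-1")

first_poll = sketch_poll(backend, 50)  # data is available, no waiting needed
second_poll = sketch_poll(backend, 50) # nothing queued, so the wait times out
```

In this sketch the second call returns nil because first on an empty Ruby array is nil.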

#push(obj) ⇒ Object Also known as: <<

Pushes an object to the queue. If the queue is full, the call blocks until the object can be added.

Parameters:

  • obj (Object)

    the object to add to the queue



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 52

def push(obj)
  check_closed("write")
  @queue.write(obj)
end
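The blocking behavior described for push can be illustrated with Ruby's standard SizedQueue. It is used here purely as an analogy and is not the queue that WrappedAckedQueue wraps; the capacity of one and the symbols pushed are arbitrary choices for the example.

```ruby
bounded = SizedQueue.new(1) # capacity of one element
bounded.push(:first)        # fills the queue

producer = Thread.new do
  bounded << :second        # blocks here because the queue is full
  :pushed
end

# Wait until the producer thread is parked on the full queue.
sleep 0.01 until producer.status == "sleep"
blocked_while_full = producer.alive?

drained = bounded.pop              # frees a slot and unblocks the producer
producer_result = producer.value   # joins the thread and returns its result
remaining = bounded.pop            # the element the producer was waiting to add
```

As with #push and its << alias, the producer needs no retry logic: the call simply does not return until there is room.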

#read_batch(size, wait) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 64

def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end

#read_client ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 73

def read_client()
  ReadClient.new(self)
end

#with_queue(queue) ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 37

def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end

#write_client ⇒ Object



# File 'lib/logstash/util/wrapped_acked_queue.rb', line 69

def write_client
  WriteClient.new(self)
end