Class: LogStash::Util::WrappedAckedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_acked_queue.rb

Overview

Some specialized constructors. The calling code does need to know what kind it creates but not the internal implementation e.g. LogStash::AckedMemoryQueue etc. Note the use of allocate - this is what new does before it calls initialize. Note that the new method has been made private this is because there is no default queue implementation. It would be expensive to create a persistent queue in the new method to then throw it away in favor of a memory based one directly after. Especially in terms of (mmap) memory allocation and proper close sequencing.

Defined Under Namespace

Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



37
38
39
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 37

def queue
  @queue
end

Class Method Details

.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object



29
30
31
32
33
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 29

def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end

.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object



23
24
25
26
27
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 23

def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end

Instance Method Details

#check_closed(action) ⇒ Object



97
98
99
100
101
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 97

def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end

#closeObject



103
104
105
106
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 103

def close
  @queue.close
  @closed.make_true
end

#closed?Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 46

def closed?
  @closed.true?
end

#offer(obj, timeout_ms) ⇒ Boolean

TODO - fix doc for this noop method Offer an object to the queue, wait for the specified amount of time. If adding to the queue was successful it will return true, false otherwise.

Parameters:

  • Object (Object)

    to add to the queue

  • Time (Integer)

    in milliseconds to wait before giving up

Returns:

  • (Boolean)

    True if adding was successful if not it return false

Raises:



67
68
69
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 67

def offer(obj, timeout_ms)
  raise NotImplementedError.new("The offer method is not implemented. There is no non blocking write operation yet.")
end

#poll(millis) ⇒ Object

Block for X millis



79
80
81
82
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 79

def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end

#push(obj) ⇒ Object Also known as: <<

Push an object to the queue if the queue is full it will block until the object can be added to the queue.

Parameters:

  • Object (Object)

    to add to the queue



54
55
56
57
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 54

def push(obj)
  check_closed("write")
  @queue.write(obj)
end

#read_batch(size, wait) ⇒ Object



84
85
86
87
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 84

def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end

#read_clientObject



93
94
95
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 93

def read_client()
  ReadClient.new(self)
end

#takeObject

Blocking



72
73
74
75
76
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 72

def take
  check_closed("read a batch")
  # TODO - determine better arbitrary timeout millis
  @queue.read_batch(1, 200).get_elements.first
end

#with_queue(queue) ⇒ Object



39
40
41
42
43
44
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 39

def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end

#write_clientObject



89
90
91
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 89

def write_client
  WriteClient.new(self)
end