Class: LogStash::Util::WrappedAckedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_acked_queue.rb

Overview

Some specialized constructors. The calling code does need to know what kind it creates but not the internal implementation e.g. LogStash::AckedMemoryQueue etc. Note the use of allocate - this is what new does before it calls initialize. Note that the new method has been made private this is because there is no default queue implementation. It would be expensive to create a persistent queue in the new method to then throw it away in favor of a memory based one directly after. Especially in terms of (mmap) memory allocation and proper close sequencing.

Defined Under Namespace

Classes: NotImplementedError, QueueClosedError, ReadBatch, ReadClient, WriteBatch, WriteClient

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



36
37
38
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 36

def queue
  @queue
end

Class Method Details

.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes) ⇒ Object



28
29
30
31
32
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 28

def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
  )
end

.create_memory_based(path, capacity, max_events, max_bytes) ⇒ Object



22
23
24
25
26
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 22

def self.create_memory_based(path, capacity, max_events, max_bytes)
  self.allocate.with_queue(
    LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
  )
end

Instance Method Details

#check_closed(action) ⇒ Object



96
97
98
99
100
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 96

def check_closed(action)
  if closed?
    raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
  end
end

#closeObject



102
103
104
105
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 102

def close
  @queue.close
  @closed.make_true
end

#closed?Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 45

def closed?
  @closed.true?
end

#offer(obj, timeout_ms) ⇒ Boolean

TODO - fix doc for this noop method Offer an object to the queue, wait for the specified amount of time. If adding to the queue was successful it will return true, false otherwise.

Parameters:

  • Object (Object)

    to add to the queue

  • Time (Integer)

    in milliseconds to wait before giving up

Returns:

  • (Boolean)

    True if adding was successfull if not it return false

Raises:



66
67
68
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 66

def offer(obj, timeout_ms)
  raise NotImplementedError.new("The offer method is not implemented. There is no non blocking write operation yet.")
end

#poll(millis) ⇒ Object

Block for X millis



78
79
80
81
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 78

def poll(millis)
  check_closed("read")
  @queue.read_batch(1, millis).get_elements.first
end

#push(obj) ⇒ Object Also known as: <<

Push an object to the queue if the queue is full it will block until the object can be added to the queue.

Parameters:

  • Object (Object)

    to add to the queue



53
54
55
56
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 53

def push(obj)
  check_closed("write")
  @queue.write(obj)
end

#read_batch(size, wait) ⇒ Object



83
84
85
86
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 83

def read_batch(size, wait)
  check_closed("read a batch")
  @queue.read_batch(size, wait)
end

#read_clientObject



92
93
94
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 92

def read_client()
  ReadClient.new(self)
end

#takeObject

Blocking



71
72
73
74
75
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 71

def take
  check_closed("read a batch")
  # TODO - determine better arbitrary timeout millis
  @queue.read_batch(1, 200).get_elements.first
end

#with_queue(queue) ⇒ Object



38
39
40
41
42
43
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 38

def with_queue(queue)
  @queue = queue
  @queue.open
  @closed = Concurrent::AtomicBoolean.new(false)
  self
end

#write_clientObject



88
89
90
# File 'lib/logstash/util/wrapped_acked_queue.rb', line 88

def write_client
  WriteClient.new(self)
end