Class: ActivePublisher::Async::InMemoryAdapter::AsyncQueue

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/active_publisher/async/in_memory_adapter.rb

Constant Summary collapse

NETWORK_ERRORS =
[::Bunny::Exception, ::Timeout::Error, ::IOError].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Constructor Details

#initialize(drop_messages_when_queue_full, max_queue_size, supervisor_interval) ⇒ AsyncQueue

Returns a new instance of AsyncQueue.



54
55
56
57
58
59
60
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 54

# Builds the in-memory queue, records its configuration, and then starts the
# consumer via +create_and_supervise_consumer!+.
#
# @param drop_messages_when_queue_full [Boolean] when truthy, +push+ silently
#   discards a message offered to a full queue instead of raising
# @param max_queue_size [Integer] the size limit +push+ checks the queue against
# @param supervisor_interval [Object] stored for later use
#   (NOTE(review): presumably the supervisor's polling period — confirm units)
def initialize(drop_messages_when_queue_full, max_queue_size, supervisor_interval)
  # The backing store starts out empty.
  @queue = ::Queue.new

  # Plain configuration values, exposed through the accessors.
  @supervisor_interval = supervisor_interval
  @max_queue_size = max_queue_size
  @drop_messages_when_queue_full = drop_messages_when_queue_full

  # Must run last: all state above is assigned before the consumer is started.
  create_and_supervise_consumer!
end

Instance Attribute Details

#consumer ⇒ Object (readonly)

Returns the value of attribute consumer.



46
47
48
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 46

# Read-only accessor for the +@consumer+ instance variable.
#
# NOTE(review): +@consumer+ is not assigned directly in +initialize+;
# presumably it is set by +create_and_supervise_consumer!+ — confirm.
#
# @return [Object] the current value of +@consumer+
def consumer
  @consumer
end

#drop_messages_when_queue_full ⇒ Object

Returns the value of attribute drop_messages_when_queue_full.



42
43
44
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 42

# Accessor for the overflow policy flag given to +initialize+.
#
# +push+ consults this value when the queue is full: a truthy value makes
# +push+ return without enqueueing, a falsy value makes it raise.
#
# @return [Object] the current value of +@drop_messages_when_queue_full+
def drop_messages_when_queue_full
  @drop_messages_when_queue_full
end

#max_queue_size ⇒ Object

Returns the value of attribute max_queue_size.



42
43
44
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 42

# Accessor for the queue size limit given to +initialize+.
#
# +push+ compares +queue.size+ against this value before enqueueing.
#
# @return [Object] the current value of +@max_queue_size+
def max_queue_size
  @max_queue_size
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



46
47
48
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 46

# Read-only accessor for the backing queue.
#
# +initialize+ sets +@queue+ to a new, empty +::Queue+.
#
# @return [Object] the current value of +@queue+
def queue
  @queue
end

#supervisor ⇒ Object (readonly)

Returns the value of attribute supervisor.



46
47
48
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 46

# Read-only accessor for the +@supervisor+ instance variable.
#
# NOTE(review): +@supervisor+ is not assigned directly in +initialize+;
# presumably it is set by +create_and_supervise_consumer!+ — confirm.
#
# @return [Object] the current value of +@supervisor+
def supervisor
  @supervisor
end

#supervisor_interval ⇒ Object

Returns the value of attribute supervisor_interval.



42
43
44
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 42

# Accessor for the supervisor interval given to +initialize+.
#
# NOTE(review): presumably the period between supervisor checks of the
# consumer; the unit is not visible here — confirm against the supervisor code.
#
# @return [Object] the current value of +@supervisor_interval+
def supervisor_interval
  @supervisor_interval
end

Instance Method Details

#push(message) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 62

# Enqueues +message+, enforcing the configured capacity.
#
# When the queue already holds +max_queue_size+ messages the new message is
# not enqueued. It is silently discarded if +drop_messages_when_queue_full+
# is truthy; otherwise an error is raised so the caller can decide what to do.
#
# @param message [Object] the item to enqueue
# @return [Object, nil] whatever +queue.push+ returns, or +nil+ when the
#   message was dropped
# @raise [::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError]
#   when the queue is full and dropping is not enabled
def push(message)
  # default of 1_000_000 messages
  # `>=` rather than `>`: with `>` the queue could grow to max_queue_size + 1
  # entries before the limit kicked in.
  if queue.size >= max_queue_size
    # Drop messages if the queue is full and we were configured to do so
    return if drop_messages_when_queue_full

    # By default we will raise an error to push the responsibility onto the caller
    fail ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError, "Queue is full, messages will be dropped."
  end

  queue.push(message)
end

#size ⇒ Object



75
76
77
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 75

# Reports how many messages are currently waiting in the backing queue.
#
# @return [Integer] the number of items in +queue+
def size
  # Queue#length is an alias of Queue#size.
  queue.length
end