Class: ActivePublisher::Async::InMemoryAdapter::AsyncQueue
- Inherits:
-
Object
- Object
- ActivePublisher::Async::InMemoryAdapter::AsyncQueue
- Includes:
- Logging
- Defined in:
- lib/active_publisher/async/in_memory_adapter.rb
Constant Summary collapse
- NETWORK_ERRORS =
[::Bunny::Exception, ::Timeout::Error, ::IOError].freeze
Instance Attribute Summary collapse
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#drop_messages_when_queue_full ⇒ Object
Returns the value of attribute drop_messages_when_queue_full.
-
#max_queue_size ⇒ Object
Returns the value of attribute max_queue_size.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#supervisor ⇒ Object
readonly
Returns the value of attribute supervisor.
-
#supervisor_interval ⇒ Object
Returns the value of attribute supervisor_interval.
Instance Method Summary collapse
-
#initialize(drop_messages_when_queue_full, max_queue_size, supervisor_interval) ⇒ AsyncQueue
constructor
A new instance of AsyncQueue.
- #push(message) ⇒ Object
- #size ⇒ Object
Methods included from Logging
initialize_logger, logger, #logger, logger=
Constructor Details
#initialize(drop_messages_when_queue_full, max_queue_size, supervisor_interval) ⇒ AsyncQueue
Returns a new instance of AsyncQueue.
54 55 56 57 58 59 60 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 54

def initialize(drop_messages_when_queue_full, max_queue_size, supervisor_interval)
  @drop_messages_when_queue_full = drop_messages_when_queue_full
  @max_queue_size = max_queue_size
  @supervisor_interval = supervisor_interval
  @queue = ::Queue.new
  create_and_supervise_consumer!
end
Instance Attribute Details
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
46 47 48 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 46 def consumer @consumer end |
#drop_messages_when_queue_full ⇒ Object
Returns the value of attribute drop_messages_when_queue_full.
42 43 44 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 42

def drop_messages_when_queue_full
  @drop_messages_when_queue_full
end
#max_queue_size ⇒ Object
Returns the value of attribute max_queue_size.
42 43 44 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 42 def max_queue_size @max_queue_size end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
46 47 48 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 46 def queue @queue end |
#supervisor ⇒ Object (readonly)
Returns the value of attribute supervisor.
46 47 48 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 46 def supervisor @supervisor end |
#supervisor_interval ⇒ Object
Returns the value of attribute supervisor_interval.
42 43 44 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 42 def supervisor_interval @supervisor_interval end |
Instance Method Details
#push(message) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 62

def push(message)
  # default of 1_000_000 messages
  if queue.size > max_queue_size
    # Drop messages if the queue is full and we were configured to do so
    return if drop_messages_when_queue_full

    # By default we will raise an error to push the responsibility onto the caller
    fail ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError, "Queue is full, messages will be dropped."
  end

  queue.push(message)
end
#size ⇒ Object
75 76 77 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 75 def size queue.size end |