Class: Circuitry::Subscriber
- Inherits:
-
Object
- Object
- Circuitry::Subscriber
- Includes:
- Concerns::Async, Circuitry::Services::SQS
- Defined in:
- lib/circuitry/subscriber.rb
Constant Summary collapse
- DEFAULT_OPTIONS =
{ lock: true, async: false, timeout: 15, wait_time: 10, batch_size: 10, }.freeze
- CONNECTION_ERRORS =
[ Excon::Errors::Forbidden, ].freeze
- TEMPORARY_ERRORS =
[ Excon::Errors::InternalServerError, Excon::Errors::ServiceUnavailable, Excon::Errors::SocketError, Excon::Errors::Timeout, ].freeze
Instance Attribute Summary collapse
-
#batch_size ⇒ Object
readonly
Returns the value of attribute batch_size.
-
#lock ⇒ Object
readonly
Returns the value of attribute lock.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#wait_time ⇒ Object
readonly
Returns the value of attribute wait_time.
Attributes included from Concerns::Async
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(queue, options = {}) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #subscribe(&block) ⇒ Object
Methods included from Circuitry::Services::SQS
Methods included from Concerns::Async
#async?, included, #process_asynchronously
Constructor Details
#initialize(queue, options = {}) ⇒ Subscriber
Returns a new instance of Subscriber.
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/circuitry/subscriber.rb', line 34 def initialize(queue, = {}) raise ArgumentError.new('queue cannot be nil') if queue.nil? = DEFAULT_OPTIONS.merge() self.queue = queue self.lock = [:lock] self.async = [:async] self.timeout = [:timeout] self.wait_time = [:wait_time] self.batch_size = [:batch_size] end |
Instance Attribute Details
#batch_size ⇒ Object
Returns the value of attribute batch_size.
13 14 15 |
# File 'lib/circuitry/subscriber.rb', line 13 def batch_size @batch_size end |
#lock ⇒ Object
Returns the value of attribute lock.
13 14 15 |
# File 'lib/circuitry/subscriber.rb', line 13 def lock @lock end |
#queue ⇒ Object
Returns the value of attribute queue.
13 14 15 |
# File 'lib/circuitry/subscriber.rb', line 13 def queue @queue end |
#timeout ⇒ Object
Returns the value of attribute timeout.
13 14 15 |
# File 'lib/circuitry/subscriber.rb', line 13 def timeout @timeout end |
#wait_time ⇒ Object
Returns the value of attribute wait_time.
13 14 15 |
# File 'lib/circuitry/subscriber.rb', line 13 def wait_time @wait_time end |
Class Method Details
.async_strategies ⇒ Object
65 66 67 |
# File 'lib/circuitry/subscriber.rb', line 65 def self.async_strategies super - [:batch] end |
Instance Method Details
#subscribe(&block) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/circuitry/subscriber.rb', line 47 def subscribe(&block) raise ArgumentError.new('block required') if block.nil? unless can_subscribe? logger.warn('Circuitry unable to subscribe: AWS configuration is not set.') return end loop do begin (&block) rescue *CONNECTION_ERRORS => e logger.error("Connection error to #{queue}: #{e}") raise SubscribeError.new(e) end end end |