Class: Circuitry::Subscriber
- Inherits:
-
Object
- Object
- Circuitry::Subscriber
- Includes:
- Concerns::Async, Circuitry::Services::SQS
- Defined in:
- lib/circuitry/subscriber.rb
Constant Summary collapse
- DEFAULT_OPTIONS =
{ async: false, wait_time: 10, batch_size: 10, }.freeze
- CONNECTION_ERRORS =
[ Excon::Errors::Forbidden, ].freeze
Instance Attribute Summary collapse
-
#batch_size ⇒ Object
readonly
Returns the value of attribute batch_size.
-
#failure_queue ⇒ Object
readonly
Returns the value of attribute failure_queue.
-
#max_retries ⇒ Object
readonly
Returns the value of attribute max_retries.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#wait_time ⇒ Object
readonly
Returns the value of attribute wait_time.
Instance Method Summary collapse
- #async? ⇒ Boolean
-
#initialize(queue, options = {}) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #subscribe(&block) ⇒ Object
Methods included from Circuitry::Services::SQS
Methods included from Concerns::Async
#platform_supports_async?, #process_asynchronously
Constructor Details
#initialize(queue, options = {}) ⇒ Subscriber
Returns a new instance of Subscriber.
24 25 26 27 28 29 30 31 32 33 |
# File 'lib/circuitry/subscriber.rb', line 24 def initialize(queue, = {}) raise ArgumentError.new('queue cannot be nil') if queue.nil? = DEFAULT_OPTIONS.merge() @queue = queue @async = !![:async] @wait_time = [:wait_time] @batch_size = [:batch_size] end |
Instance Attribute Details
#batch_size ⇒ Object (readonly)
Returns the value of attribute batch_size.
12 13 14 |
# File 'lib/circuitry/subscriber.rb', line 12 def batch_size @batch_size end |
#failure_queue ⇒ Object (readonly)
Returns the value of attribute failure_queue.
12 13 14 |
# File 'lib/circuitry/subscriber.rb', line 12 def failure_queue @failure_queue end |
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
12 13 14 |
# File 'lib/circuitry/subscriber.rb', line 12 def max_retries @max_retries end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
12 13 14 |
# File 'lib/circuitry/subscriber.rb', line 12 def queue @queue end |
#wait_time ⇒ Object (readonly)
Returns the value of attribute wait_time.
12 13 14 |
# File 'lib/circuitry/subscriber.rb', line 12 def wait_time @wait_time end |
Instance Method Details
#async? ⇒ Boolean
61 62 63 |
# File 'lib/circuitry/subscriber.rb', line 61 def async? @async end |
#subscribe(&block) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/circuitry/subscriber.rb', line 35 def subscribe(&block) raise ArgumentError.new('block required') if block.nil? unless can_subscribe? logger.warn('Circuitry unable to subscribe: AWS configuration is not set.') return end process = -> do loop do begin (&block) rescue *CONNECTION_ERRORS => e logger.error "Connection error to #{queue}: #{e}" raise SubscribeError.new(e) end end end if async? process_asynchronously(&process) else process.call end end |