Class: Fare::Subscriber
- Inherits:
-
Object
- Object
- Fare::Subscriber
- Defined in:
- lib/fare/subscriber.rb
Constant Summary collapse
- UnknownSubscriber =
Class.new(ArgumentError)
- BUSY_MUTEX =
Mutex.new
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
Instance Method Summary collapse
- #consume(message) ⇒ Object
-
#initialize(configuration, options = {}) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #produce(queue) ⇒ Object
Constructor Details
#initialize(configuration, options = {}) ⇒ Subscriber
Returns a new instance of Subscriber.
12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/fare/subscriber.rb', line 12 def initialize(configuration, = {}) @configuration = configuration @name = ([:name] || configuration.app_name).to_s subscriber_config = configuration.fetch_subscriber(@name) subscriber_config.load_setup @sqs_queue = configuration.fetch_subscriber_queue(@name) @stacks = subscriber_config.stacks @concurrency = .fetch(:concurrency, 1) @busy = 0 set_program_name end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
10 11 12 |
# File 'lib/fare/subscriber.rb', line 10 def configuration @configuration end |
Instance Method Details
#consume(message) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fare/subscriber.rb', line 33 def consume() increment_busy event = Event.deserialize(.body) @stacks.each do |stack| if stack.handles?(event) stack.to_app.call(event: event) end end .delete ensure decrement_busy end |
#produce(queue) ⇒ Object
24 25 26 27 28 29 30 31 |
# File 'lib/fare/subscriber.rb', line 24 def produce(queue) = @sqs_queue.(attributes: [:all]) if queue << end rescue AWS::SQS::Errors::InternalError sleep 1 end |