Class: Fare::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/fare/subscriber.rb

Constant Summary collapse

UnknownSubscriber =
Class.new(ArgumentError)
BUSY_MUTEX =
Mutex.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration, options = {}) ⇒ Subscriber

Returns a new instance of Subscriber.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/fare/subscriber.rb', line 12

def initialize(configuration, options = {})
  @configuration = configuration
  @name = (options[:name] || configuration.app_name).to_s
  subscriber_config = configuration.fetch_subscriber(@name)
  subscriber_config.load_setup
  @sqs_queue = configuration.fetch_subscriber_queue(@name)
  @stacks = subscriber_config.stacks
  @concurrency = options.fetch(:concurrency, 1)
  @busy = 0
  set_program_name
end

Instance Attribute Details

#configurationObject (readonly)

Returns the value of attribute configuration.



10
11
12
# File 'lib/fare/subscriber.rb', line 10

def configuration
  @configuration
end

Instance Method Details

#consume(message) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fare/subscriber.rb', line 33

def consume(message)
  increment_busy
  event = Event.deserialize(message.body)
  @stacks.each do |stack|
    if stack.handles?(event)
      stack.to_app.call(event: event)
    end
  end
  message.delete
ensure
  decrement_busy
end

#produce(queue) ⇒ Object



24
25
26
27
28
29
30
31
# File 'lib/fare/subscriber.rb', line 24

def produce(queue)
  message = @sqs_queue.receive_message(attributes: [:all])
  if message
    queue << message
  end
rescue AWS::SQS::Errors::InternalError
  sleep 1
end