Class: Circuitry::Subscriber

Inherits:
Object
  • Object
show all
Includes:
Concerns::Async, Circuitry::Services::SQS
Defined in:
lib/circuitry/subscriber.rb

Constant Summary collapse

DEFAULT_OPTIONS =
{
    lock: true,
    async: false,
    timeout: 15,
    wait_time: 10,
    batch_size: 10,
}.freeze
CONNECTION_ERRORS =
[
    Excon::Errors::Forbidden,
].freeze
TEMPORARY_ERRORS =
[
    Excon::Errors::InternalServerError,
    Excon::Errors::ServiceUnavailable,
    Excon::Errors::SocketError,
    Excon::Errors::Timeout,
].freeze

Instance Attribute Summary collapse

Attributes included from Concerns::Async

#async

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Circuitry::Services::SQS

#sqs

Methods included from Concerns::Async

#async?, included, #process_asynchronously

Constructor Details

#initialize(queue, options = {}) ⇒ Subscriber

Returns a new instance of Subscriber.

Raises:

  • (ArgumentError)


34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/circuitry/subscriber.rb', line 34

def initialize(queue, options = {})
  raise ArgumentError.new('queue cannot be nil') if queue.nil?

  options = DEFAULT_OPTIONS.merge(options)

  self.queue = queue
  self.lock = options[:lock]
  self.async = options[:async]
  self.timeout = options[:timeout]
  self.wait_time = options[:wait_time]
  self.batch_size = options[:batch_size]
end

Instance Attribute Details

#batch_sizeObject

Returns the value of attribute batch_size.



13
14
15
# File 'lib/circuitry/subscriber.rb', line 13

def batch_size
  @batch_size
end

#lockObject

Returns the value of attribute lock.



13
14
15
# File 'lib/circuitry/subscriber.rb', line 13

def lock
  @lock
end

#queueObject

Returns the value of attribute queue.



13
14
15
# File 'lib/circuitry/subscriber.rb', line 13

def queue
  @queue
end

#timeoutObject

Returns the value of attribute timeout.



13
14
15
# File 'lib/circuitry/subscriber.rb', line 13

def timeout
  @timeout
end

#wait_timeObject

Returns the value of attribute wait_time.



13
14
15
# File 'lib/circuitry/subscriber.rb', line 13

def wait_time
  @wait_time
end

Class Method Details

.async_strategiesObject



65
66
67
# File 'lib/circuitry/subscriber.rb', line 65

def self.async_strategies
  super - [:batch]
end

.default_async_strategyObject



69
70
71
# File 'lib/circuitry/subscriber.rb', line 69

def self.default_async_strategy
  Circuitry.config.subscribe_async_strategy
end

Instance Method Details

#subscribe(&block) ⇒ Object

Raises:

  • (ArgumentError)


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/circuitry/subscriber.rb', line 47

def subscribe(&block)
  raise ArgumentError.new('block required') if block.nil?

  unless can_subscribe?
    logger.warn('Circuitry unable to subscribe: AWS configuration is not set.')
    return
  end

  loop do
    begin
      receive_messages(&block)
    rescue *CONNECTION_ERRORS => e
      logger.error("Connection error to #{queue}: #{e}")
      raise SubscribeError.new(e)
    end
  end
end