Class: Circuitry::Subscriber

Inherits:
Object
  • Object
show all
Includes:
Concerns::Async, Circuitry::Services::SQS
Defined in:
lib/circuitry/subscriber.rb

Constant Summary collapse

DEFAULT_OPTIONS =
{
  lock: true,
  async: false,
  timeout: 15,
  wait_time: 10,
  batch_size: 10
}.freeze
CONNECTION_ERRORS =
[
  Aws::SQS::Errors::ServiceError
].freeze

Instance Attribute Summary collapse

Attributes included from Concerns::Async

#async

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Circuitry::Services::SQS

#sqs

Methods included from Concerns::Async

#async?, included, #process_asynchronously

Constructor Details

#initialize(options = {}) ⇒ Subscriber

Returns a new instance of Subscriber.



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/circuitry/subscriber.rb', line 29

def initialize(options = {})
  options = DEFAULT_OPTIONS.merge(options)

  self.subscribed = false
  self.queue = Queue.find(Circuitry.subscriber_config.queue_name).url

  %i[lock async timeout wait_time batch_size].each do |sym|
    send(:"#{sym}=", options[sym])
  end

  trap_signals
end

Instance Attribute Details

#batch_sizeObject

Returns the value of attribute batch_size.



15
16
17
# File 'lib/circuitry/subscriber.rb', line 15

def batch_size
  @batch_size
end

#lockObject

Returns the value of attribute lock.



15
16
17
# File 'lib/circuitry/subscriber.rb', line 15

def lock
  @lock
end

#queueObject

Returns the value of attribute queue.



15
16
17
# File 'lib/circuitry/subscriber.rb', line 15

def queue
  @queue
end

#timeoutObject

Returns the value of attribute timeout.



15
16
17
# File 'lib/circuitry/subscriber.rb', line 15

def timeout
  @timeout
end

#wait_timeObject

Returns the value of attribute wait_time.



15
16
17
# File 'lib/circuitry/subscriber.rb', line 15

def wait_time
  @wait_time
end

Class Method Details

.async_strategiesObject



62
63
64
# File 'lib/circuitry/subscriber.rb', line 62

def self.async_strategies
  super - [:batch]
end

.default_async_strategyObject



66
67
68
# File 'lib/circuitry/subscriber.rb', line 66

def self.default_async_strategy
  Circuitry.subscriber_config.async_strategy
end

Instance Method Details

#subscribe(&block) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/circuitry/subscriber.rb', line 42

def subscribe(&block)
  raise ArgumentError, 'block required' if block.nil?
  raise SubscribeError, 'AWS configuration is not set' unless can_subscribe?

  logger.info("Subscribing to queue: #{queue}")

  self.subscribed = true
  poll(&block)
  self.subscribed = false

  logger.info("Unsubscribed from queue: #{queue}")
rescue *CONNECTION_ERRORS => e
  logger.error("Connection error to queue: #{queue}: #{e}")
  raise SubscribeError, e.message
end

#subscribed?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/circuitry/subscriber.rb', line 58

def subscribed?
  subscribed
end