Class: AwsSqsMoniter::WorkerRegistries::TypedMessageRegistry
- Inherits: Shoryuken::WorkerRegistry
- Inheritance chain: Object → Shoryuken::WorkerRegistry → AwsSqsMoniter::WorkerRegistries::TypedMessageRegistry
- Defined in:
- lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb
Constant Summary collapse
- DuplicateSubscriptionError = Class.new StandardError
- InvalidWorkerOptionsError = Class.new StandardError
- UnroutableMessageError = Class.new StandardError
- WorkerNotFoundError = Class.new StandardError
Instance Method Summary collapse
- #batch_receive_messages?(_queue) ⇒ Boolean
- #clear ⇒ Object
- #fetch_worker(queue, message) ⇒ Object
- #initialize(dead_letter_queue_name) ⇒ TypedMessageRegistry (constructor): a new instance of TypedMessageRegistry.
- #queues ⇒ Object
- #register_worker(queue, clazz) ⇒ Object
- #topics(queue = nil) ⇒ Object
- #workers(queue) ⇒ Object
Constructor Details
#initialize(dead_letter_queue_name) ⇒ TypedMessageRegistry
Returns a new instance of TypedMessageRegistry.
12 13 14 15 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 12
#
# Builds an empty registry.
#
# @param dead_letter_queue_name [Object] stored as-is in
#   @dead_letter_queue_name; none of the methods shown in this listing read it.
# @return [TypedMessageRegistry] a new instance of TypedMessageRegistry
def initialize(dead_letter_queue_name)
  # Starts empty; register_worker fills it as
  # queue name => { message type => worker class }.
  @subscriptions = {}
  @dead_letter_queue_name = dead_letter_queue_name
end
Instance Method Details
#batch_receive_messages?(_queue) ⇒ Boolean
17 18 19 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 17
#
# Always answers false, whatever queue is asked about.
#
# @param _queue [Object] ignored
# @return [Boolean] always false
def batch_receive_messages?(_queue)
  false
end
#clear ⇒ Object
21 22 23 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 21
#
# Removes every registered subscription, for all queues, in place.
#
# @return [Hash] the (now empty) subscriptions hash
def clear
  @subscriptions.clear
end
#fetch_worker(queue, message) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 25
#
# Instantiates the worker subscribed to the type of +message+ on +queue+.
# A subscription for the exact message type wins; otherwise the queue's
# wildcard ('*') subscription is used.
#
# @param queue [Object] key into the registered subscriptions
# @param message [Object] raw message, wrapped in a TypedMessage to obtain its type
# @return [Object] a new instance of the matching worker class
# @raise [UnroutableMessageError] when nothing is registered for +queue+
# @raise [WorkerNotFoundError] when neither the message type nor '*' is subscribed
def fetch_worker(queue, message)
  queue_subscriptions = @subscriptions.fetch(queue) do
    fail UnroutableMessageError,
         "#{self} does not know how to route messages for queue '#{queue}'."
  end

  # NOTE(review): the extracted listing dropped the identifiers on this line;
  # wrapping `message` is the reconstruction - confirm against the real file.
  typed_message = TypedMessage.new(message)
  worker_class = queue_subscriptions[typed_message.type] || queue_subscriptions['*']

  unless worker_class
    fail WorkerNotFoundError,
         "Worker not found for message type #{typed_message.type} on queue #{queue}."
  end

  worker_class.new
end
#queues ⇒ Object
39 40 41 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 39
#
# Lists every queue that has at least one subscription entry registered.
#
# @return [Array] the keys of the subscriptions hash, in insertion order
def queues
  @subscriptions.keys
end
#register_worker(queue, clazz) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 43
#
# Registers +clazz+ for every (queue, message type) pair listed under the
# 'subscriptions' key of its Shoryuken options.
#
# NOTE(review): the extracted listing dropped several identifiers. The
# options accessor (`get_shoryuken_options`) and the block variable names
# (`message_types`, `message_type`) are reconstructions - confirm against
# the real file.
#
# @param queue [Object] queue whose already-registered workers are checked
#   for the batch restriction
# @param clazz [Class] worker class to register
# @raise [ArgumentError] when +queue+ already has a worker and either that
#   worker or +clazz+ has its 'batch' option set to true
# @raise [InvalidWorkerOptionsError] when +clazz+ defines no 'subscriptions' option
# @raise [DuplicateSubscriptionError] when a message type is already
#   subscribed on the owning queue
def register_worker(queue, clazz)
  # Any existing worker makes the registration invalid if either side is
  # batchable, so the first existing worker found is the one reported.
  invalid_worker = workers(queue).find do |worker_class|
    worker_class.get_shoryuken_options['batch'] == true ||
      clazz.get_shoryuken_options['batch'] == true
  end

  if invalid_worker
    fail ArgumentError, "Could not register #{clazz} for '#{queue}', "\
      "because #{invalid_worker} is already registered for this queue, "\
      "and Shoryuken doesn't support a batchable worker for a queue with multiple workers"
  end

  worker_subscriptions = clazz.get_shoryuken_options['subscriptions']

  if worker_subscriptions.nil?
    fail InvalidWorkerOptionsError, "Worker #{clazz} must define "\
      'a :subscriptions hash ({ queue_name: message_types }) in '\
      "it's shoryuken_options"
  end

  worker_subscriptions.each do |owning_queue, message_types|
    # fetch_store and env_name are defined outside this listing; presumably
    # fetch_store returns the existing per-queue hash or stores the given
    # default - verify against their definitions.
    queue_subscriptions = @subscriptions.fetch_store env_name(owning_queue), {}

    # Accept either a single message type or a (possibly nested) list.
    [message_types].flatten.each do |message_type|
      message_type = message_type.to_s.dasherize

      if queue_subscriptions.key? message_type
        fail DuplicateSubscriptionError, "Worker #{clazz} cannot "\
          "define another subscription of message #{message_type} "\
          "on queue #{owning_queue} as it is already subscribed to by "\
          "worker #{queue_subscriptions[message_type]}."
      end

      queue_subscriptions.store message_type, clazz
    end
  end
end
#topics(queue = nil) ⇒ Object
78 79 80 81 82 83 84 85 86 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 78
#
# Lists the distinct message types that have subscriptions, leaving out the
# wildcard entry ('*').
#
# @param queue [Object, nil] when given, only that queue's subscriptions are
#   considered (an unknown queue yields an empty list); when nil, all queues are
# @return [Array] unique subscription keys without '*'
def topics(queue = nil)
  topic_list =
    if queue
      @subscriptions.fetch(queue, {}).keys
    else
      @subscriptions.values.flat_map(&:keys)
    end

  topic_list.uniq - ['*']
end
#workers(queue) ⇒ Object
88 89 90 |
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 88
#
# Lists the worker classes subscribed on +queue+, one entry per subscription
# (a class subscribed to several message types appears several times).
#
# @param queue [Object] key into the registered subscriptions
# @return [Array] subscribed worker classes; empty when the queue is unknown
def workers(queue)
  @subscriptions.fetch(queue, {}).values
end