Class: AwsSqsMoniter::WorkerRegistries::TypedMessageRegistry

Inherits:
Shoryuken::WorkerRegistry
  • Object
show all
Defined in:
lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb

Constant Summary collapse

DuplicateSubscriptionError =
Class.new StandardError
InvalidWorkerOptionsError =
Class.new StandardError
UnroutableMessageError =
Class.new StandardError
WorkerNotFoundError =
Class.new StandardError

Instance Method Summary collapse

Constructor Details

#initialize(dead_letter_queue_name) ⇒ TypedMessageRegistry

Returns a new instance of TypedMessageRegistry.



12
13
14
15
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 12

def initialize(dead_letter_queue_name)
  @subscriptions = {}
  @dead_letter_queue_name = dead_letter_queue_name
end

Instance Method Details

#batch_receive_messages?(_queue) ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 17

def batch_receive_messages?(_queue)
  false
end

#clearObject



21
22
23
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 21

def clear
  @subscriptions.clear
end

#fetch_worker(queue, message) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 25

def fetch_worker(queue, message)
  queue_subscriptions = @subscriptions.fetch(queue) do
    fail UnroutableMessageError, "#{self} does not know how to route messages for queue '#{queue}'."
  end

  message = TypedMessage.new message
  worker_class = queue_subscriptions[message.type] || queue_subscriptions['*']

  fail WorkerNotFoundError,
       "Worker not found for message type #{message.type} on queue #{queue}." unless worker_class

  worker_class.new
end

#queuesObject



39
40
41
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 39

def queues
  @subscriptions.keys
end

#register_worker(queue, clazz) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 43

def register_worker(queue, clazz)
  invalid_worker = workers(queue).find do |worker_class|
    worker_class.get_shoryuken_options['batch'] == true || clazz.get_shoryuken_options['batch'] == true
  end

  fail ArgumentError, "Could not register #{clazz} for '#{queue}', "\
    "because #{invalid_worker} is already registered for this queue, "\
    "and Shoryuken doesn't support a batchable worker for a queue with multiple workers" if invalid_worker

  worker_subscriptions = clazz.get_shoryuken_options['subscriptions']

  if worker_subscriptions.nil?
    fail InvalidWorkerOptionsError, "Worker #{clazz} must define "\
      'a :subscriptions hash ({ queue_name: message_types }) in '\
      "it's shoryuken_options"
  end

  worker_subscriptions.each do |owning_queue, message_types|
    queue_subscriptions = @subscriptions.fetch_store env_name(owning_queue), {}

    [message_types].flatten.each do |message_type|
      message_type = message_type.to_s.dasherize

      if queue_subscriptions.key? message_type
        fail DuplicateSubscriptionError, "Worker #{clazz} cannot "\
          "define another subscription of message #{message_type} "\
          "on queue #{owning_queue} as it is already subscribed to by "\
          "worker #{queue_subscriptions[message_type]}."
      end

      queue_subscriptions.store message_type, clazz
    end
  end
end

#topics(queue = nil) ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 78

def topics queue = nil
  topic_list = if queue
                 @subscriptions.fetch(queue, {}).keys
               else
                 @subscriptions.values.map(&:keys).flatten
               end

  topic_list.uniq - ['*']
end

#workers(queue) ⇒ Object



88
89
90
# File 'lib/aws_sqs_moniter/worker_registries/typed_message_registry.rb', line 88

def workers(queue)
  @subscriptions.fetch(queue, {}).values
end