Class: QueueBus::Dispatch
- Inherits:
-
Object
- Object
- QueueBus::Dispatch
- Defined in:
- lib/queue_bus/dispatch.rb
Overview
A Dispatch object can be used to declare an application along with its various subscriptions.
Instance Attribute Summary collapse
-
#app_key ⇒ Object
readonly
Returns the value of attribute app_key.
-
#subscriptions ⇒ Object
readonly
Returns the value of attribute subscriptions.
Instance Method Summary collapse
- #add_subscription(queue_name, key, class_name, matcher_hash = nil, block) ⇒ Object
- #dispatch_event(queue, key, matcher_hash, block) ⇒ Object
- #execute(key, attributes) ⇒ Object
-
#initialize(app_key) ⇒ Dispatch
constructor
A new instance of Dispatch.
-
#method_missing(method_name, *args, &block) ⇒ Object
Allows subscriptions to be defined on other queues: the called method name is used as the queue name.
-
#on_heartbeat(key, minute: nil, hour: nil, wday: nil, minute_interval: nil, hour_interval: nil, &block) ⇒ Object
Subscribes to heartbeat events, optionally filtered by minute, hour, weekday, and minute/hour intervals.
- #size ⇒ Object
- #subscribe(key, matcher_hash = nil, &block) ⇒ Object
- #subscription_matches(attributes) ⇒ Object
Constructor Details
#initialize(app_key) ⇒ Dispatch
Returns a new instance of Dispatch.
10 11 12 13 |
# File 'lib/queue_bus/dispatch.rb', line 10 def initialize(app_key) @app_key = Application.normalize(app_key) @subscriptions = SubscriptionList.new end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_name, *args, &block) ⇒ Object
Allows subscriptions to be defined on other queues: the called method name is used as the queue name.
63 64 65 66 67 68 69 70 71 |
# File 'lib/queue_bus/dispatch.rb', line 63
#
# Treats an unknown method call as a subscription on a queue named after the
# method. Only calls with a block and exactly one or two positional arguments
# are handled; everything else falls through to the default behaviour.
#
# @param method_name [Symbol] name of the called method, used as the queue
# @param args [Array] +key+ and, optionally, a matcher hash
# @param block [Proc] handler forwarded to #dispatch_event
# @return [Object] whatever #dispatch_event returns
#
# NOTE(review): no matching respond_to_missing? is visible in this listing —
# confirm whether one is defined elsewhere.
def method_missing(method_name, *args, &block)
  return super unless block && [1, 2].include?(args.size)

  # With a single argument the matcher is left nil so that #dispatch_event
  # can derive it from the key.
  key, matcher_hash = args
  dispatch_event(method_name, key, matcher_hash, block)
end
Instance Attribute Details
#app_key ⇒ Object (readonly)
Returns the value of attribute app_key.
8 9 10 |
# File 'lib/queue_bus/dispatch.rb', line 8 def app_key @app_key end |
#subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions.
8 9 10 |
# File 'lib/queue_bus/dispatch.rb', line 8 def subscriptions @subscriptions end |
Instance Method Details
#add_subscription(queue_name, key, class_name, matcher_hash = nil, block) ⇒ Object
96 97 98 99 100 |
# File 'lib/queue_bus/dispatch.rb', line 96
#
# Registers a subscription and records it in this dispatcher's list.
#
# @param queue_name [String] queue the subscription is registered on
# @param key [Object] subscription key
# @param class_name [String] name of the class passed to Subscription.register
# @param matcher_hash [Hash, nil] matcher passed through unchanged
# @param block [Proc] handler passed through unchanged
# @return [Object] the object returned by Subscription.register
def add_subscription(queue_name, key, class_name, matcher_hash = nil, block)
  Subscription
    .register(queue_name, key, class_name, matcher_hash, block)
    .tap { |registered| subscriptions.add(registered) }
end
#dispatch_event(queue, key, matcher_hash, block) ⇒ Object
90 91 92 93 94 |
# File 'lib/queue_bus/dispatch.rb', line 90
#
# Adds a '::QueueBus::Rider' subscription on the queue "<app_key>_<queue>".
#
# @param queue [#to_s] queue suffix appended to the application key
# @param key [Object] subscription key
# @param matcher_hash [Hash, nil] explicit matcher; when falsy, one is built
#   from +key+
# @param block [Proc] handler forwarded to #add_subscription
# @return [Object] whatever #add_subscription returns
def dispatch_event(queue, key, matcher_hash, block)
  # With no matcher given, the key itself is treated as the event type to
  # match on.
  matcher = matcher_hash || { 'bus_event_type' => key }
  queue_name = "#{app_key}_#{queue}"

  add_subscription(queue_name, key, '::QueueBus::Rider', matcher, block)
end
#execute(key, attributes) ⇒ Object
73 74 75 76 77 78 79 80 |
# File 'lib/queue_bus/dispatch.rb', line 73
#
# Runs the subscription found under +key+, if there is one.
#
# @param key [Object] key looked up via subscriptions.key
# @param attributes [Object] payload handed to the subscription's execute!
# @return [Object, nil] the result of execute!, or nil when nothing is found
def execute(key, attributes)
  sub = subscriptions.key(key)
  # TODO: log that it's not there
  return unless sub

  sub.execute!(attributes)
end
#on_heartbeat(key, minute: nil, hour: nil, wday: nil, minute_interval: nil, hour_interval: nil, &block) ⇒ Object
Subscribes to heartbeat events, optionally filtered by minute, hour, weekday, and minute/hour intervals.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/queue_bus/dispatch.rb', line 19
#
# Subscribes +key+ to heartbeat events, optionally narrowed by time.
#
# +minute+, +hour+ and +wday+ are added to the subscription matcher as-is.
# +minute_interval+ and +hour_interval+ are checked when an event arrives:
# the block only runs if event['minute'] / event['hour'] is an exact multiple
# of the corresponding interval.
#
# @param key [Object] subscription key handed to #subscribe
# @param minute [Integer, nil] value stored under 'minute' in the matcher
# @param hour [Integer, nil] value stored under 'hour' in the matcher
# @param wday [Integer, nil] value stored under 'wday' in the matcher
# @param minute_interval [Integer, nil] run only when event['minute'] is a
#   multiple of this
# @param hour_interval [Integer, nil] run only when event['hour'] is a
#   multiple of this
# @yield [event] called with each event that passes the interval checks
# @return [Object] whatever #subscribe returns
# @raise [ArgumentError] if an interval is not positive, if minute/hour/wday
#   is negative, or if no block is given
def on_heartbeat(key, minute: nil, hour: nil, wday: nil, minute_interval: nil, hour_interval: nil, &block) # rubocop:disable Metrics/PerceivedComplexity, Metrics/MethodLength, Metrics/ParameterLists, Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/LineLength
  if minute_interval && !minute_interval.positive?
    raise ArgumentError, 'minute_interval must be a positive integer'
  end

  if hour_interval && !hour_interval.positive?
    raise ArgumentError, 'hour_interval must be a positive integer'
  end

  matcher = { bus_event_type: :heartbeat_minutes }

  # Checked in minute, hour, wday order so the first offending field is the
  # one that gets reported.
  { 'minute' => minute, 'hour' => hour, 'wday' => wday }.each do |field, value|
    next unless value
    raise ArgumentError, "#{field} must not be negative" if value.negative?

    matcher[field] = value
  end

  # Without a handler, every delivered heartbeat would fail later on
  # nil.call; reject the registration instead. This runs after the checks
  # above so their errors keep precedence.
  raise ArgumentError, 'a block is required' unless block

  subscribe(key, matcher) do |event|
    next unless minute_interval.nil? || (event['minute'] % minute_interval).zero?
    next unless hour_interval.nil? || (event['hour'] % hour_interval).zero?

    block.call(event)
  end
end
#size ⇒ Object
15 16 17 |
# File 'lib/queue_bus/dispatch.rb', line 15 def size @subscriptions.size end |
#subscribe(key, matcher_hash = nil, &block) ⇒ Object
58 59 60 |
# File 'lib/queue_bus/dispatch.rb', line 58 def subscribe(key, matcher_hash = nil, &block) dispatch_event('default', key, matcher_hash, block) end |
#subscription_matches(attributes) ⇒ Object
82 83 84 85 86 87 88 |
# File 'lib/queue_bus/dispatch.rb', line 82
#
# Collects the subscriptions matching +attributes+ and stamps each one with
# this dispatcher's app_key.
#
# @param attributes [Object] passed to subscriptions.matches
# @return [Object] the collection returned by subscriptions.matches
def subscription_matches(attributes)
  subscriptions.matches(attributes).tap do |matched|
    matched.each { |sub| sub.app_key = app_key }
  end
end