Class: Emque::Consuming::Router
- Inherits:
-
Object
- Object
- Emque::Consuming::Router
- Defined in:
- lib/emque/consuming/router.rb
Defined Under Namespace
Classes: Mapping
Constant Summary
- ConfigurationError =
Class.new(StandardError)
Instance Method Summary
-
#initialize ⇒ Router
constructor
A new instance of Router.
- #map(&block) ⇒ Object
- #route(topic, type, message) ⇒ Object
- #topic(mapping, &block) ⇒ Object
- #topic_mapping ⇒ Object
- #workers(topic) ⇒ Object
Constructor Details
#initialize ⇒ Router
Returns a new instance of Router.
6 7 8 |
# File 'lib/emque/consuming/router.rb', line 6
#
# Sets up a router whose mapping table starts out as an empty hash.
# The table is stored through the +mappings=+ writer (defined outside this
# snippet).
def initialize
  self.mappings = Hash.new
end
Instance Method Details
#map(&block) ⇒ Object
10 11 12 |
# File 'lib/emque/consuming/router.rb', line 10
#
# Evaluates the given block with this router as +self+, so the block can
# call the router's own methods (such as +topic+) without a receiver.
#
# @yield block evaluated via +instance_eval+ on the router
# @return [Object] the value the block evaluates to
def map(&block)
  instance_eval(&block)
end
#route(topic, type, message) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/emque/consuming/router.rb', line 20
#
# Dispatches a message to every mapping registered for +topic+ that has a
# route for +type+.
#
# For each matching mapping a new consumer instance is created and its
# +consume+ method is called with the routed handler name and the message.
# When the mapping reports middleware, the message's +original+ payload is
# passed through each middleware callable in order, the result is parsed
# with +Oj.load+ (symbol keys), and the consumer receives a copy of the
# message built with +message.with(:values => ...)+.
#
# @param topic [#to_sym] topic the message belongs to
# @param type [#to_s] message type, resolved through the mapping's +route+
# @param message [Object] object responding to +original+ and +with+
#   (presumably an Emque message -- confirm against the caller)
# @return [Object] the result of iterating the topic's mapping list
# @raise [NoMethodError] when nothing is registered for +topic+ and the
#   mapping table returns nil for the lookup
def route(topic, type, message)
  mappings[topic.to_sym].each do |mapping|
    handler = mapping.route(type.to_s)
    next unless handler

    # Keep the transformed message in a per-mapping local. Reassigning the
    # +message+ parameter here would carry one mapping's middleware output
    # over to every later mapping registered on the same topic.
    payload = message
    if mapping.middleware?
      compiled = mapping.middleware.inject(message.original) do |memo, callable|
        callable.call(memo)
      end
      payload = message.with(
        :values => Oj.load(compiled, :symbol_keys => true)
      )
    end

    mapping.consumer.new.consume(handler, payload)
  end
end
#topic(mapping, &block) ⇒ Object
14 15 16 17 18 |
# File 'lib/emque/consuming/router.rb', line 14
#
# Registers a new mapping. A +Mapping+ is built from the given argument and
# block, then appended to the list kept under the mapping's topic (as a
# symbol); the list is created on first use.
#
# @param mapping [Object] argument forwarded to +Mapping.new+
# @yield block forwarded to +Mapping.new+
# @return [Array] the list of mappings now registered for that topic
def topic(mapping, &block)
  entry = Mapping.new(mapping, &block)
  key = entry.topic.to_sym
  (mappings[key] ||= []) << entry
end
#topic_mapping ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/emque/consuming/router.rb', line 46
#
# Builds a hash that maps each registered topic to the consumers of its
# mappings, in registration order.
#
# @return [Hash] topic => array of each mapping's +consumer+
def topic_mapping
  mappings.each_with_object({}) do |(name, maps), consumers_by_topic|
    consumers_by_topic[name] = maps.map(&:consumer)
  end
end
#workers(topic) ⇒ Object
54 55 56 |
# File 'lib/emque/consuming/router.rb', line 54
#
# Returns the largest +workers+ value among the mappings registered for the
# given topic.
#
# @param topic [#to_sym] topic to look up
# @return [Object, nil] the maximum +workers+ value, or nil when the topic's
#   mapping list is empty
def workers(topic)
  registered = mappings[topic.to_sym]
  counts = registered.map { |mapping| mapping.workers }
  counts.max
end