Class: Emque::Consuming::Router

Inherits:
Object
  • Object
show all
Defined in:
lib/emque/consuming/router.rb

Defined Under Namespace

Classes: Mapping

Constant Summary collapse

ConfigurationError =
Class.new(StandardError)

Instance Method Summary collapse

Constructor Details

#initializeRouter

Returns a new instance of Router.



6
7
8
# File 'lib/emque/consuming/router.rb', line 6

def initialize
  self.mappings = {}
end

Instance Method Details

#map(&block) ⇒ Object



10
11
12
# File 'lib/emque/consuming/router.rb', line 10

def map(&block)
  self.instance_eval(&block)
end

#route(topic, type, message) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/emque/consuming/router.rb', line 20

def route(topic, type, message)
  mappings[topic.to_sym].each do |mapping|
    method = mapping.route(type.to_s)

    if method
      consumer = mapping.consumer

      if mapping.middleware?
        message = message.with(
          :values =>
            Oj.load(
              mapping
                .middleware
                .inject(message.original) { |compiled, callable|
                  callable.call(compiled)
                },
              :symbol_keys => true
            )
        )
      end

      consumer.new.consume(method, message)
    end
  end
end

#topic(mapping, &block) ⇒ Object



14
15
16
17
18
# File 'lib/emque/consuming/router.rb', line 14

def topic(mapping, &block)
  mapping = Mapping.new(mapping, &block)
  mappings[mapping.topic.to_sym] ||= []
  mappings[mapping.topic.to_sym] << mapping
end

#topic_mappingObject



46
47
48
49
50
51
52
# File 'lib/emque/consuming/router.rb', line 46

def topic_mapping
  mappings.inject({}) do |hash, (topic, maps)|
    hash.tap do |h|
      h[topic] = maps.map(&:consumer)
    end
  end
end

#workers(topic) ⇒ Object



54
55
56
# File 'lib/emque/consuming/router.rb', line 54

def workers(topic)
  mappings[topic.to_sym].map(&:workers).max
end