Class: BBK::App::Dispatcher
- Inherits:
-
Object
- Object
- BBK::App::Dispatcher
- Defined in:
- lib/bbk/app/dispatcher.rb,
lib/bbk/app/dispatcher/route.rb,
lib/bbk/app/dispatcher/result.rb,
lib/bbk/app/dispatcher/message.rb,
lib/bbk/app/dispatcher/message_stream.rb,
lib/bbk/app/dispatcher/queue_stream_strategy.rb
Defined Under Namespace
Classes: Message, MessageStream, QueueStreamStrategy, Result, Route
Constant Summary collapse
- ANSWER_DOMAIN =
'answer'.freeze
- DEFAULT_PROTOCOL =
'default'.freeze
Instance Attribute Summary collapse
-
#consumers ⇒ Object
readonly
Returns the value of attribute consumers.
-
#force_quit ⇒ Object
Returns the value of attribute force_quit.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#middlewares ⇒ Object
readonly
Returns the value of attribute middlewares.
-
#observer ⇒ Object
readonly
Returns the value of attribute observer.
-
#publishers ⇒ Object
readonly
Returns the value of attribute publishers.
Instance Method Summary collapse
-
#close(_timeout = 5) ⇒ Object
stop dispatcher and wait for termination Чтоб остановить диспетчер надо: 1.
-
#default_publisher ⇒ Object
get default publisher.
-
#default_publisher=(publisher) ⇒ Object
set default publisher for results with empty scheme or DEFAULT_PROTOCOL scheme.
- #execute_message(message) ⇒ Object
-
#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
-
#process(message) ⇒ Object
process one message and sending existed results messages.
- #register_consumer(consumer) ⇒ Object
- #register_middleware(middleware) ⇒ Object
- #register_publisher(publisher) ⇒ Object
-
#run ⇒ Object
Run all consumers and blocks on message processing.
Constructor Details
#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher
Returns a new instance of Dispatcher.
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/bbk/app/dispatcher.rb', line 40 def initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) @observer = observer @pool_size = pool_size logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: 'Dispatcher') @consumers = [] @publishers = [] @middlewares = [] @pool_factory = pool_factory @stream_strategy_class = stream_strategy @force_quit = false end |
Instance Attribute Details
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def consumers @consumers end |
#force_quit ⇒ Object
Returns the value of attribute force_quit.
34 35 36 |
# File 'lib/bbk/app/dispatcher.rb', line 34 def force_quit @force_quit end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def logger @logger end |
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def middlewares @middlewares end |
#observer ⇒ Object (readonly)
Returns the value of attribute observer.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def observer @observer end |
#publishers ⇒ Object (readonly)
Returns the value of attribute publishers.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def publishers @publishers end |
Instance Method Details
#close(_timeout = 5) ⇒ Object
stop dispatcher and wait for termination Чтоб остановить диспетчер надо:
-
остановить консьюмеры
-
остановить прием новых сообщений - @stream.close
-
дождаться обработки всего в очереди или таймаут
-
остановить потоки
-
остановить паблишеры
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/bbk/app/dispatcher.rb', line 101 def close(_timeout = 5) ActiveSupport::Notifications.instrument 'dispatcher.close', dispatcher: self consumers.each do |cons| begin cons.stop rescue StandardError => e logger.error "Consumer #{cons} stop error: #{e}" logger.debug e.backtrace end end @stream_strategy.stop(5) consumers.each do |cons| begin cons.close rescue StandardError => e logger.error "Consumer #{cons} close error: #{e}" logger.debug e.backtrace end end publishers.each do |pub| begin pub.close rescue StandardError => e logger.error "Publisher #{pub} close error: #{e}" logger.debug e.backtrace end end end |
#default_publisher ⇒ Object
get default publisher
68 69 70 |
# File 'lib/bbk/app/dispatcher.rb', line 68 def default_publisher @default_publisher || (publishers.size == 1 ? publishers.first : nil) end |
#default_publisher=(publisher) ⇒ Object
set default publisher for results with empty scheme or DEFAULT_PROTOCOL scheme
63 64 65 |
# File 'lib/bbk/app/dispatcher.rb', line 63 def default_publisher=(publisher) @default_publisher = publisher end |
#execute_message(message) ⇒ Object
133 134 135 136 137 |
# File 'lib/bbk/app/dispatcher.rb', line 133 def () build_processing_stack.call().select do |r| r.is_a?(BBK::App::Dispatcher::Result) end end |
#process(message) ⇒ Object
process one message and sending existed results messages
140 141 142 143 144 145 146 147 148 149 |
# File 'lib/bbk/app/dispatcher.rb', line 140 def process() results = () logger.debug "There are #{results.count} results to send from #{message.headers[:message_id]}..." send_results(, results).value rescue StandardError => e logger.error "Failed processing message: #{e.inspect}" ActiveSupport::Notifications.instrument 'dispatcher.exception', msg: , exception: e .nack(error: e) close if force_quit end |
#register_consumer(consumer) ⇒ Object
53 54 55 |
# File 'lib/bbk/app/dispatcher.rb', line 53 def register_consumer(consumer) consumers << consumer end |
#register_middleware(middleware) ⇒ Object
72 73 74 |
# File 'lib/bbk/app/dispatcher.rb', line 72 def register_middleware(middleware) middlewares << middleware end |
#register_publisher(publisher) ⇒ Object
57 58 59 60 |
# File 'lib/bbk/app/dispatcher.rb', line 57 def register_publisher(publisher) raise "Publisher support #{DEFAULT_PROTOCOL}" if publisher.protocols.include?(DEFAULT_PROTOCOL) publishers << publisher end |
#run ⇒ Object
Run all consumers and blocks on message processing
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/bbk/app/dispatcher.rb', line 77 def run @pool = @pool_factory.call(@pool_size, 10) @stream_strategy = @stream_strategy_class.new(@pool, logger: logger) ActiveSupport::Notifications.instrument 'dispatcher.run', dispatcher: self @stream_strategy.run(consumers) do |msg| begin logger.tagged(msg.headers[:message_id]) do process msg end rescue StandardError => e logger.fatal "E[#{@stream_strategy_class}]: #{e}" logger.fatal "E[#{@stream_strategy_class}]: #{e.backtrace.join("\n")}" end end end |