Class: BBK::App::Dispatcher
- Inherits:
-
Object
- Object
- BBK::App::Dispatcher
- Defined in:
- lib/bbk/app/dispatcher.rb,
lib/bbk/app/dispatcher/route.rb,
lib/bbk/app/dispatcher/result.rb,
lib/bbk/app/dispatcher/message.rb,
lib/bbk/app/dispatcher/message_stream.rb,
lib/bbk/app/dispatcher/queue_stream_strategy.rb
Defined Under Namespace
Classes: Message, MessageStream, QueueStreamStrategy, Result, Route
Constant Summary collapse
- ANSWER_DOMAIN =
'answer'
Instance Attribute Summary collapse
-
#consumers ⇒ Object
readonly
Returns the value of attribute consumers.
-
#force_quit ⇒ Object
Returns the value of attribute force_quit.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#middlewares ⇒ Object
readonly
Returns the value of attribute middlewares.
-
#observer ⇒ Object
readonly
Returns the value of attribute observer.
-
#publishers ⇒ Object
readonly
Returns the value of attribute publishers.
Instance Method Summary collapse
-
#close(_timeout = 5) ⇒ Object
Stops the dispatcher and waits for termination (the shutdown steps are listed under Instance Method Details).
-
#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
- #register_consumer(consumer) ⇒ Object
- #register_middleware(middleware) ⇒ Object
- #register_publisher(publisher) ⇒ Object
-
#run ⇒ Object
Runs all consumers and blocks while messages are being processed.
Constructor Details
#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher
Returns a new instance of Dispatcher.
37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/bbk/app/dispatcher.rb', line 37
# Builds a dispatcher with empty consumer, publisher and middleware lists.
#
# @param observer [Object] stored as-is and exposed through #observer
# @param pool_size [Object] kept in @pool_size (defaults to 3)
# @param logger [Object] wrapped in ActiveSupport::TaggedLogging unless it
#   already responds to #tagged, then wrapped in a BBK::Utils::ProxyLogger
#   tagged with 'Dispatcher'
# @param pool_factory [Object] kept in @pool_factory
# @param stream_strategy [Object] kept in @stream_strategy_class
def initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory,
               stream_strategy: QueueStreamStrategy)
  @observer = observer
  @pool_size = pool_size
  @pool_factory = pool_factory
  @stream_strategy_class = stream_strategy

  # Only loggers without #tagged need the TaggedLogging wrapper.
  taggable = logger
  taggable = ActiveSupport::TaggedLogging.new(logger) unless logger.respond_to?(:tagged)
  @logger = BBK::Utils::ProxyLogger.new(taggable, tags: 'Dispatcher')

  @consumers = []
  @publishers = []
  @middlewares = []
  @force_quit = false
end
Instance Attribute Details
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
33 34 35 |
# File 'lib/bbk/app/dispatcher.rb', line 33
# Reader for the consumers attribute.
#
# @return [Object] the current value of @consumers
def consumers
  @consumers
end
#force_quit ⇒ Object
Returns the value of attribute force_quit.
32 33 34 |
# File 'lib/bbk/app/dispatcher.rb', line 32
# Reader for the force_quit attribute.
#
# @return [Object] the current value of @force_quit
def force_quit
  @force_quit
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
33 34 35 |
# File 'lib/bbk/app/dispatcher.rb', line 33
# Reader for the logger attribute.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
33 34 35 |
# File 'lib/bbk/app/dispatcher.rb', line 33
# Reader for the middlewares attribute.
#
# @return [Object] the current value of @middlewares
def middlewares
  @middlewares
end
#observer ⇒ Object (readonly)
Returns the value of attribute observer.
33 34 35 |
# File 'lib/bbk/app/dispatcher.rb', line 33
# Reader for the observer attribute.
#
# @return [Object] the current value of @observer
def observer
  @observer
end
#publishers ⇒ Object (readonly)
Returns the value of attribute publishers.
33 34 35 |
# File 'lib/bbk/app/dispatcher.rb', line 33
# Reader for the publishers attribute.
#
# @return [Object] the current value of @publishers
def publishers
  @publishers
end
Instance Method Details
#close(_timeout = 5) ⇒ Object
Stop the dispatcher and wait for termination. Чтобы остановить диспетчер, надо:
-
остановить консьюмеры
-
остановить прием новых сообщений - @stream.close
-
дождаться обработки всего в очереди или таймаут
-
остановить потоки
-
остановить паблишеры
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/bbk/app/dispatcher.rb', line 87
# Shuts the dispatcher down in four steps:
#   1. calls #stop on every consumer,
#   2. calls #stop on the stream strategy, passing the timeout,
#   3. calls #close on every consumer,
#   4. calls #close on every publisher.
# A StandardError raised by a single consumer or publisher is logged
# (message at error level, backtrace at debug level) and does not prevent
# the remaining steps from running.
#
# @param _timeout [Object] value forwarded to the stream strategy's #stop
#   (defaults to 5). The name keeps its historical underscore prefix so the
#   signature stays unchanged, but the value is now honored instead of
#   being replaced by a hard-coded 5.
# @return [Object] the publishers collection (result of the final #each)
def close(_timeout = 5)
  ActiveSupport::Notifications.instrument 'dispatcher.close', dispatcher: self

  consumers.each do |cons|
    begin
      cons.stop
    rescue StandardError => e
      logger.error "Consumer #{cons} stop error: #{e}"
      logger.debug e.backtrace
    end
  end

  # @stream_strategy is only assigned by #run; when close is called before
  # run there is nothing to stop, and raising here would skip closing the
  # consumers and publishers below.
  @stream_strategy.stop(_timeout) if @stream_strategy

  consumers.each do |cons|
    begin
      cons.close
    rescue StandardError => e
      logger.error "Consumer #{cons} close error: #{e}"
      logger.debug e.backtrace
    end
  end

  publishers.each do |pub|
    begin
      pub.close
    rescue StandardError => e
      logger.error "Publisher #{pub} close error: #{e}"
      logger.debug e.backtrace
    end
  end
end
#register_consumer(consumer) ⇒ Object
50 51 52 |
# File 'lib/bbk/app/dispatcher.rb', line 50
# Appends a consumer to the dispatcher's consumers collection.
#
# @param consumer [Object] the consumer to add
# @return [Object] the consumers collection after the append
def register_consumer(consumer)
  consumers.push(consumer)
end
#register_middleware(middleware) ⇒ Object
58 59 60 |
# File 'lib/bbk/app/dispatcher.rb', line 58
# Appends a middleware to the dispatcher's middlewares collection.
#
# @param middleware [Object] the middleware to add
# @return [Object] the middlewares collection after the append
def register_middleware(middleware)
  middlewares.push(middleware)
end
#register_publisher(publisher) ⇒ Object
54 55 56 |
# File 'lib/bbk/app/dispatcher.rb', line 54
# Appends a publisher to the dispatcher's publishers collection.
#
# @param publisher [Object] the publisher to add
# @return [Object] the publishers collection after the append
def register_publisher(publisher)
  publishers.push(publisher)
end
#run ⇒ Object
Runs all consumers and blocks while messages are being processed.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/bbk/app/dispatcher.rb', line 63
# Creates the pool and the stream strategy, emits the 'dispatcher.run'
# notification, then hands the registered consumers to the strategy.
# Every message yielded by the strategy is passed to #process inside a
# logger tag taken from the message's :message_id header. A StandardError
# raised while handling a message is logged at fatal level and swallowed.
#
# @return [Object] whatever the stream strategy's #run returns
def run
  @pool = @pool_factory.call(@pool_size, 10)
  @stream_strategy = @stream_strategy_class.new(@pool, logger: logger)
  ActiveSupport::Notifications.instrument 'dispatcher.run', dispatcher: self

  @stream_strategy.run(consumers) do |message|
    begin
      # The header lookup stays inside the begin so a failure there is
      # reported the same way as a failure in #process.
      logger.tagged(message.headers[:message_id]) { process(message) }
    rescue StandardError => error
      prefix = "E[#{@stream_strategy_class}]: "
      logger.fatal "#{prefix}#{error}"
      logger.fatal "#{prefix}#{error.backtrace.join("\n")}"
    end
  end
end