Class: BBK::App::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/app/dispatcher.rb,
lib/bbk/app/dispatcher/route.rb,
lib/bbk/app/dispatcher/result.rb,
lib/bbk/app/dispatcher/message.rb,
lib/bbk/app/dispatcher/message_stream.rb,
lib/bbk/app/dispatcher/queue_stream_strategy.rb

Defined Under Namespace

Classes: Message, MessageStream, QueueStreamStrategy, Result, Route

Constant Summary collapse

ANSWER_DOMAIN =
'answer'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher

Returns a new instance of Dispatcher.



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/bbk/app/dispatcher.rb', line 37

# Sets up a dispatcher with empty consumer, publisher and middleware lists.
#
# @param observer [Object] stored in @observer and exposed through #observer
# @param pool_size [Integer] stored in @pool_size; passed to the pool factory by #run
# @param logger [Object] base logger; wrapped with ActiveSupport::TaggedLogging
#   when it does not respond to #tagged
# @param pool_factory [#call] stored in @pool_factory; invoked by #run
# @param stream_strategy [Class] stored in @stream_strategy_class; instantiated by #run
def initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy)
  # The proxy logger below is always given something that responds to #tagged.
  logger = ActiveSupport::TaggedLogging.new(logger) unless logger.respond_to?(:tagged)
  @logger = BBK::Utils::ProxyLogger.new(logger, tags: 'Dispatcher')

  @observer = observer
  @pool_size = pool_size
  @pool_factory = pool_factory
  @stream_strategy_class = stream_strategy

  # Three distinct arrays, filled later by the register_* methods.
  @consumers, @publishers, @middlewares = Array.new(3) { [] }
  @force_quit = false
end

Instance Attribute Details

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.



33
34
35
# File 'lib/bbk/app/dispatcher.rb', line 33

# Reader for the consumer list.
#
# @return [Object] the object held in @consumers, returned as-is (no copy is made)
def consumers
  @consumers
end

#force_quit ⇒ Object

Returns the value of attribute force_quit.



32
33
34
# File 'lib/bbk/app/dispatcher.rb', line 32

# Reader for the force-quit flag.
#
# @return [Object] the current value of @force_quit (the constructor sets it to false)
def force_quit
  @force_quit
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



33
34
35
# File 'lib/bbk/app/dispatcher.rb', line 33

# Reader for the dispatcher's logger.
#
# @return [Object] the object held in @logger (the constructor stores a
#   BBK::Utils::ProxyLogger tagged 'Dispatcher' there)
def logger
  @logger
end

#middlewares ⇒ Object (readonly)

Returns the value of attribute middlewares.



33
34
35
# File 'lib/bbk/app/dispatcher.rb', line 33

# Reader for the middleware list.
#
# @return [Object] the object held in @middlewares, returned as-is (no copy is made)
def middlewares
  @middlewares
end

#observer ⇒ Object (readonly)

Returns the value of attribute observer.



33
34
35
# File 'lib/bbk/app/dispatcher.rb', line 33

# Reader for the observer passed to the constructor.
#
# @return [Object] the object held in @observer
def observer
  @observer
end

#publishers ⇒ Object (readonly)

Returns the value of attribute publishers.



33
34
35
# File 'lib/bbk/app/dispatcher.rb', line 33

# Reader for the publisher list.
#
# @return [Object] the object held in @publishers, returned as-is (no copy is made)
def publishers
  @publishers
end

Instance Method Details

#close(_timeout = 5) ⇒ Object

Stops the dispatcher and waits for termination. To stop the dispatcher you need to:

  1. stop the consumers

  2. stop accepting new messages - @stream.close

  3. wait until everything in the queue has been processed, or until the timeout expires

  4. stop the threads

  5. stop the publishers



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/bbk/app/dispatcher.rb', line 87

# Stops the dispatcher and releases its consumers and publishers.
#
# Steps, in order:
#   1. emit the 'dispatcher.close' notification;
#   2. call #stop on every consumer;
#   3. call #stop(timeout) on the stream strategy, if one exists;
#   4. call #close on every consumer;
#   5. call #close on every publisher.
#
# A StandardError raised by an individual consumer or publisher is logged
# (message at error level, backtrace at debug level) and does not prevent the
# remaining steps from running.
#
# @param timeout [Numeric] value forwarded to the stream strategy's #stop
#   (presumably seconds to wait for in-flight work; confirm against the strategy)
# @return [Object] the result of iterating over #publishers
def close(timeout = 5)
  ActiveSupport::Notifications.instrument 'dispatcher.close', dispatcher: self
  consumers.each do |cons|
    begin
      cons.stop
    rescue StandardError => e
      logger.error "Consumer #{cons} stop error: #{e}"
      logger.debug e.backtrace
    end
  end

  # @stream_strategy is only assigned by #run. Guarding here lets a dispatcher
  # that was never started still close its consumers and publishers instead of
  # failing with NoMethodError on nil. The caller's timeout is honoured rather
  # than a hard-coded value.
  @stream_strategy.stop(timeout) if @stream_strategy

  consumers.each do |cons|
    begin
      cons.close
    rescue StandardError => e
      logger.error "Consumer #{cons} close error: #{e}"
      logger.debug e.backtrace
    end
  end

  publishers.each do |pub|
    begin
      pub.close
    rescue StandardError => e
      logger.error "Publisher #{pub} close error: #{e}"
      logger.debug e.backtrace
    end
  end
end

#register_consumer(consumer) ⇒ Object



50
51
52
# File 'lib/bbk/app/dispatcher.rb', line 50

# Appends a consumer to the list returned by #consumers.
#
# @param consumer [Object] the consumer to add
# @return [Object] the consumers collection, after the append
def register_consumer(consumer)
  consumers.push(consumer)
end

#register_middleware(middleware) ⇒ Object



58
59
60
# File 'lib/bbk/app/dispatcher.rb', line 58

# Appends a middleware to the list returned by #middlewares.
#
# @param middleware [Object] the middleware to add
# @return [Object] the middlewares collection, after the append
def register_middleware(middleware)
  middlewares.push(middleware)
end

#register_publisher(publisher) ⇒ Object



54
55
56
# File 'lib/bbk/app/dispatcher.rb', line 54

# Appends a publisher to the list returned by #publishers.
#
# @param publisher [Object] the publisher to add
# @return [Object] the publishers collection, after the append
def register_publisher(publisher)
  publishers.push(publisher)
end

#run ⇒ Object

Runs all consumers and blocks while messages are being processed.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/bbk/app/dispatcher.rb', line 63

# Builds the worker pool and the stream strategy, emits the 'dispatcher.run'
# notification, then hands the registered consumers to the strategy.
#
# Every message the strategy yields is passed to #process inside a logger
# context tagged with the message's :message_id header. A StandardError raised
# while handling a message is logged at fatal level (message first, then the
# backtrace) and is not re-raised.
#
# @return [Object] whatever the stream strategy's #run returns
def run
  # The second factory argument (10) is fixed here; its meaning is defined by
  # the pool factory (presumably a queue size - confirm against the factory).
  @pool = @pool_factory.call(@pool_size, 10)
  @stream_strategy = @stream_strategy_class.new(@pool, logger: logger)
  ActiveSupport::Notifications.instrument('dispatcher.run', dispatcher: self)

  @stream_strategy.run(consumers) do |message|
    begin
      logger.tagged(message.headers[:message_id]) { process(message) }
    rescue StandardError => e
      logger.fatal("E[#{@stream_strategy_class}]: #{e}")
      logger.fatal("E[#{@stream_strategy_class}]: #{e.backtrace.join("\n")}")
    end
  end
end