Class: BBK::App::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/app/dispatcher.rb,
lib/bbk/app/dispatcher/route.rb,
lib/bbk/app/dispatcher/result.rb,
lib/bbk/app/dispatcher/message.rb,
lib/bbk/app/dispatcher/message_stream.rb,
lib/bbk/app/dispatcher/queue_stream_strategy.rb

Defined Under Namespace

Classes: Message, MessageStream, QueueStreamStrategy, Result, Route

Constant Summary collapse

ANSWER_DOMAIN =
'answer'.freeze
DEFAULT_PROTOCOL =
'default'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher

Returns a new instance of Dispatcher.



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/bbk/app/dispatcher.rb', line 40

# Builds a dispatcher with empty consumer, publisher and middleware lists.
#
# @param observer [Object] stored as-is and exposed through #observer
# @param pool_size [Integer] first argument later handed to +pool_factory+ by #run
# @param logger [Object] base logger; wrapped in ActiveSupport::TaggedLogging
#   when it does not already respond to +tagged+
# @param pool_factory [#call] invoked by #run to build the worker pool
# @param stream_strategy [Class] class instantiated by #run to drive message streaming
def initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy)
  @observer = observer
  @pool_size = pool_size
  @pool_factory = pool_factory
  @stream_strategy_class = stream_strategy

  @consumers = []
  @publishers = []
  @middlewares = []
  @force_quit = false

  # #run relies on logger.tagged, so make sure the underlying logger supports it.
  taggable = logger
  taggable = ActiveSupport::TaggedLogging.new(logger) unless logger.respond_to?(:tagged)
  @logger = BBK::Utils::ProxyLogger.new(taggable, tags: 'Dispatcher')
end

Instance Attribute Details

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.



35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Reader for the consumer list. The list starts empty in the constructor and
# #register_consumer appends to it.
#
# @return [Array] the value of @consumers
def consumers
  @consumers
end

#force_quit ⇒ Object

Returns the value of attribute force_quit.



34
35
36
# File 'lib/bbk/app/dispatcher.rb', line 34

# Reader for the force-quit flag (initialised to +false+ in the constructor).
# #process consults it after a failed message: when truthy, the dispatcher
# closes itself.
#
# @return [Object] the value of @force_quit
def force_quit
  @force_quit
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Reader for the dispatcher's logger, which the constructor builds as a
# BBK::Utils::ProxyLogger tagged with 'Dispatcher'.
#
# @return [Object] the value of @logger
def logger
  @logger
end

#middlewares ⇒ Object (readonly)

Returns the value of attribute middlewares.



35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Reader for the middleware list. The list starts empty in the constructor and
# #register_middleware appends to it.
#
# @return [Array] the value of @middlewares
def middlewares
  @middlewares
end

#observer ⇒ Object (readonly)

Returns the value of attribute observer.



35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Reader for the observer that was passed to the constructor.
#
# @return [Object] the value of @observer
def observer
  @observer
end

#publishers ⇒ Object (readonly)

Returns the value of attribute publishers.



35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Reader for the publisher list. The list starts empty in the constructor and
# #register_publisher appends to it.
#
# @return [Array] the value of @publishers
def publishers
  @publishers
end

Instance Method Details

#close(_timeout = 5) ⇒ Object

Stops the dispatcher and waits for termination. To stop the dispatcher you need to:

  1. stop the consumers

  2. stop accepting new messages - @stream.close

  3. wait until everything in the queue has been processed, or until the timeout expires

  4. stop the threads

  5. stop the publishers



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/bbk/app/dispatcher.rb', line 101

# Stops the dispatcher.
#
# Shutdown order:
#   1. stop every consumer;
#   2. stop the stream strategy, handing it +timeout+;
#   3. close every consumer;
#   4. close every publisher.
#
# A failure while stopping or closing a single consumer/publisher is logged
# (message at error level, backtrace at debug level) and does not prevent the
# remaining ones from being handled.
#
# @param timeout [Numeric] value forwarded to the stream strategy's +stop+
#   (presumably a number of seconds — confirm against the strategy implementation)
# @return [Object] the publishers collection (result of the final +each+)
def close(timeout = 5)
  ActiveSupport::Notifications.instrument 'dispatcher.close', dispatcher: self
  consumers.each do |cons|
    begin
      cons.stop
    rescue StandardError => e
      logger.error "Consumer #{cons} stop error: #{e}"
      logger.debug e.backtrace
    end
  end

  # The strategy only exists once #run has been called; closing a dispatcher
  # that never ran must still release consumers and publishers below.
  # The caller's timeout is honoured instead of a hard-coded value.
  @stream_strategy.stop(timeout) if @stream_strategy

  consumers.each do |cons|
    begin
      cons.close
    rescue StandardError => e
      logger.error "Consumer #{cons} close error: #{e}"
      logger.debug e.backtrace
    end
  end

  publishers.each do |pub|
    begin
      pub.close
    rescue StandardError => e
      logger.error "Publisher #{pub} close error: #{e}"
      logger.debug e.backtrace
    end
  end
end

#default_publisher ⇒ Object

get default publisher



68
69
70
# File 'lib/bbk/app/dispatcher.rb', line 68

# Resolves the default publisher.
#
# An explicitly assigned default takes precedence. Without one, the sole
# registered publisher is used; with zero or several registered publishers
# there is no default.
#
# @return [Object, nil] the default publisher, or nil when none can be determined
def default_publisher
  return @default_publisher if @default_publisher

  publishers.first if publishers.size == 1
end

#default_publisher=(publisher) ⇒ Object

set default publisher for results with empty scheme or DEFAULT_PROTOCOL scheme



63
64
65
# File 'lib/bbk/app/dispatcher.rb', line 63

# Assigns the explicit default publisher, which #default_publisher returns in
# preference to any registered publisher.
#
# @param publisher [Object] the publisher to use as the default
# @return [Object] the assigned publisher
def default_publisher=(publisher)
  @default_publisher = publisher
end

#execute_message(message) ⇒ Object



133
134
135
136
137
# File 'lib/bbk/app/dispatcher.rb', line 133

# Runs the message through the processing stack and keeps only the items
# that are dispatcher results.
#
# @param message [Object] the message handed to the processing stack
# @return [Object] the stack output filtered down to
#   BBK::App::Dispatcher::Result instances
def execute_message(message)
  outcomes = build_processing_stack.call(message)
  outcomes.select { |outcome| outcome.is_a?(BBK::App::Dispatcher::Result) }
end

#process(message) ⇒ Object

Processes one message and sends the resulting messages.



140
141
142
143
144
145
146
147
148
149
# File 'lib/bbk/app/dispatcher.rb', line 140

# Processes a single message: executes it, sends the produced results and
# returns the value of the send operation.
#
# Any StandardError raised along the way is logged, reported through the
# 'dispatcher.exception' notification, and the message is nacked with the
# error. When #force_quit is set the dispatcher is then closed.
#
# @param message [Object] message responding to +headers+ and +nack+
# @return [Object] +value+ of whatever #send_results returns; on failure,
#   the result of +close+ when force_quit is set, otherwise nil
def process(message)
  begin
    outcomes = execute_message(message)
    logger.debug "There are #{outcomes.count} results to send from #{message.headers[:message_id]}..."
    send_results(message, outcomes).value
  rescue StandardError => failure
    logger.error "Failed processing message: #{failure.inspect}"
    ActiveSupport::Notifications.instrument('dispatcher.exception', msg: message, exception: failure)
    message.nack(error: failure)
    close if force_quit
  end
end

#register_consumer(consumer) ⇒ Object



53
54
55
# File 'lib/bbk/app/dispatcher.rb', line 53

# Adds a consumer to the dispatcher.
#
# @param consumer [Object] the consumer to append to #consumers
# @return [Array] the consumers list after the addition
def register_consumer(consumer)
  consumers.push(consumer)
end

#register_middleware(middleware) ⇒ Object



72
73
74
# File 'lib/bbk/app/dispatcher.rb', line 72

# Adds a middleware to the dispatcher.
#
# @param middleware [Object] the middleware to append to #middlewares
# @return [Array] the middlewares list after the addition
def register_middleware(middleware)
  middlewares.push(middleware)
end

#register_publisher(publisher) ⇒ Object



57
58
59
60
# File 'lib/bbk/app/dispatcher.rb', line 57

# Adds a publisher to the dispatcher.
#
# A publisher whose +protocols+ include DEFAULT_PROTOCOL is rejected and
# nothing is registered.
#
# @param publisher [Object] object responding to +protocols+
# @return [Array] the publishers list after the addition
# @raise [RuntimeError] when the publisher lists DEFAULT_PROTOCOL among its protocols
def register_publisher(publisher)
  if publisher.protocols.include?(DEFAULT_PROTOCOL)
    raise "Publisher support #{DEFAULT_PROTOCOL}"
  end

  publishers.push(publisher)
end

#run ⇒ Object

Runs all consumers and blocks while messages are being processed.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/bbk/app/dispatcher.rb', line 77

# Starts the dispatcher: builds the worker pool and the stream strategy,
# emits the 'dispatcher.run' notification and hands every streamed message
# to #process inside a logger tag carrying the message id.
#
# A StandardError escaping the per-message handling is logged at fatal level
# (message first, then backtrace) and does not propagate out of the block.
#
# @return [Object] whatever the stream strategy's +run+ returns
def run
  @pool = @pool_factory.call(@pool_size, 10)
  @stream_strategy = @stream_strategy_class.new(@pool, logger: logger)
  ActiveSupport::Notifications.instrument('dispatcher.run', dispatcher: self)

  @stream_strategy.run(consumers) do |incoming|
    begin
      logger.tagged(incoming.headers[:message_id]) { process(incoming) }
    rescue StandardError => failure
      prefix = "E[#{@stream_strategy_class}]: "
      logger.fatal "#{prefix}#{failure}"
      logger.fatal "#{prefix}#{failure.backtrace.join("\n")}"
    end
  end
end