Module: ExcADG::Broker
- Defined in:
- lib/excadg/broker.rb
Overview
handle requests sending/receiving though Ractor’s interface
Defined Under Namespace
Classes: CantSendRequest, RequestProcessingFailed, UnknownRequestType
Class Attribute Summary collapse
-
.data_store ⇒ Object
readonly
Returns the value of attribute data_store.
Class Method Summary collapse
-
.ask(request) ⇒ Object
is used from vertices to send reaqests to the broker.
-
.run ⇒ Object
start requests broker for vertices in a separated thread.
-
.wait_all(timeout: 60, period: 1) ⇒ Object
makes a thread to wait for all known vertices to reach a final state.
Class Attribute Details
.data_store ⇒ Object (readonly)
Returns the value of attribute data_store.
14 15 16 |
# File 'lib/excadg/broker.rb', line 14 def data_store @data_store end |
Class Method Details
.ask(request) ⇒ Object
is used from vertices to send reaqests to the broker
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/excadg/broker.rb', line 20 def ask request raise UnknownRequestType, request unless request.is_a? Request begin Ractor.main.send request rescue StandardError => e raise CantSendRequest, cause: e end Log.info 'getting response' resp = Ractor.receive Log.debug "got response #{resp}" raise resp if resp.is_a? StandardError Log.debug 'returning response' resp end |
.run ⇒ Object
start requests broker for vertices in a separated thread
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/excadg/broker.rb', line 40 def run @data_store ||= DataStore.new @broker = Thread.new { loop { process_request } } unless @broker&.alive? at_exit { Log.info 'shut down broker' @broker.kill } Log.info 'broker is started' @broker end |
.wait_all(timeout: 60, period: 1) ⇒ Object
makes a thread to wait for all known vertices to reach a final state
56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/excadg/broker.rb', line 56 def wait_all timeout: 60, period: 1 Thread.new { Log.info "timeout is #{timeout} seconds" Timeout.timeout(timeout) { loop { sleep period states = @data_store.to_a.group_by(&:state).keys Log.info "vertices in #{states} exist" # that's the only final states for vertices break if (states - i[done failed]).empty? } } } end |