Module: ExcADG::Broker

Defined in:
lib/excadg/broker.rb

Overview

Handles sending and receiving requests through Ractor’s interface.

Defined Under Namespace

Classes: CantSendRequest, RequestProcessingFailed, UnknownRequestType

Class Attribute Details

.data_store ⇒ Object (readonly)

Returns the value of attribute data_store.



# File 'lib/excadg/broker.rb', line 14

def data_store
  @data_store
end

Class Method Details

.ask(request) ⇒ Object

used by vertices to send requests to the broker

Returns:

  • data received in response from the main ractor

Raises:

  • StandardError if response is a StandardError

  • CantSendRequest if sending request failed for any reason



# File 'lib/excadg/broker.rb', line 20

def ask request
  raise UnknownRequestType, request unless request.is_a? Request

  begin
    Ractor.main.send request
  rescue StandardError => e
    raise CantSendRequest, cause: e
  end

  Log.info 'getting response'
  resp = Ractor.receive
  Log.debug "got response #{resp}"
  raise resp if resp.is_a? StandardError

  Log.debug 'returning response'
  resp
end
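The round trip behind `ask` can be sketched with plain Ractor messaging. This is a minimal illustration, not ExcADG's actual implementation: `DoubleRequest` is a hypothetical request type that carries its sender so the main ractor knows where to reply, and the main-ractor side below stands in for the broker's request processing, which this page does not show.

```ruby
# Hypothetical request type; ExcADG's real Request classes are not shown here.
# It carries the sender so the main ractor knows where to send the reply.
DoubleRequest = Struct.new(:sender, :value)

worker = Ractor.new do
  # Send the request to the main ractor, as Broker.ask does.
  Ractor.main.send DoubleRequest.new(Ractor.current, 21)

  # Block until the reply arrives, re-raising it if it is an error.
  resp = Ractor.receive
  raise resp if resp.is_a? StandardError

  # Report the final result back to the main ractor.
  Ractor.main.send resp
end

# Main ractor side: receive one request and answer it.
request = Ractor.receive
request.sender.send request.value * 2

# Second message from the worker is the value it got in response.
result = Ractor.receive
```

Because the reply is re-raised when it is a `StandardError`, a caller of `ask` can treat broker-side failures like ordinary exceptions.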

.run ⇒ Object

starts the request broker for vertices in a separate thread

Returns:

  • the thread started



# File 'lib/excadg/broker.rb', line 40

def run
  @data_store ||= DataStore.new
  @broker = Thread.new { loop { process_request } } unless @broker&.alive?

  at_exit {
    Log.info 'shut down broker'
    @broker.kill
  }

  Log.info 'broker is started'
  @broker
end
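The `unless @broker&.alive?` guard means a repeated call to `run` reuses the live thread instead of starting a second one. A minimal sketch of that pattern follows; `TinyBroker` and its queue-based loop are hypothetical stand-ins, not part of ExcADG.

```ruby
# Hypothetical module illustrating the "start once" guard used by run.
module TinyBroker
  class << self
    def run
      @inbox ||= Queue.new
      # Start the worker only if there is no live one already.
      @worker = Thread.new { loop { @inbox.pop } } unless @worker&.alive?
      @worker
    end
  end
end

first = TinyBroker.run
second = TinyBroker.run # returns the same thread, no new one is started
```

If the thread had died in between, the second call would start a fresh one, since `alive?` would return false.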

.wait_all(timeout: 60, period: 1) ⇒ Object

starts a thread that waits until all known vertices reach a final state (done or failed)

Parameters:

  • timeout (defaults to: 60)

    total waiting timeout in seconds, nil means wait forever

  • period (defaults to: 1)

    time in seconds between vertex state checks



# File 'lib/excadg/broker.rb', line 56

def wait_all timeout: 60, period: 1
  Thread.new {
    Log.info "timeout is #{timeout} seconds"
    Timeout.timeout(timeout) {
      loop {
        sleep period
        states = @data_store.to_a.group_by(&:state).keys
        Log.info "vertices in #{states} exist"
        # these are the only final states for vertices
        break if (states - %i[done failed]).empty?
      }
    }
  }
end
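The polling loop in `wait_all` can be tried in isolation. The sketch below assumes a plain hash in place of the data store (the `states` hash and `FINAL_STATES` constant are hypothetical names) and uses much shorter timings than the defaults.

```ruby
require 'timeout'

FINAL_STATES = %i[done failed].freeze

# Hypothetical stand-in for the data store: vertex name => state.
states = { a: :new, b: :done }

waiter = Thread.new do
  Timeout.timeout(5) do
    loop do
      sleep 0.01
      # Stop once every remaining state is a final one.
      break if (states.values.uniq - FINAL_STATES).empty?
    end
  end
  :all_final
end

states[:a] = :failed # the last vertex reaches a final state
result = waiter.value # joins the thread and returns the block's value
```

Since the work happens in a separate thread, a `Timeout::Error` raised inside it only reaches the caller when the thread is joined (for example with `join` or `value`).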