Class: ExcADG::Vertex

Inherits:
Ractor
  • Object
Includes:
RTimeout
Defined in:
lib/excadg/vertex.rb

Overview

An individual vertex of the execution graph, run in a separate Ractor.

Class Method Summary

Instance Method Summary

Methods included from RTimeout

await

Class Method Details

.new(payload:, name: nil, deps: [], timeout: nil) ⇒ Object

Creates a vertex. The vertex starts running as soon as it is created.

Parameters:

  • payload

    Payload object to run in this Vertex

  • name (defaults to: nil)

    optional name used to identify the vertex

  • deps (defaults to: [])

    list of other Vertices or names to wait for

  • timeout (ExcADG::VTimeout) (defaults to: nil)

    a VTimeout, or the total time in seconds for the payload to run

Raises:

  • Payload::IncorrectPayloadArity if the function returned by the payload has an arity other than 0 or 1

  • Payload::NoPayloadSet if the provided payload is not a Payload
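
The two checks listed under Raises can be tried in isolation. The snippet below is a minimal sketch, not the gem's code: `Payload`, `NoPayloadSet` and `IncorrectPayloadArity` are local stand-ins assumed to behave like the ExcADG constants of the same name, and `check_payload`, `Greeter` and `TooManyArgs` are hypothetical names introduced for illustration.

```ruby
# Local stand-ins (assumptions) for the constants referenced by Vertex.new.
module Payload
  class NoPayloadSet < StandardError; end
  class IncorrectPayloadArity < StandardError; end
end

# Hypothetical helper repeating the two guards that run before a vertex starts.
def check_payload(payload)
  raise Payload::NoPayloadSet, "expected payload, got #{payload.class}" unless payload.is_a? Payload

  arity = payload.get.arity
  raise Payload::IncorrectPayloadArity, "arity is #{arity}, supported only 0 and 1" unless [0, 1].include? arity

  :ok
end

# Payload whose function receives the dependencies' data as its only argument.
class Greeter
  include Payload
  def get
    ->(deps_data) { "got #{deps_data.inspect}" }
  end
end

# Payload whose function takes two arguments and is therefore rejected.
class TooManyArgs
  include Payload
  def get
    ->(a, b) { [a, b] }
  end
end

puts check_payload(Greeter.new) # prints "ok"
begin
  check_payload(TooManyArgs.new)
rescue Payload::IncorrectPayloadArity => e
  puts e.message # prints "arity is 2, supported only 0 and 1"
end
```

Both checks happen in the calling Ractor, before the vertex's own Ractor is started, so an invalid payload fails fast at construction time.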



# File 'lib/excadg/vertex.rb', line 65

def new payload:, name: nil, deps: [], timeout: nil
  raise Payload::NoPayloadSet, "expected payload, got #{payload.class}" unless payload.is_a? Payload

  raise Payload::IncorrectPayloadArity, "arity is #{payload.get.arity}, supported only 0 and 1" unless [0, 1].include? payload.get.arity

  dm = DependencyManager.new(deps: deps)
  vtimeout = timeout.is_a?(VTimeout) ? timeout : VTimeout.new(payload: timeout)

  super(payload, name, vtimeout, dm) { |payload, name, vtimeout, deps_manager|
    state_machine = StateMachine.new(name: name || "v#{number}".to_sym)
    state_machine.with_fault_processing {
      await(timeout: vtimeout.global) {
        Broker.ask Request::Update.new data: state_machine.state_data
        Log.info 'building vertex lifecycle'
        state_machine.bind_action(:new, :ready) {
          await(timeout: vtimeout.deps) {
            until deps_manager.deps.empty?
              deps_data = Broker.ask Request::GetStateData.new(deps: deps_manager.deps)
              deps_manager.deduct_deps deps_data
              sleep 0.2
            end
            deps_manager.data
          }
        }
        state_machine.bind_action(:ready, :done) {
          function = payload.get
          await(timeout: vtimeout.payload) {
            case function.arity
            when 0 then function.call
            when 1 then function.call state_machine.state_data.data
            else
              raise Payload::IncorrectPayloadArity, "unexpected payload arity: #{function.arity}, supported only 0 and 1"
            end
          }
        }

        Log.debug "another step fades: #{state_machine.state_data}" while state_machine.step

        Log.debug 'shut down'
      }
    }
  }
end
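
The `:ready` to `:done` action in the listing above calls the payload's function either with no arguments or with the data collected from dependencies. A minimal sketch of that dispatch outside of any Ractor follows; `run_payload` is a hypothetical name, `deps_data` stands in for `state_machine.state_data.data`, and `ArgumentError` replaces the gem's own error class to keep the snippet self-contained.

```ruby
# Hypothetical helper reproducing the arity-based dispatch of the :ready -> :done action.
def run_payload(function, deps_data)
  case function.arity
  when 0 then function.call
  when 1 then function.call deps_data
  else
    raise ArgumentError, "unexpected payload arity: #{function.arity}, supported only 0 and 1"
  end
end

standalone = -> { :standalone }              # ignores dependencies
summing    = ->(deps_data) { deps_data.sum } # consumes dependencies' data

p run_payload(standalone, [1, 2, 3]) # prints :standalone
p run_payload(summing, [1, 2, 3])    # prints 6
```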

Instance Method Details

#data ⇒ Object

Looks up this Vertex’s current data in the Broker’s data store. Available from the main Ractor only.



# File 'lib/excadg/vertex.rb', line 41

def data
  Broker.data_store[self]
end

#info ⇒ Object

Returns parsed info about the Ractor: number, file, line in file, status.

Returns:

  • parsed info about the Ractor: number, file, line in file, status; nil if parsing failed



# File 'lib/excadg/vertex.rb', line 18

def info
  inspect.scan(/^#<Ractor:#(\d+)\s(.*):(\d+)\s(\w+)>$/).first
end
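
To see what `#info` extracts, the same regular expression can be applied to a sample string. The string below is an assumed example of `Ractor#inspect` output, not captured from a real run; the exact format may differ between Ruby versions.

```ruby
# Same pattern as in #info: Ractor number, file, line in file, status.
RACTOR_INSPECT = /^#<Ractor:#(\d+)\s(.*):(\d+)\s(\w+)>$/

# Assumed sample of Ractor#inspect output.
sample = '#<Ractor:#2 lib/excadg/vertex.rb:65 running>'

parsed = sample.scan(RACTOR_INSPECT).first
p parsed                                    # prints ["2", "lib/excadg/vertex.rb", "65", "running"]
p 'not a ractor'.scan(RACTOR_INSPECT).first # prints nil
```

All captured fields are strings, which is why `#number` and `#status` convert them afterwards.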

#name ⇒ Object

Gets this Vertex’s name. Available from the main Ractor only.



# File 'lib/excadg/vertex.rb', line 53

def name
  data&.name
end

#number ⇒ Object

Returns Ractor’s number, -1 if parsing failed.

Returns:

  • Ractor’s number, -1 if parsing failed



# File 'lib/excadg/vertex.rb', line 29

def number
  info&.dig(0)&.to_i || -1
end
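
The `-1` fallback depends on how `nil` is converted. In Ruby, `&.` does not short-circuit the rest of a call chain, and `nil.to_i` is `0`, which is truthy; `|| -1` therefore only takes effect when the conversion itself is also guarded with safe navigation. A small illustration, where `failed` and `parsed` are hypothetical stand-ins for what `#info` returns:

```ruby
failed = nil                                               # parsing failed
parsed = ['2', 'lib/excadg/vertex.rb', '65', 'running']    # assumed successful parse

p(failed&.dig(0).to_i || -1)  # prints 0, the fallback is never reached
p(failed&.dig(0)&.to_i || -1) # prints -1
p(parsed&.dig(0)&.to_i || -1) # prints 2
```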

#state ⇒ Object

Gets this Vertex’s current state. Available from the main Ractor only.



# File 'lib/excadg/vertex.rb', line 47

def state
  data&.state
end

#status ⇒ Object

Ractor’s internal status

Returns:

  • Symbol, :unknown if parsing failed



# File 'lib/excadg/vertex.rb', line 24

def status
  (info&.dig(3) || :unknown).to_sym
end

#to_s ⇒ Object



# File 'lib/excadg/vertex.rb', line 33

def to_s
  "#{number} #{status}"
end