Class: OFlow::Env

Inherits:
Object
  • Object
show all
Includes:
HasErrorHandler, HasLog
Defined in:
lib/oflow/env.rb

Overview

The platform that Flows are created in. It is the outermost element of the OFlow system.

Constant Summary collapse

@@log_level =
Logger::WARN

Class Method Summary collapse

Instance Method Summary collapse

Methods included from HasErrorHandler

#error_handler=, #handle_error

Methods included from HasLog

#debug, #debug?, #error, #error?, #fatal, #info, #info?, #log=, #log_msg, #warn, #warn?

Constructor Details

#initialize(name = '') ⇒ Env

Returns a new instance of Env.



26
27
28
29
30
31
32
33
# File 'lib/oflow/env.rb', line 26

# Creates an Env that holds no Flows and has not been prepared yet.
#
# @param name [String] name of the Env
def initialize(name='')
  @name = name
  @prepared = false
  @flows = {}
  # Start with no log; _clear() then installs the log and error handler Tasks.
  @log = nil
  _clear
end

Class Method Details

.log_levelFixnum

Returns the default log level.

Returns:

  • (Fixnum)

    the default log level which is one of the Logger::Severity values.



15
16
17
# File 'lib/oflow/env.rb', line 15

# Returns the default log level held in the @@log_level class variable.
#
# @return [Fixnum] the default log level
def self.log_level
  @@log_level
end

.log_level=(level) ⇒ Object

Sets the default log level.

Parameters:

  • level (Fixnum)

    Logger::Severity to set the default log level to



21
22
23
24
# File 'lib/oflow/env.rb', line 21

# Sets the default log level. Values below Logger::Severity::DEBUG or above
# Logger::Severity::FATAL are ignored and leave the current level untouched.
#
# @param level [Fixnum] Logger::Severity value to use as the default
def self.log_level=(level)
  return if level < Logger::Severity::DEBUG
  return if Logger::Severity::FATAL < level
  @@log_level = level
end

Instance Method Details

#_clearObject

Resets the error handler and log. Usually called on init and by the clear() method.



241
242
243
244
# File 'lib/oflow/env.rb', line 241

# Replaces @error_handler and @log with newly created Tasks. The error
# handler Task is created first, then the log Task.
def _clear
  @error_handler = Task.new(self, :error, Actors::ErrorHandler)
  @log = Task.new(self, :log, Actors::Log)
end

#_locate(path) ⇒ Object



137
138
139
140
141
# File 'lib/oflow/env.rb', line 137

# Walks a path of names, using the first element to pick a Flow from this
# Env and handing the remaining elements to that Flow's _locate().
#
# @param path [Array<String>] name segments, outermost first
# @return [Object, nil] the Flow when the path has one segment, whatever the
#   Flow's _locate() returns for longer paths, or nil when the path is empty
#   or the first segment does not name a Flow
def _locate(path)
  # An empty path (e.g. from locate('') or locate(':')) names nothing; bail
  # out instead of calling to_sym on nil.
  return nil if path.nil? || path.empty?
  head, *rest = path
  found = @flows[head.to_sym]
  return found if found.nil? || rest.empty?
  found._locate(rest)
end

#_validation_errorsObject

Returns an Array of validation errors.



82
83
84
85
86
# File 'lib/oflow/env.rb', line 82

# Gathers the validation errors reported by every Flow into one Array.
#
# @return [Array] concatenation of each Flow's _validation_errors()
def _validation_errors
  @flows.each_value.each_with_object([]) do |flow, collected|
    collected.concat(flow._validation_errors)
  end
end

#busy?true|false

Returns true if one or more Tasks are either processing a request or have a request waiting to be processed on their input queues.

Returns:

  • (true|false)

    the busy state across all Tasks



159
160
161
162
163
164
# File 'lib/oflow/env.rb', line 159

# Reports whether anything in the Env is busy. Flows are checked first, then
# the log Task, then the error handler Task; a nil log or error handler is
# skipped.
#
# @return [true|false] true if any of them answers busy? with a truthy value
def busy?
  return true if @flows.each_value.any? { |flow| flow.busy? }
  [@log, @error_handler].any? { |task| !task.nil? && task.busy? }
end

#clearObject

Clears out all Tasks and Flows and resets the object back to an empty state.



233
234
235
236
237
# File 'lib/oflow/env.rb', line 233

# Shuts down all Flows, discards them, and installs a fresh log Task and
# error handler Task so the Env can be populated again.
def clear
  shutdown
  @flows = {}
  # Flows added after a clear have not been prepared, so make sure the next
  # start() runs prepare() again.
  @prepared = false
  _clear
end

#describe(detail = 0, indent = 0) ⇒ Object

Describes all the Flows and Tasks in the system.



247
248
249
250
251
252
253
254
255
# File 'lib/oflow/env.rb', line 247

# Builds a multi-line description: a header line with the Env name and
# class, one entry per Flow (indented two more spaces), and a closing brace.
#
# @param detail [Fixnum] detail level passed through to each Flow's describe()
# @param indent [Fixnum] number of spaces to indent the header and closing brace
# @return [String] the description joined with newlines
def describe(detail=0, indent=0)
  pad = ' ' * indent
  header = "#{pad}#{@name} (#{self.class.name}) {"
  body = @flows.each_value.map { |flow| flow.describe(detail, indent + 2) }
  footer = pad + "}"
  [header, *body, footer].join("\n")
end

#each_flow(&blk) ⇒ Object

Iterates over each Flow and yields to the provided block with each Flow.

Parameters:

  • blk (Proc)

    Proc to call on each iteration



97
98
99
# File 'lib/oflow/env.rb', line 97

# Calls the given block once per Flow held by this Env, passing the Flow.
#
# @param blk [Proc] block invoked with each Flow
def each_flow(&blk)
  @flows.each_value { |flow| blk.call(flow) }
end

#error_handlerTask

Returns an error_handler Task if one is set on the instance.

Returns:

  • (Task)

    error_handler Task.



47
48
49
# File 'lib/oflow/env.rb', line 47

# Reader for the error handler held in @error_handler.
#
# @return [Task] the error handler Task, or nil if none is set
def error_handler
  @error_handler
end

#find_flow(name) ⇒ Flow|nil

Locates and returns the Flow with the specified name.

Parameters:

  • name (String)

    name of the Flow

Returns:

  • (Flow|nil)

    the Flow with the name specified or nil



122
123
124
125
# File 'lib/oflow/env.rb', line 122

# Looks up a Flow by name. A non-nil name is converted to a Symbol before
# the lookup; a nil name is looked up as nil.
#
# @param name [String] name of the Flow
# @return [Flow|nil] the matching Flow, or nil if there is none
def find_flow(name)
  key = name.nil? ? nil : name.to_sym
  @flows[key]
end

#flow(name, &block) {|f| ... } ⇒ Flow

Creates a Flow and yields the newly created Flow to the block, if one is given. Used to construct Flows.

Parameters:

  • name (Symbol|String)

    base name for the Flow

  • options (Hash)

    optional parameters

  • block (Proc)

    block to yield to with the new Flow instance

Yields:

  • (f)

Returns:

  • (Flow)

    new Flow



57
58
59
60
61
62
# File 'lib/oflow/env.rb', line 57

# Creates a Flow owned by this Env, registers it under the Flow's own name,
# and passes it to the block when one is supplied.
#
# @param name [Symbol|String] base name for the Flow
# @param block [Proc] optional block called with the new Flow
# @return [Flow] the new Flow
def flow(name, &block)
  Flow.new(self, name).tap do |created|
    # Register before yielding so the block sees the Flow already in place.
    @flows[created.name] = created
    block.call(created) if block
  end
end

#flow_countObject

Returns the number of Flows in the Env.



144
145
146
# File 'lib/oflow/env.rb', line 144

# Counts the Flows currently registered with this Env.
#
# @return [Fixnum] number of entries in @flows
def flow_count
  @flows.length
end

#flushObject

Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.



202
203
204
205
206
207
208
# File 'lib/oflow/env.rb', line 202

# Calls wakeup(), then flush() on every Flow, and then polls busy? every
# 0.2 seconds, returning once it reports false.
def flush
  wakeup
  @flows.each_value { |flow| flow.flush }
  sleep(0.2) while busy?
end

#full_nameObject



35
36
37
# File 'lib/oflow/env.rb', line 35

# Returns the full name of the Env, which is simply its @name.
#
# @return [String] the name given at construction
def full_name
  @name
end

#locate(name) ⇒ Flow|Task|nil

Locates and returns the Flow or Task with the specified full name.

Parameters:

  • name (String)

    full name of the Task

Returns:

  • (Flow|Task|nil)

    the Flow or Task with the name specified or nil



130
131
132
133
134
135
# File 'lib/oflow/env.rb', line 130

# Splits a ':'-separated full name into segments and resolves it with
# _locate(). One leading ':' and one trailing ':' are dropped before
# splitting.
#
# @param name [String] full name of the Flow or Task
# @return [Flow|Task|nil] whatever _locate() finds for the path
def locate(name)
  trimmed = name.start_with?(':') ? name[1..-1] : name
  trimmed = trimmed[0..-2] if trimmed.end_with?(':')
  _locate(trimmed.split(':'))
end

#logTask

Returns a log Task if one is set on the instance.

Returns:

  • (Task)

    log Task.



41
42
43
# File 'lib/oflow/env.rb', line 41

# Reader for the log Task held in @log.
#
# @return [Task] the log Task, or nil if none is set
def log
  @log
end

#prepareObject



64
65
66
67
68
69
70
# File 'lib/oflow/env.rb', line 64

# Readies the Env for running: resolves the links of every Flow, runs
# validate(), and then marks the Env as prepared. If validate() raises,
# @prepared is left unchanged.
def prepare
  @flows.each_value { |flow| flow.resolve_all_links }
  validate
  @prepared = true
end

#queue_countFixnum

Returns the sum of all the requests waiting in the queues of all the Tasks in all the Flows.

Returns:

  • (Fixnum)

    total number of items waiting to be processed



150
151
152
153
154
# File 'lib/oflow/env.rb', line 150

# Adds up the queue_count() of every Flow.
#
# @return [Fixnum] total across all Flows, 0 when there are none
def queue_count
  @flows.each_value.inject(0) { |total, flow| total + flow.queue_count }
end

#resolve_all_links ⇒ Object

Resolves all the Links on all the Flows being managed.



89
90
91
92
93
# File 'lib/oflow/env.rb', line 89

# Asks every Flow held by this Env to resolve all of its links.
def resolve_all_links
  @flows.each_value { |flow| flow.resolve_all_links }
end

#shutdown(flush_first = false) ⇒ Object

Shuts down all Tasks.

Parameters:

  • flush_first (true|false) (defaults to: false)

    flag indicating shutdown should occur after the system becomes idle



220
221
222
223
224
225
226
227
228
229
230
# File 'lib/oflow/env.rb', line 220

# Shuts down every Flow and then forgets them all.
#
# @param flush_first [true|false] passed through to each Flow's shutdown()
def shutdown(flush_first=false)
  # First pass: mark every Flow as blocked before any of them is shut down.
  @flows.each_value { |flow| flow.state = Task::BLOCKED }
  # Second pass: the actual shutdown, honoring flush_first.
  @flows.each_value { |flow| flow.shutdown(flush_first) }
  @flows = {}
end

#startObject

Calls the start() method on all Tasks.



190
191
192
193
# File 'lib/oflow/env.rb', line 190

# Starts every Flow. prepare() is run first when the Env has not been
# prepared yet.
def start
  prepare unless @prepared
  @flows.each_value { |flow| flow.start }
end

#state=(s) ⇒ Object

Sets the state of all Tasks recursively. This should not be called directly.



212
213
214
215
216
# File 'lib/oflow/env.rb', line 212

# Assigns the given state to every Flow held by this Env.
#
# @param s [Object] state value handed to each Flow's state= writer
def state=(s)
  @flows.each_value { |flow| flow.state = s }
end

#stepObject

Calls the step() method on one Task that is stopped and has an item in the queue. The Task with the highest backed_up() value is selected.



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/oflow/env.rb', line 173

# Steps the stopped Task that is most backed up. Only Tasks whose state is
# Task::STOPPED and whose backed_up() is greater than 0.0 are considered;
# on a tie the first one visited wins.
#
# @return [Task|nil] the Task that was stepped, or nil if none qualified
def step
  candidate = nil
  highest = 0.0
  walk_tasks do |task|
    next unless Task::STOPPED == task.state
    level = task.backed_up
    next unless highest < level
    candidate = task
    highest = level
  end
  candidate.step unless candidate.nil?
  candidate
end

#stopObject

Calls the stop() method on all Tasks.



167
168
169
# File 'lib/oflow/env.rb', line 167

# Asks every Flow held by this Env to stop.
def stop
  @flows.each_value do |flow|
    flow.stop
  end
end

#validateObject

Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.

Raises:

  • (ValidateError)

    if one or more validation errors were collected



75
76
77
78
79
# File 'lib/oflow/env.rb', line 75

# Checks the Env by collecting every validation error up front and raising
# them together, so the caller sees all problems at once.
#
# @raise [ValidateError] built from the collected errors when there are any
def validate
  problems = _validation_errors
  return if problems.empty?
  raise ValidateError.new(problems)
end

#wakeupObject

Wakes up all the Tasks in the Flow.



196
197
198
# File 'lib/oflow/env.rb', line 196

# Asks every Flow held by this Env to wake up.
def wakeup
  @flows.each_value do |flow|
    flow.wakeup
  end
end

#walk_flows(&blk) ⇒ Object

Performs a recursive walk over all Flows and yields to the provided block for each.

Parameters:

  • blk (Proc)

    Proc to call on each iteration



104
105
106
107
108
# File 'lib/oflow/env.rb', line 104

# Yields each Flow held by this Env to the provided block.
#
# @param blk [Proc] block called with each Flow
def walk_flows(&blk)
  @flows.each_value do |f|
    # Pass the Flow being visited; the previous code yielded an undefined
    # local and raised NameError on the first Flow.
    blk.yield(f)
  end
end

#walk_tasks(&blk) ⇒ Object

Performs a recursive walk over all Tasks in all Flows and yields to the provided block for each.

Parameters:

  • blk (Proc)

    Proc to call on each iteration



113
114
115
116
117
# File 'lib/oflow/env.rb', line 113

# Hands the provided block to walk_tasks() on every Flow held by this Env,
# so each Flow can yield its own Tasks to it.
#
# @param blk [Proc] block forwarded to each Flow's walk_tasks()
def walk_tasks(&blk)
  @flows.each_value { |flow| flow.walk_tasks(&blk) }
end