Module: OFlow::HasTasks

Included in:
Env, Flow
Defined in:
lib/oflow/hastasks.rb

Overview

Provides the ability to have Tasks and Flows.

Instance Method Summary collapse

Instance Method Details

#_locate(path) ⇒ Object



114
115
116
117
118
# File 'lib/oflow/hastasks.rb', line 114

# Walks a path of name segments down through the managed Tasks and Flows.
#
# The first segment is looked up in @tasks; any remaining segments are
# handed to the _locate method of the entry that was found.
#
# @param path [Array<String>] name segments, outermost first
# @return [Object, nil] the entry found at the end of the path, or nil if the
#   path is empty or its first segment is not a key in @tasks
def _locate(path)
  # An empty path (e.g. produced by locate('') or locate(':')) names nothing;
  # without this guard path[0] is nil and nil.to_sym raises NoMethodError.
  return nil if path.empty?

  child = @tasks[path[0].to_sym]
  return child if child.nil? || 1 == path.size

  child._locate(path[1..-1])
end

#_validation_errorsObject

Returns an Array of validation errors.



57
58
59
60
61
# File 'lib/oflow/hastasks.rb', line 57

# Gathers the validation errors reported by every entry in @tasks.
#
# @return [Array] each entry's _validation_errors result, concatenated in
#   the iteration order of @tasks (empty when there are no entries)
def _validation_errors
  @tasks.each_value.reduce([]) do |collected, member|
    collected + member._validation_errors
  end
end

#busy?true|false

Returns true if one or more Tasks are either processing a request or have a request waiting to be processed on their input queue.

Returns:

  • (true|false)

    the busy state across all Tasks



136
137
138
139
# File 'lib/oflow/hastasks.rb', line 136

# Reports whether any entry in @tasks is busy.
#
# @return [true, false] true as soon as one entry answers busy? with a
#   truthy value, false otherwise (including when there are no entries)
def busy?
  @tasks.each_value.any? { |member| member.busy? }
end

#clearObject

Clears out all Tasks and Flows and resets the object back to an empty state.



207
208
209
210
211
# File 'lib/oflow/hastasks.rb', line 207

# Shuts everything down and returns the container to an empty state.
#
# Calls shutdown, replaces @tasks with a fresh empty Hash, then delegates
# to _clear, whose result is returned.
def clear
  shutdown
  @tasks = {}
  _clear
end

#each_task(&blk) ⇒ Object

Iterates over each Task and yields to the provided block with each Task.

Parameters:

  • blk (Proc)

    Proc to call on each iteration



76
77
78
# File 'lib/oflow/hastasks.rb', line 76

# Calls the given block once for each value stored in @tasks.
#
# @param blk [Proc] block invoked with each entry
# @return [Hash] the @tasks Hash itself
def each_task(&blk)
  @tasks.each_value { |member| blk.call(member) }
end

#find_task(name) ⇒ Task|nil

Locates and returns a Task with the specified name.

Parameters:

  • name (String)

    name of the Task

Returns:

  • (Task|nil)

    the Task with the name specified or nil



98
99
100
101
102
# File 'lib/oflow/hastasks.rb', line 98

# Looks up a directly managed entry by name.
#
# The name is converted to a Symbol (nil is left as nil). The reserved name
# :flow refers to this container itself.
#
# @param name [String, Symbol, nil] name of the entry
# @return [Object, nil] self for :flow, otherwise the @tasks entry stored
#   under the name, or nil when there is none
def find_task(name)
  key = name.nil? ? nil : name.to_sym
  if :flow == key
    self
  else
    @tasks[key]
  end
end

#flow(name, options = {}, &block) {|f| ... } ⇒ Flow

Creates a Flow and yields to a block with the newly created Flow. Used to construct Flows.

Parameters:

  • name (Symbol|String)

    base name for the Flow

  • options (Hash) (defaults to: {})

    optional parameters

  • block (Proc)

    block to yield to with the new Flow instance

Yields:

  • (f)

Returns:

  • (Flow)

    new Flow



18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/oflow/hastasks.rb', line 18

# Builds a Flow owned by this container, registers it and lets the caller
# configure it through the optional block.
#
# @param name [Symbol, String] base name for the Flow
# @param options [Hash] optional parameters passed through to Flow.new
# @param block [Proc] block yielded to with the new Flow
# @yield [f] the newly created Flow
# @return [Flow] the new Flow
def flow(name, options={}, &block)
  created = Flow.new(self, name, options)
  @tasks[created.name] = created
  yield(created) if block_given?
  created.resolve_all_links
  # Validation and start-up are deferred to the top level (Env) so that
  # up-links of nested Flows do not fail validation.
  return created unless Env == self

  created.validate
  created.start
  created
end

#flushObject

Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.



176
177
178
179
180
181
182
# File 'lib/oflow/hastasks.rb', line 176

# Wakes everything up, flushes each entry in @tasks and then blocks until
# busy? reports false, polling every 0.2 seconds.
#
# @return [nil]
def flush
  wakeup
  @tasks.each_value { |member| member.flush }
  sleep(0.2) while busy?
end

#init_tasksObject

Initializes the tasks attribute.



8
9
10
# File 'lib/oflow/hastasks.rb', line 8

# Sets @tasks to a new, empty Hash.
#
# @return [Hash] the new empty Hash
def init_tasks
  @tasks = {}
end

#locate(name) ⇒ Task|nil

Locates and returns a Task with the specified full name.

Parameters:

  • name (String)

    full name of the Task

Returns:

  • (Task|nil)

    the Task with the name specified or nil



107
108
109
110
111
112
# File 'lib/oflow/hastasks.rb', line 107

# Resolves a colon separated full name to an entry.
#
# One leading and one trailing ':' are dropped, the remainder is split on
# ':' and the resulting segments are handed to _locate.
#
# @param name [String] full name, e.g. "flow:task"
# @return [Object, nil] whatever _locate returns for the segment list
def locate(name)
  trimmed = name.start_with?(':') ? name[1..-1] : name
  trimmed = trimmed[0..-2] if trimmed.end_with?(':')
  _locate(trimmed.split(':'))
end

#queue_countFixnum

Returns the sum of all the requests in all the Tasks' queues.

Returns:

  • (Fixnum)

    total number of items waiting to be processed



127
128
129
130
131
# File 'lib/oflow/hastasks.rb', line 127

# Adds up the queue_count of every entry in @tasks.
#
# @return [Integer] the total, 0 when there are no entries
def queue_count
  @tasks.each_value.reduce(0) { |total, member| total + member.queue_count }
end

#resolve_all_linksObject

Resolves all the Links on all the Tasks and Flows being managed as well as any Links in the instance itself.



65
66
67
68
69
70
71
72
# File 'lib/oflow/hastasks.rb', line 65

# Resolves this container's own links, then asks every entry in @tasks to
# resolve theirs.
#
# Only links whose target is still nil are passed to set_link_target.
#
# @return [Hash] the @tasks Hash
def resolve_all_links
  @links.each_value do |link|
    next unless link.target.nil?

    set_link_target(link)
  end
  @tasks.each_value { |member| member.resolve_all_links }
end

#shutdown(flush_first = false) ⇒ Object

Shuts down all Tasks.

Parameters:

  • flush_first (true|false) (defaults to: false)

    flag indicating shutdown should occur after the system becomes idle



194
195
196
197
198
199
200
201
202
203
204
# File 'lib/oflow/hastasks.rb', line 194

# Shuts down every entry in @tasks and then forgets them.
#
# @param flush_first [true, false] passed through to each entry's shutdown
# @return [Hash] the new, empty @tasks Hash
def shutdown(flush_first=false)
  # First pass: mark every entry BLOCKED so threads can empty their queues.
  @tasks.each_value { |member| member.state = Task::BLOCKED }
  # Second pass: shut each one down, waiting for queues to empty if asked.
  @tasks.each_value { |member| member.shutdown(flush_first) }
  @tasks = {}
end

#startObject

Calls the start() method on all Tasks.



165
166
167
# File 'lib/oflow/hastasks.rb', line 165

# Sends start to every entry in @tasks.
#
# @return [Hash] the @tasks Hash
def start
  @tasks.each_value(&:start)
end

#state=(s) ⇒ Object

Sets the state of all Tasks recursively. This should not be called directly.



186
187
188
189
190
# File 'lib/oflow/hastasks.rb', line 186

# Assigns the given state to every entry in @tasks. Not intended to be
# called directly.
#
# @param s [Object] the state value to assign
def state=(s)
  @tasks.each_value { |member| member.state = s }
end

#stepObject

Calls the step() method on one Task that is stopped and has an item in the queue. The Task with the highest backed_up() value is selected.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/oflow/hastasks.rb', line 148

# Steps the single most backed up stopped Task.
#
# Every Task yielded by walk_tasks whose state is Task::STOPPED is
# considered; the one with the largest backed_up value strictly greater
# than 0.0 wins (the first one seen wins a tie) and has step called on it.
#
# @return [Object, nil] the Task that was stepped, or nil if none qualified
def step
  chosen = nil
  highest = 0.0
  walk_tasks do |candidate|
    next unless Task::STOPPED == candidate.state

    backlog = candidate.backed_up
    next unless highest < backlog

    chosen = candidate
    highest = backlog
  end
  chosen.step unless chosen.nil?
  chosen
end

#stopObject

Calls the stop() method on all Tasks.



142
143
144
# File 'lib/oflow/hastasks.rb', line 142

# Sends stop to every entry in @tasks.
#
# @return [Hash] the @tasks Hash
def stop
  @tasks.each_value do |member|
    member.stop
  end
end

#task(name, actor_class, options = {}, &block) {|t| ... } ⇒ Task

Creates a Task and yields to a block with the newly created Task. Used to configure Tasks.

Parameters:

  • name (Symbol|String)

    base name for the Task

  • actor_class (Class)

    Class to create an Actor instance of

  • options (Hash) (defaults to: {})

    optional parameters

  • block (Proc)

    block to yield to with the new Task instance

Yields:

  • (t)

Returns:

  • (Task)

    new Task



38
39
40
41
42
43
44
45
# File 'lib/oflow/hastasks.rb', line 38

# Creates a Task owned by this container, registers it under its name and
# lets the caller configure it through the optional block.
#
# When the options do not specify :state, the Task is created with
# :state => Task::STOPPED.
#
# @param name [Symbol, String] base name for the Task
# @param actor_class [Class] Class to create an Actor instance of
# @param options [Hash] optional parameters passed through to Task.new; the
#   Hash supplied by the caller is never modified
# @param block [Proc] block yielded to with the new Task
# @yield [t] the newly created Task
# @return [Task] the new Task
def task(name, actor_class, options={}, &block)
  # Merge the default into a copy instead of writing into the caller's Hash,
  # so a shared options Hash is not altered and a frozen one is accepted.
  opts = options.has_key?(:state) ? options : options.merge(:state => Task::STOPPED)
  t = Task.new(self, name, actor_class, opts)
  @tasks[t.name] = t
  yield(t) if block_given?
  t
end

#task_countObject

Returns the number of active Tasks.



121
122
123
# File 'lib/oflow/hastasks.rb', line 121

# Counts the entries currently held in @tasks.
#
# @return [Integer] number of entries
def task_count
  @tasks.length
end

#validateObject

Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.

Raises:

  • (ValidateError)

    if one or more validation errors were found



50
51
52
53
54
# File 'lib/oflow/hastasks.rb', line 50

# Validates the container, reporting every problem in a single exception.
#
# @raise [ValidateError] carrying all collected errors when
#   _validation_errors returns a non-empty list
# @return [nil] when there are no errors
def validate
  problems = _validation_errors
  return if problems.empty?

  raise ValidateError.new(problems)
end

#wakeupObject

Wakes up all the Tasks in the Flow.



170
171
172
# File 'lib/oflow/hastasks.rb', line 170

# Sends wakeup to every entry in @tasks.
#
# @return [Hash] the @tasks Hash
def wakeup
  @tasks.each_value(&:wakeup)
end

#walk_tasks(tasks_only = true, &blk) ⇒ Object

Performs a recursive walk over all Tasks and yields to the provided block for each. Flows are followed recursively.

Parameters:

  • tasks_only (true|false) (defaults to: true)

    indicates only Tasks and not Flows are yielded to

  • blk (Proc)

    Proc to call on each iteration



84
85
86
87
88
89
90
91
92
93
# File 'lib/oflow/hastasks.rb', line 84

# Recursively visits the entries in @tasks, calling the block for each.
#
# An entry that is a Task is always passed to the block. Any other entry is
# passed to the block only when tasks_only is false, and is then descended
# into through its own walk_tasks.
#
# @param tasks_only [true, false] when true only Tasks reach the block
# @param blk [Proc] block invoked with each visited entry
# @return [Hash] the @tasks Hash
def walk_tasks(tasks_only=true, &blk)
  @tasks.each_value do |node|
    is_leaf = node.is_a?(Task)
    blk.call(node) if is_leaf || !tasks_only
    node.walk_tasks(tasks_only, &blk) unless is_leaf
  end
end