Class: Taskflow::Flow
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Taskflow::Flow
- Defined in:
- lib/taskflow/flow.rb
Class Method Summary collapse
-
.can_launch?(klass, opts = {}) ⇒ Boolean
opts supports :params.
- .launch(klass, opts = {}) ⇒ Object
Instance Method Summary collapse
- #resume ⇒ Object
-
#run(klass, opts = {}) ⇒ Object
opts supports :name, :params, :before and :after.
- #running_steps ⇒ Object
- #schedule ⇒ Object
- #stop!(user_id = nil) ⇒ Object
Class Method Details
.can_launch?(klass, opts = {}) ⇒ Boolean
opts supports :params
16 17 18 19 |
# File 'lib/taskflow/flow.rb', line 16
# Reports whether a flow may be launched: true only when no Taskflow::Flow
# record outside the 'stopped' state already has the same klass and the same
# input (taken from opts[:params]).
#
# @param klass [Object] value matched against the flows' klass attribute
# @param opts [Hash] options; only :params is read (string or symbol key)
# @return [Boolean] false when a matching non-stopped flow exists
def can_launch?(klass, opts = {})
  params = HashWithIndifferentAccess.new(opts)[:params]
  active_duplicates = Taskflow::Flow.where.not(state: 'stopped')
                                    .where(klass: klass, input: params)
  !active_duplicates.exists?
end
.launch(klass, opts = {}) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/taskflow/flow.rb', line 21
# Creates a flow of the class named by klass, attaches its logger record and
# schedules it.
#
# @param klass [String, Symbol] constant name resolved through Kernel.const_get;
#   the resolved class must define a NAME constant, used as the flow's name
# @param opts [Hash] options (string or symbol keys):
#   :params                - stored as the flow's input
#   :launched_by           - defaults to 'task-flow-engine'
#   :next_workflow_config  - when present, saved as next_config after creation
#   :workflow_description  - description passed to create_tflogger
# @return [Object] whatever #schedule returns for the new flow
def launch(klass, opts = {})
  options = HashWithIndifferentAccess.new(opts)
  flow_class = Kernel.const_get(klass)
  flow_name = flow_class.const_get('NAME')
  options[:launched_by] ||= 'task-flow-engine'

  flow = flow_class.create(
    name: flow_name,
    input: options[:params],
    launched_by: options[:launched_by]
  )

  next_config = options[:next_workflow_config]
  flow.update(next_config: next_config) if next_config

  flow.create_tflogger(name: flow_name, description: options[:workflow_description])
  flow.schedule
end
Instance Method Details
#resume ⇒ Object
77 78 79 80 81 |
# File 'lib/taskflow/flow.rb', line 77
# Calls #resume on every task of this flow whose state is 'paused' and whose
# result is 'error'.
#
# @return [Object] the collection of matching tasks that was iterated
def resume
  paused_with_error = tasks.where(state: 'paused', result: 'error')
  paused_with_error.each(&:resume)
end
#run(klass, opts = {}) ⇒ Object
opts supports :name, :params, :before and :after
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/taskflow/flow.rb', line 40
# Creates a task of the given class, wires it into the dependency graph and
# appends it to this flow's tasks.
#
# @param klass [Class] task class; must respond to .create
# @param opts [Hash] options:
#   :name   - task name, defaults to klass.to_s
#   :params - stored as the task's input
#   :before - a task or Array of tasks that the new task must precede
#   :after  - a task or Array of tasks that the new task must follow
#   When neither :before nor :after is given, the new task is chained after
#   the flow's current last task (if there is one).
# @return [Object] the newly created task
def run(klass, opts = {})
  before = opts[:before]
  after = opts[:after]
  klass_name = klass.to_s

  # Falsy attributes are dropped so they are not passed to create.
  attributes = {
    klass: klass_name,
    name: opts[:name] || klass_name,
    input: opts[:params],
    index: tasks.size + 1
  }.select { |_key, value| value }
  task = klass.create(attributes)

  if before
    task.downstream << before
    successors = before.is_a?(Array) ? before : [before]
    successors.each { |successor| successor.upstream << task }
  end

  if after
    task.upstream << after
    predecessors = after.is_a?(Array) ? after : [after]
    predecessors.each { |predecessor| predecessor.downstream << task }
  end

  # No explicit ordering requested: link sequentially behind the last task.
  if before.nil? && after.nil? && tasks.last
    tasks.last.downstream << task
    task.upstream << tasks.last
  end

  tasks << task
  task
end
#running_steps ⇒ Object
35 36 37 |
# File 'lib/taskflow/flow.rb', line 35
# Returns this flow's tasks whose state is 'running' or 'paused'.
#
# @return [Object] the result of filtering the tasks association by state
def running_steps
  active_states = %w[running paused]
  tasks.where(state: active_states)
end
#schedule ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/taskflow/flow.rb', line 83
# Enqueues a Taskflow::Worker job for every pending task that is ready to run.
#
# Does nothing (and returns nil) when the flow has been halted (halt_by is set)
# or is already 'stopped'. A 'pending' flow is first marked 'running' with
# started_at set to the current time.
#
# @return [self, nil] self after enqueuing, nil when the flow is halted/stopped
# @raise whatever update! raises when the state change cannot be saved
def schedule
  return if halt_by || state == 'stopped'

  update!(state: 'running', started_at: Time.now) if state == 'pending'

  ready_tasks = reload.tasks.where(state: 'pending').select do |task|
    # A task is ready when it has no upstream tasks, or every upstream task
    # has finished (state 'skipped' or 'stopped').
    upstream = task.upstream
    upstream.empty? || upstream.all? { |up| %w[skipped stopped].include?(up.state) }
  end

  # Collect all ids before enqueuing anything, as the original did.
  ready_ids = ready_tasks.map { |task| task.id.to_s }
  flow_id = id.to_s
  ready_ids.each { |task_id| Taskflow::Worker.perform_async(flow_id, task_id) }
  self
end
#stop!(user_id = nil) ⇒ Object
72 73 74 75 |
# File 'lib/taskflow/flow.rb', line 72
# Marks the flow as stopped: records the mean progress of its tasks, who
# halted it, the end time, state 'stopped' and result 'warning'.
#
# A flow without tasks is recorded with progress 0 instead of dividing by
# zero.
#
# @param user_id [Object, nil] identifier stored in halt_by
# @return [Object] the result of update!
# @raise whatever update! raises when the record cannot be saved
def stop!(user_id = nil)
  progresses = tasks.map(&:progress)
  # Guard the empty case: sum / 0 would raise ZeroDivisionError.
  percent = progresses.empty? ? 0 : progresses.sum / progresses.size

  update!(
    progress: percent,
    halt_by: user_id,
    ended_at: Time.now,
    state: 'stopped',
    result: 'warning'
  )
end