Class: Taskflow::Flow

Inherits:
Object
  • Object
show all
Includes:
Mongoid::Document
Defined in:
lib/taskflow/flow.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.can_launch?(klass, opts = {}) ⇒ Boolean

opts support :params,

Returns:

  • (Boolean)


26
27
28
29
# File 'lib/taskflow/flow.rb', line 26

def can_launch?(klass,opts={})
    opts = HashWithIndifferentAccess.new opts
    !Taskflow::Flow.ne(state: 'stopped').where(klass: klass,input: opts[:params]).exists?
end

.launch(klass, opts = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/taskflow/flow.rb', line 31

def launch(klass,opts={})
    opts = HashWithIndifferentAccess.new opts
    flow_klass = Kernel.const_get klass
    name = flow_klass.const_get 'NAME'
    opts[:launched_by] ||= 'task-flow-engine'
    flow = flow_klass.create name: name,input: opts[:params],launched_by: opts[:launched_by]
    if opts[:next_workflow_config]
        flow.update next_config: opts[:next_workflow_config]
    end
    flow.create_logger name: name,description: opts[:workflow_description]
    flow.schedule
end

Instance Method Details

#resumeObject



76
77
78
79
80
# File 'lib/taskflow/flow.rb', line 76

def resume
    self.tasks.where(state: 'paused',result: 'error').each do |task|
        task.resume
    end
end

#run(klass, opts = {}) ⇒ Object

opts support :name,:params



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/taskflow/flow.rb', line 50

def run(klass,opts={})
    obj = {
        klass: klass.to_s,
        name: opts[:name] || klass.to_s,
        input: opts[:params],
        index: self.tasks.size + 1
    }
    task = klass.create obj.select{|k,v| v }
    if opts[:before]
        task.downstream << opts[:before]
    end
    if opts[:after]
        task.upstream << opts[:after]
    end
    if opts[:before].nil? && opts[:after].nil? && self.tasks.last
        self.tasks.last.downstream << task
    end
    self.tasks << task
    task
end

#running_stepsObject



45
46
47
# File 'lib/taskflow/flow.rb', line 45

def running_steps
    self.tasks.in(state: ['running','paused'])
end

#scheduleObject



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/taskflow/flow.rb', line 82

def schedule
    return if self.halt_by || self.state == 'stopped'
    self.update_attributes! state: 'running',started_at: Time.now if self.state == 'pending'
    task_list = []
    self.reload.tasks.where(state: 'pending').each do |task|
        # 上游全部完成
        if task.upstream.empty? || task.upstream.all?{|t| %w(skipped stopped).include? t.state }
            task_list << task.id.to_s
        end
    end
    task_list.each{|tid| Taskflow::Worker.perform_async self.id.to_s,tid }
    self
end

#stop!(user_id = nil) ⇒ Object



71
72
73
74
# File 'lib/taskflow/flow.rb', line 71

def stop!(user_id=nil)
    percent = self.tasks.map(&:progress).sum / self.tasks.size
    self.update_attributes! progress: percent,halt_by: user_id,ended_at: Time.now, state: 'stopped',result: 'warning'
end