Class: Taskflow::Flow

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/taskflow/flow.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.can_launch?(klass, opts = {}) ⇒ Boolean

opts supports :params

Returns:

  • (Boolean)


16
17
18
19
# File 'lib/taskflow/flow.rb', line 16

# Tells whether a flow of the given class may be launched.
#
# A launch is allowed only when no Taskflow::Flow record exists whose state
# is anything other than 'stopped' and whose klass and input match the
# arguments.
#
# @param klass [Object] value matched against the flow's `klass` column
# @param opts [Hash] options; only :params (string or symbol key) is read and
#   is matched against the flow's `input` column
# @return [Boolean] true when no matching non-stopped flow exists
def can_launch?(klass, opts = {})
    options = HashWithIndifferentAccess.new(opts)
    active_flows = Taskflow::Flow.where.not(state: 'stopped')
    duplicates = active_flows.where(klass: klass, input: options[:params])
    !duplicates.exists?
end

.launch(klass, opts = {}) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/taskflow/flow.rb', line 21

# Creates a flow of the named class and schedules it.
#
# The class is looked up with Kernel.const_get and its NAME constant is used
# as the flow name. A tflogger record is created for the flow before it is
# scheduled.
#
# @param klass [String, Symbol] constant name of the flow class
# @param opts [Hash] options (string or symbol keys):
#   :params, :launched_by (defaults to 'task-flow-engine'),
#   :next_workflow_config (stored as next_config when truthy),
#   :workflow_description (passed to the tflogger as description)
# @return [Object] whatever the flow's #schedule returns
def launch(klass, opts = {})
    options = HashWithIndifferentAccess.new(opts)
    flow_class = Kernel.const_get(klass)
    flow_name = flow_class.const_get('NAME')
    options[:launched_by] ||= 'task-flow-engine'

    flow = flow_class.create(
        name: flow_name,
        input: options[:params],
        launched_by: options[:launched_by]
    )
    # The follow-up configuration is written in a separate update, after creation.
    flow.update(next_config: options[:next_workflow_config]) if options[:next_workflow_config]
    flow.create_tflogger(name: flow_name, description: options[:workflow_description])
    flow.schedule
end

Instance Method Details

#resume ⇒ Object



77
78
79
80
81
# File 'lib/taskflow/flow.rb', line 77

# Calls #resume on every task of this flow that is in state 'paused' with
# result 'error'.
#
# @return [Object] the collection of matching tasks that was iterated
def resume
    tasks.where(state: 'paused', result: 'error').each(&:resume)
end

#run(klass, opts = {}) ⇒ Object

opts supports :name, :params, :before, :after



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/taskflow/flow.rb', line 40

# Creates a task of the given class, links it to its neighbours and appends
# it to this flow's tasks.
#
# Linking rules:
# * :before — the new task is placed upstream of the given task(s).
# * :after  — the new task is placed downstream of the given task(s).
# * neither — the new task is chained after the flow's current last task,
#   if there is one.
#
# @param klass [Class] task class; must respond to .create
# @param opts [Hash] options:
#   :name (defaults to klass.to_s), :params (stored as input),
#   :before and :after (a single task or an Array of tasks)
# @return [Object] the newly created task
def run(klass, opts = {})
    before = opts[:before]
    after = opts[:after]

    # Falsy attributes are dropped so they are not passed to create at all.
    attributes = {
        klass: klass.to_s,
        name: opts[:name] || klass.to_s,
        input: opts[:params],
        index: tasks.size + 1
    }.select { |_key, value| value }
    task = klass.create(attributes)

    if before
        task.downstream << before
        successors = before.is_a?(Array) ? before : [before]
        successors.each { |successor| successor.upstream << task }
    end

    if after
        task.upstream << after
        predecessors = after.is_a?(Array) ? after : [after]
        predecessors.each { |predecessor| predecessor.downstream << task }
    end

    # No explicit position given: append to the end of the existing chain.
    if before.nil? && after.nil? && tasks.last
        tasks.last.downstream << task
        task.upstream << tasks.last
    end

    tasks << task
    task
end

#running_steps ⇒ Object



35
36
37
# File 'lib/taskflow/flow.rb', line 35

# Selects this flow's tasks whose state is 'running' or 'paused'.
#
# @return [Object] the result of filtering the tasks association by state
def running_steps
    active_states = ['running', 'paused']
    tasks.where(state: active_states)
end

#schedule ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/taskflow/flow.rb', line 83

# Enqueues every pending task whose upstream tasks are all finished.
#
# Does nothing when the flow has been halted (halt_by is set) or is already
# 'stopped'. A 'pending' flow is first moved to 'running' and stamped with
# started_at. Each ready task is handed to Taskflow::Worker.perform_async
# together with the flow id, both as strings.
#
# Uses update! rather than update_attributes!, which was deprecated in
# Rails 6.0 and removed in 6.1.
#
# @return [self, nil] the flow itself, or nil when nothing was scheduled
#   because the flow is halted or stopped
# @raise [ActiveRecord::RecordInvalid] when the state change fails validation
def schedule
    return if halt_by || state == 'stopped'

    update!(state: 'running', started_at: Time.now) if state == 'pending'

    # A task is ready when it has no upstream tasks, or when every upstream
    # task is in the 'skipped' or 'stopped' state.
    ready_tasks = reload.tasks.where(state: 'pending').select do |task|
        task.upstream.empty? || task.upstream.all? { |t| %w(skipped stopped).include?(t.state) }
    end

    ready_tasks.each { |task| Taskflow::Worker.perform_async(id.to_s, task.id.to_s) }
    self
end

#stop!(user_id = nil) ⇒ Object



72
73
74
75
# File 'lib/taskflow/flow.rb', line 72

# Marks the flow as stopped and records who halted it.
#
# Sets state to 'stopped', result to 'warning', ended_at to the current time
# and halt_by to the given user. Progress is set to the mean of the tasks'
# progress values. When the flow has no tasks the mean is undefined (the
# division would be by zero), so the stored progress is left untouched.
#
# Uses update! rather than update_attributes!, which was deprecated in
# Rails 6.0 and removed in 6.1.
#
# @param user_id [Object, nil] identifier stored in halt_by
# @return [Object] the result of update!
# @raise [ActiveRecord::RecordInvalid] when the record fails validation
def stop!(user_id=nil)
    attrs = { halt_by: user_id, ended_at: Time.now, state: 'stopped', result: 'warning' }

    progresses = tasks.map(&:progress)
    # Guard against dividing by zero for a flow without tasks.
    attrs[:progress] = progresses.sum / progresses.size unless progresses.empty?

    update!(attrs)
end