Class: Taskflow::Worker

Inherits:
Object
Includes:
Sidekiq::Worker
Defined in:
lib/taskflow/worker.rb

Instance Method Summary

  • #perform(task_flow_id, job_id, opts = {}) ⇒ Object

Instance Method Details

#perform(task_flow_id, job_id, opts = {}) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/taskflow/worker.rb', line 6

# Runs one task of a flow, records how the run ended, then refreshes and
# re-schedules the flow.
#
# The flow and the task are looked up by id (lookup errors are not rescued
# here). The pre-flight checks and the task body (`task.go`) run inside
# `catch :control`, so any of them can end the run early by throwing a
# reason on :control.
#
# Outcome handling:
#   :already_running, :already_stopped
#       return immediately; the flow is neither updated nor scheduled
#   :flow_halt
#       stamp flow.ended_at unless it is already set; the task is untouched
#   :suspend
#       task becomes 'paused' with result 'suspend'
#   :skip
#       task becomes 'skipped' and its data is reset to {}
#   anything else
#       task becomes 'stopped' with result 'success' and progress 1
#
# A StandardError raised by the checks, the task body or the outcome
# bookkeeping is not re-raised: its class, message and backtrace are stored
# on the task, which is left 'paused' with result 'error'.
#
# NOTE(review): when nothing is thrown, `catch` yields the return value of
# `task.go`, so a `go` that happens to return one of the symbols above is
# handled exactly like the matching throw -- confirm this is intended.
#
# @param task_flow_id [Object] id handed to Taskflow::Flow.find
# @param job_id [Object] id handed to Taskflow::Task.find
# @param opts [Hash] accepted but not read by this method
# @return [Object, nil] nil on the early-return outcomes, otherwise the
#   value of flow.schedule
def perform(task_flow_id, job_id, opts = {})
  flow = Taskflow::Flow.find(task_flow_id)
  task = Taskflow::Task.find(job_id)

  begin
    outcome = catch(:control) do
      check_flow_state(flow)
      check_task_state(task)
      # Reset the bookkeeping of any previous run before starting the body.
      task.update_attributes!(
        state: 'running',
        started_at: Time.now,
        ended_at: nil,
        progress: 0.5,
        output: {},
        error: nil,
        result: nil
      )
      task.go(logger)
    end

    case outcome
    when :already_running, :already_stopped
      # Nothing to record; skip the flow update and scheduling below.
      return
    when :flow_halt
      flow.update_attributes!(ended_at: Time.now) unless flow.ended_at
    when :suspend
      task.update_attributes!(result: 'suspend', state: 'paused')
    when :skip
      task.update_attributes!(state: 'skipped', data: {})
    else
      task.update_attributes!(
        data: {},
        ended_at: Time.now,
        progress: 1,
        state: 'stopped',
        result: 'success'
      )
    end
  rescue StandardError => e
    # Keep the failure on the task instead of letting it escape the worker.
    task.error = {
      class: e.class.to_s,
      message: e.to_s,
      backtrace: e.backtrace
    }
    task.state = 'paused'
    task.result = 'error'
    task.ended_at = Time.now
    task.save
  end

  update_flow(flow.reload)
  flow.schedule
end