Module: Abid::RakeExtensions::Task

Defined in:
lib/abid/rake_extensions/task.rb

Instance Method Summary

Instance Method Details

#async_execute_in_worker(worker = nil, &block) ⇒ Object



135
136
137
138
139
140
141
142
143
# File 'lib/abid/rake_extensions/task.rb', line 135

# Posts +block+ to one of the workers held in +application.worker+ and
# records any exception the block raises on this task's ivar instead of
# letting it escape into the worker.
#
# @param worker [Object, nil] key used to look the worker up in
#   +application.worker+; when nil (or false) +self.worker+ is used instead
# @yield the work to run on the selected worker
# @return [Object] whatever the worker's +post+ returns
def async_execute_in_worker(worker = nil, &block)
  pools = application.worker
  pool = pools[worker || self.worker]

  pool.post do
    begin
      block.call
    rescue Exception => error
      # Deliberately broad: every kind of failure is reported through the ivar.
      state.ivar.try_fail(error)
    end
  end
end

#async_execute_with_session(task_args) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/abid/rake_extensions/task.rb', line 88

# Runs the task body on a worker, inside +state.session+, and marks the
# task's ivar as set to +true+ once the session block has returned.
#
# +execute+ is only called when +needed?+ is truthy. If the session raises
# AbidErrorTaskAlreadyRunning, the method falls back to
# +async_wait_complete+ instead of executing.
#
# @param task_args [Object] arguments forwarded to +execute+
# @return [Object] the result of +async_execute_in_worker+
def async_execute_with_session(task_args)
  async_execute_in_worker do
    begin
      state.session do
        completed = false
        begin
          execute(task_args) if needed?
          completed = true
        ensure
          # Leaving the body with no exception in flight and without reaching
          # the line above means it was unwound some other way (presumably
          # the thread was killed -- see the message); surface it as an error.
          killed = $ERROR_INFO.nil? && !completed
          raise "#{name} -- thread killed" if killed
        end
      end

      state.ivar.try_set(true)
    rescue AbidErrorTaskAlreadyRunning
      async_wait_complete
    end
  end
end

#async_invoke(*args) ⇒ Object



20
21
22
23
# File 'lib/abid/rake_extensions/task.rb', line 20

# Entry point for asynchronously invoking this task with positional
# arguments and an empty invocation chain.
#
# @param args [Array] values paired with +arg_names+ to build the
#   Rake::TaskArguments
# @return [Object] the result of +async_invoke_with_call_chain+
def async_invoke(*args)
  async_invoke_with_call_chain(
    Rake::TaskArguments.new(arg_names, args),
    Rake::InvocationChain::EMPTY
  )
end

#async_invoke_tasks(tasks, task_args, invocation_chain, &block) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/abid/rake_extensions/task.rb', line 63

# Invokes every task in +tasks+ asynchronously and calls +block+ once all of
# them have completed.
#
# * With no tasks, +block+ is called immediately with +false+.
# * If any task's ivar was rejected, this task's ivar is failed with the
#   first rejected ivar's reason and +block+ is not called.
# * Otherwise +block+ receives +true+ when at least one ivar has a truthy
#   value, else +false+.
#
# Exceptions raised while handling completion are recorded on this task's
# ivar.
#
# @param tasks [Array] tasks responding to +arg_names+ and
#   +async_invoke_with_call_chain+
# @param task_args [Object] arguments; re-scoped per task via +new_scope+
# @param invocation_chain [Object] chain passed through to each task
# @yieldparam updated [Boolean] whether any invoked task reported a truthy value
# @return [Object] the block's result when +tasks+ is empty, otherwise the
#   array of ivars
def async_invoke_tasks(tasks, task_args, invocation_chain, &block)
  ivars = tasks.map do |task|
    scoped_args = task_args.new_scope(task.arg_names)
    task.async_invoke_with_call_chain(scoped_args, invocation_chain)
  end

  # Nothing to wait for: continue right away, reporting "not updated".
  return block.call(false) if ivars.empty?

  on_all_done = proc do
    begin
      rejected = ivars.find(&:rejected?)
      if rejected
        state.ivar.try_fail(rejected.reason)
      else
        block.call(ivars.map(&:value).any?)
      end
    rescue Exception => error
      state.ivar.try_fail(error)
    end
  end

  counter = Concurrent::DependencyCounter.new(ivars.size, &on_all_done)
  ivars.each { |ivar| ivar.add_observer(counter) }
end

#async_invoke_with_call_chain(task_args, invocation_chain) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/abid/rake_extensions/task.rb', line 25

# Asynchronously invokes this task as part of +invocation_chain+ and returns
# the ivar that will carry its outcome.
#
# After reloading the state, inside +state.only_once+ (presumably a guard so
# the decision runs a single time per task -- confirm against State):
#
# * unless the +repair+ option is set, an already successful task is skipped
#   by setting the ivar to +false+;
# * unless the +repair+ option is set, a previously failed task that is not
#   at the top level (non-empty +invocation_chain+) fails the ivar with a
#   RuntimeError;
# * otherwise the prerequisites are invoked, followed by the task itself.
#
# Any exception raised by this method is recorded on the ivar and re-raised.
# An explicit +rescue+ is used rather than inspecting the global "current
# exception" in an +ensure+ clause: that global is also non-nil whenever the
# caller happens to be inside its own +rescue+ block, which would fail a
# pending ivar with an unrelated error even though this call succeeded.
#
# @param task_args [Object] arguments passed on to the prerequisites/task
# @param invocation_chain [Object] chain of the tasks that led here
# @return [Object] +state.ivar+
# @raise [Exception] whatever was raised while scheduling the invocation
def async_invoke_with_call_chain(task_args, invocation_chain)
  state.reload

  new_chain = Rake::InvocationChain.append(self, invocation_chain)

  state.only_once do
    if !application.options.repair && state.successed?
      # skip if successed
      state.ivar.try_set(false)
    elsif !application.options.repair && state.failed? && !invocation_chain.empty?
      # fail if not top level; raise-and-rescue so the error carries a backtrace
      begin
        fail "#{name} -- task has been failed"
      rescue => failure
        state.ivar.try_fail(failure)
      end
    else
      async_invoke_with_prerequisites(task_args, new_chain)
    end
  end
  state.ivar
rescue Exception => err
  state.ivar.try_fail(err)
  raise
end

#async_invoke_with_prerequisites(task_args, invocation_chain) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/abid/rake_extensions/task.rb', line 46

# Invokes this task's prerequisites and then, if required, the task itself.
#
# Prerequisites are split by +volatile?+. The non-volatile ones run first;
# when the task's state is already successful and none of them reported an
# update, the task is skipped (ivar set to +false+). Otherwise the volatile
# prerequisites run and the task body is executed afterwards.
#
# @param task_args [Object] arguments forwarded to prerequisites and task
# @param invocation_chain [Object] chain forwarded to the prerequisites
# @return [Object] the result of the outer +async_invoke_tasks+ call
def async_invoke_with_prerequisites(task_args, invocation_chain)
  if application.options.trace
    application.trace "** Invoke #{name_with_params}"
  end

  volatile_deps, stable_deps = prerequisite_tasks.partition(&:volatile?)

  async_invoke_tasks(stable_deps, task_args, invocation_chain) do |updated|
    if !state.successed? || updated
      async_invoke_tasks(volatile_deps, task_args, invocation_chain) do
        async_execute_with_session(task_args)
      end
    else
      # Already successful and nothing upstream changed: nothing to do.
      if application.options.trace
        application.trace "** Skip #{name_with_params}"
      end
      state.ivar.try_set(false)
    end
  end
end

#async_wait_complete ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/abid/rake_extensions/task.rb', line 107

# Handles the case where this task is already running elsewhere.
#
# When the +wait_external_task+ option is not set, the task's ivar is failed
# immediately with a RuntimeError. Otherwise a polling loop is posted to the
# +:waiter+ worker: it reloads the state every +wait_external_task_interval+
# seconds (default 10) and sets the ivar to +true+ once the state is no
# longer running, or fails it after +wait_external_task_timeout+ seconds
# (default 3600).
#
# The timeout is measured on the monotonic clock, so adjustments of the
# system's wall-clock time can neither trigger it early nor postpone it.
#
# @return [Object] the result of +try_fail+ when waiting is disabled,
#   otherwise the result of +async_execute_in_worker+
def async_wait_complete
  unless application.options.wait_external_task
    err = RuntimeError.new("task #{name_with_params} already running")
    return state.ivar.try_fail(err)
  end

  application.trace "** Wait #{name_with_params}" if application.options.trace

  async_execute_in_worker(:waiter) do
    interval = application.options.wait_external_task_interval || 10
    timeout = application.options.wait_external_task_timeout || 3600
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    loop do
      state.reload
      if !state.running?
        state.ivar.try_set(true)
        break
      elsif Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
        # raise-and-rescue so the recorded error carries a backtrace
        begin
          fail "#{name} -- timeout exceeded"
        rescue => failure
          state.ivar.try_fail(failure)
        end
        break
      else
        sleep interval
      end
    end
  end
end

#name_with_params ⇒ Object



16
17
18
# File 'lib/abid/rake_extensions/task.rb', line 16

# Label used for this task in trace and error messages.
#
# This implementation simply returns +name+.
# NOTE(review): presumably overridden by task types that carry parameters --
# confirm against the including classes.
#
# @return [Object] the value of +name+
def name_with_params
  name
end

#state ⇒ Object



12
13
14
# File 'lib/abid/rake_extensions/task.rb', line 12

# Looks up the state object associated with this task.
#
# The lookup is performed on every call via +State.find+; nothing is cached
# by this method itself.
#
# @return [Object] whatever +State.find(self)+ returns
def state
  State.find(self)
end

#volatile? ⇒ Boolean

Returns:

  • (Boolean)


4
5
6
# File 'lib/abid/rake_extensions/task.rb', line 4

# Whether this task counts as volatile when prerequisites are partitioned.
#
# This implementation always answers +true+.
# NOTE(review): presumably a default that non-volatile task types override --
# confirm against the including classes.
#
# @return [Boolean] always +true+ here
def volatile?
  true
end

#worker ⇒ Object



8
9
10
# File 'lib/abid/rake_extensions/task.rb', line 8

# Key of the worker this task runs on when no explicit worker is requested.
#
# @return [Symbol] always +:default+ here
def worker
  :default
end