Module: PWork::Async

Defined in:
lib/pwork/async.rb,
lib/pwork/async/task.rb,
lib/pwork/async/manager.rb,
lib/pwork/async/exceptions/task_error.rb,
lib/pwork/async/exceptions/invalid_options.rb

Defined Under Namespace

Modules: Exceptions Classes: Manager, Task

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.add_task(options, &block) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/pwork/async.rb', line 20

def self.add_task(options, &block)
  manager.start unless manager.running

  task = PWork::Async::Task.new.tap do |e|
    e.block = block
    e.caller = options[:caller]
  end

  unless options[:wait] == false
    tasks << task
  end

  manager.add_task(task)

  task.id
end

.async_wait_sleep_iterationObject



70
71
72
# File 'lib/pwork/async.rb', line 70

def self.async_wait_sleep_iteration
  Float(ENV.fetch('PWORK_ASYNC_WAIT_SLEEP_ITERATION', '0.02'))
end

.handle_errorsObject



59
60
61
62
63
64
65
66
67
68
# File 'lib/pwork/async.rb', line 59

def self.handle_errors
  error_messages = []
  tasks.select { |t| t.state == :error }.each do |t|
    error_messages << "Error: #{t.error.message}, #{t.error.backtrace}"
  end
  raise PWork::Async::Exceptions::TaskError.new(
    "1 or more async errors occurred. #{error_messages.join(' | ')}"
  ) if error_messages.length > 0
  true
end

.managerObject



16
17
18
# File 'lib/pwork/async.rb', line 16

def self.manager
  @manager ||= PWork::Async::Manager.new
end

.tasksObject



37
38
39
# File 'lib/pwork/async.rb', line 37

def self.tasks
  Thread.current[:pwork_async_tasks] ||= []
end

.wait_for_tasks(options) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/pwork/async.rb', line 41

def self.wait_for_tasks(options)
  case options[:command]
    when :wait
      task_list = tasks
    when :wait_local
      task_list = tasks.select { |t| t.caller == options[:caller] }
  end

  until task_list.detect { |t| t.state == :pending || t.state == :active }.nil?
    sleep(async_wait_sleep_iteration)
  end

  handle_errors

  ensure
    Thread.current[:pwork_async_tasks] = []
end

Instance Method Details

#async(options = {}, &block) ⇒ Object



7
8
9
10
11
12
13
14
# File 'lib/pwork/async.rb', line 7

def async(options = {}, &block)
  if block_given?
    options[:caller] = self
    PWork::Async.add_task(options, &block)
  else
    PWork::Async.wait_for_tasks({ caller: self, command: options })
  end
end