Module: PWork::Async

Defined in:
lib/pwork/async.rb,
lib/pwork/async/task.rb,
lib/pwork/async/manager.rb,
lib/pwork/async/exceptions/task_error.rb,
lib/pwork/async/exceptions/invalid_options.rb

Defined Under Namespace

Modules: Exceptions
Classes: Manager, Task

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.add_task(options, &block) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/pwork/async.rb', line 49

# Wraps the given block in a Task and hands it to the manager.
#
# The manager is started first when it does not report itself as running.
# The task is also recorded in the current task list so it can be waited on
# later, unless the caller opted out with `wait: false`.
#
# @param options [Hash] recognised keys: `:caller` (stored on the task) and
#   `:wait` (only an explicit `false` skips tracking; nil/missing tracks)
# @param block [Proc] the work stored on the task
# @return [Object] the id of the newly created task
def self.add_task(options, &block)
  manager.start unless manager.running

  task = PWork::Async::Task.new
  task.block = block
  task.caller = options[:caller]

  # Only an explicit `false` opts out of tracking.
  tasks << task unless options[:wait] == false

  manager.add_task(task)
  task.id
end

.async_forked(options = {}, &block) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/pwork/async.rb', line 22

# Fork-based implementation of `async`.
#
# With a block: forks a child process that calls the block and records the
# child's pid in the task list, unless `options[:wait]` is explicitly `false`.
#
# Without a block: waits for every recorded pid, then resets the task list.
# The list is reset even when `Process.wait` raises (for example
# `Errno::ECHILD` for a pid that was already reaped), so one failed wait does
# not leave stale pids behind for every later wait. The exception itself still
# propagates. Child exit statuses are not inspected.
#
# @param options [Hash, Object] only read (as a Hash, key `:wait`) when a
#   block is given; ignored otherwise
# @param block [Proc] work to run in the child process
# @return [Array, nil] with a block: the task list, or nil when `wait: false`;
#   without a block: the fresh, empty task list
def self.async_forked(options = {}, &block)
  if block_given?
    pid = fork { block.call }
    PWork::Async.tasks << pid unless options[:wait] == false
  else
    begin
      PWork::Async.tasks.each { |child_pid| Process.wait(child_pid) }
    ensure
      # Always drop the recorded pids, mirroring the reset in wait_for_tasks.
      reset
    end
    # Hand back the now-empty task list.
    PWork::Async.tasks
  end
end

.async_test(options = {}, &block) ⇒ Object



18
19
20
# File 'lib/pwork/async.rb', line 18

# Test-mode implementation of `async`: calls the block inline in the calling
# thread and does nothing when no block is given.
#
# @param options [Object] accepted for signature parity with the other
#   modes; not used
# @param block [Proc] work to run immediately
# @return [Object, nil] the block's return value, or nil without a block
def self.async_test(options = {}, &block)
  return unless block_given?

  block.call
end

.async_threaded(options = {}, &block) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/pwork/async.rb', line 36

# Thread-based implementation of `async`.
#
# With a block: registers the block as a task through PWork::Async.add_task,
# tagging it with `caller: self` (the receiver of this method). The tag is
# added to a copy of `options`, so the hash passed in is never modified and
# may be frozen.
#
# Without a block: `options` is treated as the wait command and forwarded to
# PWork::Async.wait_for_tasks as `:command`, along with `caller: self`.
#
# @param options [Hash, Object] task options when a block is given; the wait
#   command (e.g. `:wait` or `:wait_local`) otherwise
# @param block [Proc] work to schedule
# @return [Object] whatever add_task / wait_for_tasks returns
def self.async_threaded(options = {}, &block)
  if block_given?
    # merge builds a new hash; the caller's options stay untouched.
    PWork::Async.add_task(options.merge(caller: self), &block)
  else
    PWork::Async.wait_for_tasks({ caller: self, command: options })
  end
end

.async_wait_sleep_iteration ⇒ Object



99
100
101
# File 'lib/pwork/async.rb', line 99

# Polling interval used while waiting for tasks to finish.
#
# Read from the PWORK_ASYNC_WAIT_SLEEP_ITERATION environment variable,
# falling back to '0.02' when it is not set.
#
# @return [Float] the configured interval
# @raise [ArgumentError] when the variable is set to a non-numeric string
def self.async_wait_sleep_iteration
  configured = ENV.fetch('PWORK_ASYNC_WAIT_SLEEP_ITERATION', '0.02')
  Float(configured)
end

.handle_errors ⇒ Object



88
89
90
91
92
93
94
95
96
97
# File 'lib/pwork/async.rb', line 88

# Raises a single aggregated error if any tracked task ended in the :error
# state.
#
# Every failed task contributes its error message and backtrace to the raised
# message; entries are joined with ' | '.
#
# @return [true] when no tracked task is in the :error state
# @raise [PWork::Async::Exceptions::TaskError] when at least one task failed
def self.handle_errors
  failed = tasks.select { |t| t.state == :error }
  return true if failed.empty?

  error_messages = failed.map do |t|
    "Error: #{t.error.message}, #{t.error.backtrace}"
  end

  raise PWork::Async::Exceptions::TaskError.new(
    "1 or more async errors occurred. #{error_messages.join(' | ')}"
  )
end

.manager ⇒ Object



45
46
47
# File 'lib/pwork/async.rb', line 45

# Returns the manager, creating it on first use and caching it in the
# receiver's @manager instance variable.
#
# @return [PWork::Async::Manager] the cached manager instance
def self.manager
  return @manager if @manager

  @manager = PWork::Async::Manager.new
end

.mode ⇒ Object



103
104
105
# File 'lib/pwork/async.rb', line 103

# Execution mode selected through the PWORK_ASYNC_MODE environment variable.
#
# @return [String] the lower-cased value of the variable, or 'thread' when
#   it is not set
def self.mode
  configured = ENV.fetch('PWORK_ASYNC_MODE', 'thread')
  configured.to_s.downcase
end

.reset ⇒ Object



107
108
109
# File 'lib/pwork/async.rb', line 107

# Replaces the task list stored under Thread.current[:pwork_async_tasks]
# with a new, empty array.
#
# @return [Array] the new empty list
def self.reset
  emptied = []
  Thread.current[:pwork_async_tasks] = emptied
end

.tasks ⇒ Object



66
67
68
# File 'lib/pwork/async.rb', line 66

# Task list stored under Thread.current[:pwork_async_tasks].
#
# The list is created (empty) the first time it is requested; later calls
# return the same array until the slot is replaced.
#
# @return [Array] the stored list
def self.tasks
  current = Thread.current
  current[:pwork_async_tasks] = [] unless current[:pwork_async_tasks]
  current[:pwork_async_tasks]
end

.wait_for_tasks(options) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/pwork/async.rb', line 70

# Blocks until the selected tasks have left the :pending / :active states,
# then surfaces any task errors via handle_errors.
#
# Task selection depends on `options[:command]`:
# * `:wait`       - every task in the current task list
# * `:wait_local` - only tasks whose `caller` equals `options[:caller]`
#
# Any other command (including `{}`, which async_threaded passes along when
# it is called with neither a block nor arguments) is rejected with an
# ArgumentError rather than failing later on a nil task list.
# NOTE(review): the project appears to ship an InvalidOptions exception
# (lib/pwork/async/exceptions/invalid_options.rb); it may be the better
# class to raise here - confirm its constructor first.
#
# The task list is always replaced with an empty array on the way out,
# whether the wait succeeded or raised.
#
# @param options [Hash] `:command` (`:wait` or `:wait_local`) and `:caller`
# @return [true] the result of handle_errors when no task failed
# @raise [ArgumentError] when `options[:command]` is not a known command
# @raise [PWork::Async::Exceptions::TaskError] from handle_errors
def self.wait_for_tasks(options)
  task_list =
    case options[:command]
    when :wait
      tasks
    when :wait_local
      tasks.select { |t| t.caller == options[:caller] }
    else
      raise ArgumentError,
            "unknown wait command #{options[:command].inspect} " \
            '(expected :wait or :wait_local)'
    end

  # Poll until no selected task is still queued or running.
  while task_list.any? { |t| t.state == :pending || t.state == :active }
    sleep(async_wait_sleep_iteration)
  end

  handle_errors
ensure
  Thread.current[:pwork_async_tasks] = []
end

Instance Method Details

#async(options = {}, &block) ⇒ Object



7
8
9
10
11
12
13
14
15
16
# File 'lib/pwork/async.rb', line 7

# Entry point mixed into including objects: runs or waits for asynchronous
# work using the implementation selected by PWork::Async.mode.
#
# * 'fork' - PWork::Async.async_forked
# * 'test' - PWork::Async.async_test
# * anything else - PWork::Async.async_threaded
#
# @param options [Hash, Object] forwarded unchanged to the implementation
# @param block [Proc] forwarded unchanged to the implementation
# @return [Object] whatever the selected implementation returns
def async(options = {}, &block)
  selected = PWork::Async.mode

  return PWork::Async.async_forked(options, &block) if selected == 'fork'
  return PWork::Async.async_test(options, &block) if selected == 'test'

  PWork::Async.async_threaded(options, &block)
end