Module: PWork::Async

Defined in:
lib/pwork/async.rb,
lib/pwork/async/task.rb,
lib/pwork/async/manager.rb,
lib/pwork/async/exceptions/task_error.rb,
lib/pwork/async/exceptions/invalid_options.rb

Defined Under Namespace

Modules: Exceptions Classes: Manager, Task

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.add_task(options, &block) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/pwork/async.rb', line 42

# Builds a task around the given block and hands it to the manager.
#
# The manager is started first if it does not report itself as running.
# The task is also appended to the current task list unless the caller
# passed `wait: false`.
#
# @param options [Hash] reads :caller (stored on the task) and :wait
# @param block [Proc] the work stored on the task
# @return [Object] the value of the new task's `id`
def self.add_task(options, &block)
  manager.start unless manager.running

  new_task = PWork::Async::Task.new
  new_task.block = block
  new_task.caller = options[:caller]

  # Only an explicit `false` opts out of tracking; nil/missing still tracks.
  tasks << new_task unless options[:wait] == false

  manager.add_task(new_task)
  new_task.id
end

.async_forked(options = {}, &block) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/pwork/async.rb', line 15

# Runs a block in a forked child process, or waits for earlier children.
#
# With a block: forks, calls the block in the child, and records the child
# pid in PWork::Async.tasks unless `options[:wait]` is explicitly false.
#
# Without a block: calls Process.wait on every recorded pid and then clears
# the list. `options` is not read in this branch.
#
# @param options [Hash] only :wait is read, and only when a block is given
# @param block [Proc] work to run in the child process
# @return [Array, nil] with a block, the task list (or nil when wait is
#   false); without a block, the freshly cleared task list
# @raise [SystemCallError] whatever Process.wait raises (e.g. Errno::ECHILD)
def self.async_forked(options = {}, &block)
  if block_given?
    pid = fork { block.call }
    PWork::Async.tasks << pid unless options[:wait] == false
  else
    begin
      PWork::Async.tasks.each { |child_pid| Process.wait(child_pid) }
    ensure
      # Clear the list even when a wait raises: pids that were already
      # reaped would otherwise stay recorded and make every later wait
      # fail again with Errno::ECHILD.
      reset
    end
    PWork::Async.tasks
  end
end

.async_threaded(options = {}, &block) ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/pwork/async.rb', line 29

# Queues a block as a task, or waits for queued tasks.
#
# With a block: forwards the block to PWork::Async.add_task with the given
# options plus `caller: self`. The caller's hash is copied rather than
# mutated, so frozen or reused option hashes are safe to pass.
#
# Without a block: `options` is treated as the wait command and passed to
# PWork::Async.wait_for_tasks as `command:` together with `caller: self`.
#
# @param options [Hash, Object] task options with a block; the wait command
#   without one
# @param block [Proc] the work to queue
# @return [Object] whatever add_task / wait_for_tasks returns
def self.async_threaded(options = {}, &block)
  if block_given?
    PWork::Async.add_task(options.merge(caller: self), &block)
  else
    PWork::Async.wait_for_tasks({ caller: self, command: options })
  end
end

.async_wait_sleep_iteration ⇒ Object



92
93
94
# File 'lib/pwork/async.rb', line 92

# Reads the interval from the PWORK_ASYNC_WAIT_SLEEP_ITERATION environment
# variable, falling back to '0.02' when the variable is not set.
#
# @return [Float] the configured value converted with Kernel#Float
# @raise [ArgumentError] when the variable holds a non-numeric string
def self.async_wait_sleep_iteration
  raw_interval = ENV.fetch('PWORK_ASYNC_WAIT_SLEEP_ITERATION') { '0.02' }
  Float(raw_interval)
end

.handle_errors ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/pwork/async.rb', line 81

# Raises a single TaskError summarising every task in `tasks` whose state
# is :error; each entry contains the error's message and backtrace.
#
# @return [true] when no task is in the :error state
# @raise [PWork::Async::Exceptions::TaskError] when at least one task failed
def self.handle_errors
  failed = tasks.select { |task| task.state == :error }
  return true if failed.empty?

  details = failed.map do |task|
    "Error: #{task.error.message}, #{task.error.backtrace}"
  end

  raise PWork::Async::Exceptions::TaskError,
        "1 or more async errors occurred. #{details.join(' | ')}"
end

.manager ⇒ Object



38
39
40
# File 'lib/pwork/async.rb', line 38

# Returns the memoized PWork::Async::Manager, creating it on first use.
#
# @return [PWork::Async::Manager] the same instance on every call
def self.manager
  return @manager if @manager

  @manager = PWork::Async::Manager.new
end

.mode ⇒ Object



96
97
98
99
100
101
102
# File 'lib/pwork/async.rb', line 96

# Resolves the execution mode from the PWORK_ASYNC_MODE environment variable.
#
# @return [String] 'fork' when the variable equals "fork" (compared after
#   downcasing); 'thread' for any other value or when it is unset
def self.mode
  configured = ENV['PWORK_ASYNC_MODE'].to_s.downcase
  configured == 'fork' ? 'fork' : 'thread'
end

.reset ⇒ Object



104
105
106
# File 'lib/pwork/async.rb', line 104

# Replaces the list stored under Thread.current[:pwork_async_tasks] with a
# new empty array.
#
# @return [Array] the new empty list
def self.reset
  fresh_list = []
  Thread.current[:pwork_async_tasks] = fresh_list
end

.tasks ⇒ Object



59
60
61
# File 'lib/pwork/async.rb', line 59

# Returns the list stored under Thread.current[:pwork_async_tasks],
# initialising it to an empty array when nothing is stored yet.
#
# @return [Array] the stored list (same object on repeated calls)
def self.tasks
  locals = Thread.current
  locals[:pwork_async_tasks] = [] unless locals[:pwork_async_tasks]
  locals[:pwork_async_tasks]
end

.wait_for_tasks(options) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/pwork/async.rb', line 63

# Blocks until the selected tasks are neither :pending nor :active, then
# calls handle_errors.
#
# Supported commands (options[:command]):
# * :wait       - waits on every task in `tasks`
# * :wait_local - waits only on tasks whose `caller` equals options[:caller]
#
# The current task list is replaced with an empty array when the method
# exits, whether it returns or raises.
# NOTE(review): that reset also happens for :wait_local, which drops tasks
# belonging to other callers - confirm this is intended.
#
# @param options [Hash] reads :command and, for :wait_local, :caller
# @return [Object] the result of handle_errors
# @raise [ArgumentError] when options[:command] is not a supported command
def self.wait_for_tasks(options)
  task_list =
    case options[:command]
    when :wait
      tasks
    when :wait_local
      tasks.select { |t| t.caller == options[:caller] }
    else
      # Previously this fell through with task_list == nil and failed later
      # with an opaque NoMethodError on nil.
      raise ArgumentError,
            "Unknown wait command #{options[:command].inspect}; expected :wait or :wait_local"
    end

  # Poll until no selected task is still pending or running.
  while task_list.any? { |t| t.state == :pending || t.state == :active }
    sleep(async_wait_sleep_iteration)
  end

  handle_errors
ensure
  Thread.current[:pwork_async_tasks] = []
end

Instance Method Details

#async(options = {}, &block) ⇒ Object



7
8
9
10
11
12
13
# File 'lib/pwork/async.rb', line 7

# Dispatches to the forked or threaded implementation depending on
# PWork::Async.mode, forwarding the options and block unchanged.
#
# @param options [Object] passed through to the selected implementation
# @param block [Proc] optional block passed through as well
# @return [Object] whatever the selected implementation returns
def async(options = {}, &block)
  return PWork::Async.async_forked(options, &block) if PWork::Async.mode == 'fork'

  PWork::Async.async_threaded(options, &block)
end