Module: PWork::Async

Defined in:
lib/pwork/async.rb,
lib/pwork/async/task.rb,
lib/pwork/async/manager.rb,
lib/pwork/async/exceptions/task_error.rb,
lib/pwork/async/exceptions/invalid_options.rb

Defined Under Namespace

Modules: Exceptions. Classes: Manager, Task.

Class Method Summary

Instance Method Summary

Class Method Details

.add_task(options, &block) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/pwork/async.rb', line 49

# Wraps the given block in a new task, optionally tracks it for a later wait,
# and passes it to the shared manager.
#
# @param options [Hash] +:caller+ is stored on the task; the task is appended
#   to {.tasks} unless +:wait+ is exactly +false+
# @param block [Proc] the work, stored on the task as its +block+
# @return [Object] the +id+ of the newly created task
def self.add_task(options, &block)
  task = PWork::Async::Task.new
  task.block = block
  task.caller = options[:caller]

  # Only an explicit `wait: false` opts out of tracking; nil/missing still tracks.
  tasks << task if options[:wait] != false

  manager.add_task(task)
  task.id
end

.async_forked(options = {}, &block) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/pwork/async.rb', line 22

# Fork-based backend.
#
# With a block: runs the block in a forked child process. The child's pid is
# recorded in PWork::Async.tasks unless +options[:wait]+ is exactly +false+;
# in that case the child is detached so it is still reaped on exit instead of
# lingering as a zombie.
#
# Without a block: waits for every recorded pid, then clears the list. The
# list is cleared even if waiting raises, so one stale pid cannot make every
# later wait fail on the same entry.
#
# @param options [Hash] only +:wait+ is read
# @param block [Proc] work to run in the child process
# @return [Array, nil] the pid list after recording the child, +nil+ for a
#   detached child, or the fresh (empty) list after waiting
def self.async_forked(options = {}, &block)
  if block_given?
    pid = fork { block.call }
    if options[:wait] == false
      # Nobody will ever Process.wait on this pid, so let Ruby reap it.
      Process.detach(pid)
      nil
    else
      PWork::Async.tasks << pid
    end
  else
    begin
      PWork::Async.tasks.each { |pid| Process.wait(pid) }
    ensure
      reset
    end
    PWork::Async.tasks
  end
end

.async_test(options = {}, &block) ⇒ Object



18
19
20
# File 'lib/pwork/async.rb', line 18

# Test backend: runs the block synchronously in the calling thread.
#
# @param options [Object] accepted for signature parity with the other
#   backends; not read here
# @param block [Proc] work to execute immediately
# @return [Object, nil] the block's result, or +nil+ when no block is given
def self.async_test(options = {}, &block)
  return unless block_given?

  block.call
end

.async_threaded(options = {}, caller, &block) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/pwork/async.rb', line 36

# Thread-based backend.
#
# With a block: schedules the block through PWork::Async.add_task, tagging the
# task with +caller+. The supplied options hash is copied rather than mutated,
# so the caller's hash is left untouched and frozen hashes are accepted.
#
# Without a block: +options+ is treated as the wait command and forwarded to
# PWork::Async.wait_for_tasks together with +caller+.
#
# @param options [Hash, Object] task options when a block is given; the wait
#   command otherwise
# @param caller [Object] the object on whose behalf the work is scheduled
#   (this parameter shadows Kernel#caller inside the method)
# @param block [Proc] work to schedule
# @return [Object] whatever add_task / wait_for_tasks returns
def self.async_threaded(options = {}, caller, &block)
  if block_given?
    PWork::Async.add_task(options.merge(caller: caller), &block)
  else
    PWork::Async.wait_for_tasks({ caller: caller, command: options })
  end
end

.handle_errors ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/pwork/async.rb', line 84

# Raises a single aggregated error if any tracked task ended in the +:error+
# state.
#
# @return [true] when no tracked task is in the +:error+ state
# @raise [PWork::Async::Exceptions::TaskError] carrying every failed task's
#   error message and backtrace, joined with " | "
def self.handle_errors
  failed = tasks.select { |task| task.state == :error }
  return true if failed.empty?

  details = failed.map do |task|
    "Error: #{task.error.message}, #{task.error.backtrace}"
  end

  raise PWork::Async::Exceptions::TaskError,
        "1 or more async errors occurred. #{details.join(' | ')}"
end

.manager ⇒ Object



45
46
47
# File 'lib/pwork/async.rb', line 45

# Returns the memoized PWork::Async::Manager, creating it on first use.
#
# @return [PWork::Async::Manager] the same instance on every call once built
def self.manager
  return @manager if @manager

  @manager = PWork::Async::Manager.new
end

.mode ⇒ Object



95
96
97
# File 'lib/pwork/async.rb', line 95

# Reads the backend selector from the PWORK_ASYNC_MODE environment variable.
#
# @return [String] the lower-cased value, or 'thread' when the variable is
#   not set
def self.mode
  configured = ENV.fetch('PWORK_ASYNC_MODE') { 'thread' }
  configured.to_s.downcase
end

.reset ⇒ Object



99
100
101
# File 'lib/pwork/async.rb', line 99

# Replaces the tracked-task list stored under
# Thread.current[:pwork_async_tasks] with a new empty array.
#
# @return [Array] the new, empty list
def self.reset
  fresh = []
  Thread.current[:pwork_async_tasks] = fresh
end

.tasks ⇒ Object



64
65
66
# File 'lib/pwork/async.rb', line 64

# Returns the tracked-task list kept under Thread.current[:pwork_async_tasks],
# creating an empty one on first access. (Per Ruby's documentation,
# Thread#[] storage is local to the current fiber.)
#
# @return [Array] the live list; appending to it mutates the stored list
def self.tasks
  store = Thread.current
  store[:pwork_async_tasks] = [] unless store[:pwork_async_tasks]
  store[:pwork_async_tasks]
end

.wait_for_tasks(options) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/pwork/async.rb', line 68

# Joins the threads of tracked tasks, then surfaces any task errors.
#
# @param options [Hash] +:command+ selects what to wait for:
#   +:wait+ joins every tracked task; +:wait_local+ joins only tasks whose
#   +caller+ equals +options[:caller]+
# @return [true] when handle_errors reports no failures
# @raise [ArgumentError] if +:command+ is neither +:wait+ nor +:wait_local+
#   (previously this surfaced as a NoMethodError on nil)
# @raise [PWork::Async::Exceptions::TaskError] propagated from handle_errors
#
# NOTE(review): the tracked list is cleared in full on every exit path, and
# handle_errors inspects all tracked tasks. For +:wait_local+ that includes
# tasks from other callers that were not joined here — confirm that is
# intended.
def self.wait_for_tasks(options)
  task_list =
    case options[:command]
    when :wait
      tasks
    when :wait_local
      tasks.select { |t| t.caller == options[:caller] }
    else
      raise ArgumentError,
            "unknown wait command #{options[:command].inspect} (expected :wait or :wait_local)"
    end

  task_list.each { |t| t.thread.join }

  handle_errors
ensure
  Thread.current[:pwork_async_tasks] = []
end

Instance Method Details

#async(options = {}, &block) ⇒ Object



7
8
9
10
11
12
13
14
15
16
# File 'lib/pwork/async.rb', line 7

# Entry point for including objects: routes the call to the backend chosen by
# PWork::Async.mode.
#
# 'fork' and 'test' select the forked and synchronous backends; any other
# mode falls through to the threaded backend, which also receives +self+ as
# the calling object.
#
# @param options [Hash, Object] forwarded unchanged to the backend
# @param block [Proc] forwarded unchanged to the backend
# @return [Object] whatever the selected backend returns
def async(options = {}, &block)
  selected = PWork::Async.mode

  return PWork::Async.async_forked(options, &block) if selected == 'fork'
  return PWork::Async.async_test(options, &block) if selected == 'test'

  PWork::Async.async_threaded(options, self, &block)
end