Class: PWork::Async::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/pwork/async/manager.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Manager

Returns a new instance of Manager.



6
7
8
9
10
# File 'lib/pwork/async/manager.rb', line 6

# Builds a manager in the stopped state: no worker threads, an empty task
# queue, and the running flag cleared.
def initialize
  # Worker threads are only created by #start.
  @threads = []
  # Tasks wait here until a worker pops them.
  @queue = Queue.new
  @running = false
end

Instance Method Details

#add_task(task) ⇒ Object

This method is called to add a task to the queue



67
68
69
70
71
72
73
# File 'lib/pwork/async/manager.rb', line 67

# Enqueues a task for asynchronous processing.
#
# The thread variables returned by PWork::Helpers::Threads.get_thread_vars
# on the calling thread are stored on the task before it is queued.
#
# @param task [PWork::Async::Task] the task to enqueue
# @return [Queue] the internal queue (the result of pushing onto it)
# @raise [PWork::Async::Exceptions::InvalidOptionsError] if +task+ is not a
#   PWork::Async::Task
def add_task(task)
  unless task.is_a?(PWork::Async::Task)
    raise PWork::Async::Exceptions::InvalidOptionsError,
          'A valid async task must be specified.'
  end

  task.thread_local_storage = PWork::Helpers::Threads.get_thread_vars
  @queue << task
end

#pool_size ⇒ Object

This method is called to determine the size of the async thread pool



36
37
38
# File 'lib/pwork/async/manager.rb', line 36

# Number of worker threads the pool should contain.
#
# Read from the PWORK_ASYNC_POOL_SIZE environment variable; defaults to 10
# when the variable is not set.
#
# @return [Integer] the configured pool size
# @raise [ArgumentError] if the variable is set to a non-integer string
def pool_size
  configured = ENV.fetch('PWORK_ASYNC_POOL_SIZE') { 10 }
  Integer(configured)
end

#process_task ⇒ Object

This method is called to process an async task from the queue



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pwork/async/manager.rb', line 41

# Takes the next task off the queue and runs it on the calling thread.
#
# Blocks in @queue.pop until a task is available. The task is marked
# :active, the thread variables stored on the task are applied through the
# thread helper, and the task's block is called. Any StandardError raised
# while applying the thread variables or running the block is stored on
# task.error and the task ends in state :error; otherwise it ends in state
# :complete. The thread helper's reset_thread_vars is always called
# afterwards.
#
# @return [Symbol] the final task state, :complete or :error
def process_task
  task = @queue.pop
  task.state = :active
  begin
    # Applied inside the protected region so that a failure here is recorded
    # on the task instead of escaping and terminating the calling worker.
    thread_helper.set_thread_vars(task.thread_local_storage)
    task.block.call
    task.state = :complete
  rescue => e
    task.error = e
    task.state = :error
  ensure
    thread_helper.reset_thread_vars
  end
end

#queue_count ⇒ Object

This method is called to determine the number of tasks in the queue



62
63
64
# File 'lib/pwork/async/manager.rb', line 62

# Number of tasks currently waiting in the queue.
#
# @return [Integer] the current length of the internal queue
def queue_count
  @queue.size
end

#running ⇒ Object

This method is called to determine if this manager is running or not



13
14
15
# File 'lib/pwork/async/manager.rb', line 13

# Reports whether this manager is currently flagged as running.
#
# @return [Boolean] the value of @running: false after construction and
#   after #stop, true after #start
def running
  @running
end

#start ⇒ Object

This method is called to start the async manager



18
19
20
21
22
23
24
25
26
27
# File 'lib/pwork/async/manager.rb', line 18

# Starts the manager: sets the running flag and spawns pool_size worker
# threads, each of which calls process_task repeatedly for as long as the
# running flag stays set.
#
# Calling this while the manager is already running is a no-op, so the pool
# is not grown past pool_size by repeated calls.
#
# @return [Integer, nil] pool_size when workers were spawned, nil when the
#   manager was already running
def start
  return if @running

  @running = true
  pool_size.times do
    @threads << Thread.new do
      # The flag is only re-checked between tasks; process_task blocks while
      # the queue is empty.
      process_task while @running
    end
  end
end

#stop ⇒ Object

This method is called to stop the async manager



30
31
32
33
# File 'lib/pwork/async/manager.rb', line 30

# Stops the manager by clearing the running flag and dropping the
# references to the worker threads.
#
# NOTE(review): the worker threads are not joined, killed, or woken here.
# Each worker re-checks the running flag only between tasks, so a worker
# blocked in process_task's @queue.pop stays parked until another task is
# queued.
#
# @return [Array] the new, empty thread list
def stop
  @running = false
  @threads = []
end

#thread_helper ⇒ Object

This method is called to expose the thread helper



57
58
59
# File 'lib/pwork/async/manager.rb', line 57

# Exposes the thread helper; process_task calls set_thread_vars and
# reset_thread_vars through this method.
#
# @return [Object] the PWork::Helpers::Threads constant
def thread_helper
  PWork::Helpers::Threads
end