Class: Async::Container::Threaded

Inherits:
Generic
  • Object
show all
Defined in:
lib/async/container/threaded.rb

Overview

Manages a reactor within one or more threads.

Defined Under Namespace

Classes: Instance

Instance Attribute Summary

Attributes inherited from Generic

#statistics

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Generic

#async, #failed?, #run

Constructor Details

#initialize ⇒ Threaded

Returns a new instance of Threaded.



57
58
59
60
61
62
# File 'lib/async/container/threaded.rb', line 57

# Set up an empty container that is ready to spawn threads.
def initialize
  super

  # Read by the loop in each spawned thread to decide whether to keep going.
  @running = true

  # Every thread started by #spawn is recorded here so #wait can join it.
  @threads = []
end

Class Method Details

.multiprocess? ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/async/container/threaded.rb', line 53

# Always answers false: this container starts its children with ::Thread.new
# (see #spawn) rather than as separate processes.
#
# @return [Boolean] always false.
def self.multiprocess?
  false
end

.run(*args, &block) ⇒ Object



49
50
51
# File 'lib/async/container/threaded.rb', line 49

# Convenience entry point: build a fresh container and immediately invoke
# its #run with the given arguments and block.
#
# @return [Object] whatever the instance-level #run returns.
def self.run(*args, &block)
  container = new
  container.run(*args, &block)
end

Instance Method Details

#sleep(duration) ⇒ Object



98
99
100
# File 'lib/async/container/threaded.rb', line 98

# Pause the calling thread by delegating straight to Kernel.sleep.
#
# @param duration [Numeric] passed through to Kernel.sleep unchanged.
# @return [Object] the value returned by Kernel.sleep.
def sleep(duration)
  Kernel.sleep(duration)
end

#spawn(name: nil, restart: false, &block) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/async/container/threaded.rb', line 64

# Start a new thread that runs the given block, passing it an Instance built
# from that thread.
#
# Any exception raised by the block, other than Interrupt, is logged and
# recorded as a failure in the statistics. Interrupt (which #stop delivers for
# a graceful shutdown) ends the thread quietly: it is neither logged nor
# counted as a failure or a restart.
#
# @param name [String, nil] optional name assigned to the new thread.
# @param restart [Boolean] when true, the block is invoked again after it
#   returns or fails, for as long as @running is true.
# @yield [instance] the Instance created for the spawned thread.
# @return [self]
def spawn(name: nil, restart: false, &block)
  @statistics.spawn!

  thread = ::Thread.new do
    begin
      # A separate local: the outer `thread` variable is assigned by the
      # spawning thread and should not also be written from in here.
      current = ::Thread.current

      current.name = name if name

      instance = Instance.new(current)

      while @running
        begin
          yield instance
        rescue Interrupt
          # Not a failure: hand it to the graceful-exit handler below.
          raise
        rescue Exception => exception
          Async.logger.error(self) {exception}

          @statistics.failure!
        end

        if restart
          @statistics.restart!
        else
          break
        end
      end
    rescue Interrupt
      # Graceful exit. Handling it here, around the whole thread body, also
      # covers an Interrupt that arrives outside the inner begin/rescue, so the
      # thread never terminates with an unhandled Interrupt that Thread#join
      # would re-raise in the caller.
    end
  end

  @threads << thread

  self
end

#stop(graceful = true) ⇒ Object

Gracefully shut down all reactors.



108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/async/container/threaded.rb', line 108

# Shut down every thread in the container and block until all have finished.
#
# @param graceful [Boolean] when true each thread has Interrupt raised in it;
#   otherwise each thread is killed outright.
# @return [Object] whatever #wait returns.
def stop(graceful = true)
  # Tell the restart loops in the spawned threads not to go around again.
  @running = false

  @threads.each do |thread|
    if graceful
      thread.raise(Interrupt)
    else
      thread.kill
    end
  end

  wait
ensure
  # Restore the flag so threads spawned later run normally.
  @running = true
end

#wait ⇒ Object



102
103
104
105
# File 'lib/async/container/threaded.rb', line 102

# Block until every recorded thread has finished, then forget them.
#
# @return [Array] the now-empty thread list.
def wait
  @threads.each { |thread| thread.join }

  # All threads have been joined, so drop the references.
  @threads.clear
end