Class: Evrone::Common::AMQP::Supervisor::Threaded

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/evrone/common/amqp/supervisor/threaded.rb

Defined Under Namespace

Classes: SpawnAttemptsLimitReached, Task

Constant Summary collapse

POOL_INTERVAL =
0.5
@@shutdown =
false

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Threaded

Returns a new instance of Threaded.



61
62
63
64
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 61

# Sets up a supervisor with an empty task list.
#
# The class-level +resume+ is invoked first, which resets the shutdown
# flag to false before any task is registered.
def initialize
  self.class.resume
  @tasks = []
end

Class Method Details

.build(tasks) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 37

# Creates a supervisor and registers tasks for every entry in +tasks+.
#
# For each (object, count) pair the object is added +count+ times with the
# +:subscribe+ method, using the ids 0 up to count - 1.
#
# @param tasks [#each_pair] pairs of object and count; count must respond
#   to +times+
# @return [Object] the populated supervisor
def build(tasks)
  new.tap do |instance|
    tasks.each_pair do |consumer, count|
      count.times { |index| instance.add consumer, :subscribe, index }
    end
  end
end

.resume ⇒ Object



47
48
49
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 47

# Clears the shutdown flag stored in the @@shutdown class variable.
#
# @return [false] the value just assigned
def resume
  @@shutdown = false
end

.shutdown ⇒ Object



55
56
57
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 55

# Raises the shutdown flag stored in the @@shutdown class variable.
#
# @return [true] the value just assigned
def shutdown
  @@shutdown = true
end

.shutdown? ⇒ Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 51

# Reports the current value of the @@shutdown class variable.
#
# @return [Boolean] true once shutdown has been requested, false after resume
def shutdown?
  @@shutdown
end

Instance Method Details

#add(object, method, id) ⇒ Object



66
67
68
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 66

# Registers a new task with the supervisor.
#
# The three arguments are passed straight to +Task.new+; the resulting task
# is frozen and appended to the end of the task list.
#
# @param object [Object] first argument for Task.new
# @param method [Object] second argument for Task.new
# @param id [Object] third argument for Task.new
# @return [Array] the task list, including the newly added task
def add(object, method, id)
  task = Task.new(object, method, id)
  @tasks << task.freeze
end

#run ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 86

# Starts all task threads and then supervises them until the task list is
# empty.
#
# On every pass one task is taken from the front of the list:
# * if shutdown has been requested, it is handed to +log_thread_error+ and
#   not re-queued;
# * if it is still alive, it is put back at the end of the list;
# * otherwise it is handed to +process_fail+.
#
# Between passes the loop sleeps for POOL_INTERVAL seconds, except while
# shutting down, so the remaining tasks are drained without delay.
def run
  start_all_threads

  loop do
    current = @tasks.shift
    # An empty list means there is nothing left to supervise.
    break unless current

    if shutdown?
      log_thread_error current
    elsif current.alive?
      @tasks.push current
    else
      process_fail current
    end

    sleep POOL_INTERVAL unless shutdown?
  end
end

#run_async ⇒ Object



82
83
84
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 82

# Runs the supervisor loop (+run+) in a new thread.
#
# +abort_on_exception+ is switched on from inside the thread, before +run+
# is invoked. Setting it from the caller after Thread.new leaves a window in
# which the thread may already have raised, in which case the flag would
# have no effect on that failure.
#
# @return [Thread] the thread executing +run+, with abort_on_exception set
def run_async
  Thread.new do
    Thread.current.abort_on_exception = true
    run
  end
end

#shutdown ⇒ Object



78
79
80
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 78

# Requests shutdown by delegating to the class-level +shutdown+.
#
# @return [Object] whatever the class-level +shutdown+ returns
def shutdown
  self.class.shutdown
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 74

# Reports whether shutdown has been requested, by delegating to the
# class-level +shutdown?+.
#
# @return [Boolean]
def shutdown?
  self.class.shutdown?
end

#size ⇒ Object



70
71
72
# File 'lib/evrone/common/amqp/supervisor/threaded.rb', line 70

# Number of tasks currently held in the supervisor's task list.
#
# @return [Integer]
def size
  @tasks.length
end