Class: Narou::Worker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Mixin::OutputError, Singleton
Defined in:
lib/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Mixin::OutputError

#output_error

Constructor Details

#initializeWorker

Returns a new instance of Worker.



28
29
30
31
32
33
34
35
# File 'lib/worker.rb', line 28

def initialize
  self.queue = Queue.new
  self.mutex = Mutex.new
  self.size = 0
  self.worker_thread = nil
  self.cancel_signal = false
  self.thread_of_block_executing = nil
end

Instance Attribute Details

#cancel_signalObject

Returns the value of attribute cancel_signal.



15
16
17
# File 'lib/worker.rb', line 15

def cancel_signal
  @cancel_signal
end

#mutexObject

Returns the value of attribute mutex.



15
16
17
# File 'lib/worker.rb', line 15

def mutex
  @mutex
end

#push_serverObject

Returns the value of attribute push_server.



15
16
17
# File 'lib/worker.rb', line 15

def push_server
  @push_server
end

#queueObject

Returns the value of attribute queue.



15
16
17
# File 'lib/worker.rb', line 15

def queue
  @queue
end

#sizeObject

Returns the value of attribute size.



15
16
17
# File 'lib/worker.rb', line 15

def size
  @size
end

#thread_of_block_executingObject

Returns the value of attribute thread_of_block_executing.



15
16
17
# File 'lib/worker.rb', line 15

def thread_of_block_executing
  @thread_of_block_executing
end

#worker_threadObject

Returns the value of attribute worker_thread.



15
16
17
# File 'lib/worker.rb', line 15

def worker_thread
  @worker_thread
end

Class Method Details

.runObject



23
24
25
# File 'lib/worker.rb', line 23

def run
  instance.start
end

Instance Method Details

#cancelObject



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/worker.rb', line 81

def cancel
  mutex.synchronize do
    if size > 0
      self.cancel_signal = true
      self.size = 0
      thread_of_block_executing&.raise(Interrupt)
      self.thread_of_block_executing = nil
      queue.clear
    end
  end
  Thread.pass
end

#canceled?Boolean

Returns:

  • (Boolean)


94
95
96
# File 'lib/worker.rb', line 94

def canceled?
  cancel_signal
end

#countdownObject



120
121
122
123
124
125
126
# File 'lib/worker.rb', line 120

def countdown
  mutex.synchronize do
    self.size -= 1
    self.size = 0 if size < 0
    notification_queue
  end
end

#countupObject



113
114
115
116
117
118
# File 'lib/worker.rb', line 113

def countup
  mutex.synchronize do
    self.size += 1
    notification_queue
  end
end

#joinObject



67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/worker.rb', line 67

def join
  until size.zero?
    unless thread_of_block_executing
      Thread.pass
      next
    end
    thread_of_block_executing&.join
  end
rescue Interrupt
  thread_of_block_executing&.raise(Interrupt)
  self.thread_of_block_executing = nil
  sleep 0.1
end

#notification_queueObject



109
110
111
# File 'lib/worker.rb', line 109

def notification_queue
  push_server&.send_all("notification.queue" => [Narou::WebWorker.instance.size, size])
end

#push(&block) ⇒ Object



104
105
106
107
# File 'lib/worker.rb', line 104

def push(&block)
  countup
  queue.push(block: block)
end

#running?Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/worker.rb', line 37

def running?
  worker_thread.present?
end

#startObject



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/worker.rb', line 41

def start
  return if running?
  self.worker_thread = Thread.new do
    loop do
      begin
        q = queue.pop
        self.cancel_signal = false
        self.thread_of_block_executing = Thread.new do
          q[:block].call
        end
        thread_of_block_executing.join
        self.thread_of_block_executing = nil
      rescue SystemExit
      rescue Interrupt
        thread_of_block_executing&.raise(Interrupt)
        self.thread_of_block_executing = nil
        sleep 0.1
      rescue Exception => e
        output_error($stdout2, e)
      ensure
        countdown
      end
    end
  end
end

#stopObject



98
99
100
101
102
# File 'lib/worker.rb', line 98

def stop
  cancel
  worker_thread&.kill
  self.worker_thread = nil
end