Class: PerfectQueue::Multiprocess::ThreadProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/multiprocess/thread_processor.rb

Direct Known Subclasses

ChildProcess

Instance Method Summary collapse

Constructor Details

#initialize(runner, config) ⇒ ThreadProcessor



23
24
25
26
27
28
29
30
31
32
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 23

# Sets up a processor for +runner+ and applies the initial configuration.
#
# @param runner [Object] stored in @runner; not otherwise used in this method
# @param config [Hash] handed to TaskMonitor.new and then to #restart
def initialize(runner, config)
  @runner = runner

  # One flag per state; #run and #restart coordinate through this pair.
  @running_flag = BlockingFlag.new
  @finish_flag = BlockingFlag.new

  # The monitor receives #child_heartbeat as a bound Method object.
  heartbeat = method(:child_heartbeat)
  @tm = TaskMonitor.new(config, heartbeat)

  # Reuse #restart (non-immediate) to load the remaining settings.
  restart(false, config)
end

Instance Method Details

#join ⇒ Object



46
47
48
49
50
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 46

# Blocks until no worker thread is registered in @thread.
#
# @thread is re-read after every join, so a thread installed while we were
# waiting is joined as well; the loop ends once @thread is nil (or false).
#
# @return [nil]
def join
  worker = @thread
  while worker
    worker.join
    worker = @thread
  end
end

#keepalive ⇒ Object



52
53
54
55
56
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 52

# Starts the worker thread if none is currently registered.
#
# Does nothing when @thread is already set; otherwise spawns a Thread that
# executes #run and stores it in @thread.
#
# @return [Thread, nil] the newly created thread, or nil if one already existed
def keepalive
  return if @thread

  entry_point = method(:run)
  @thread = Thread.new(&entry_point)
end

#logrotated ⇒ Object



76
77
78
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 76

# Log-rotation hook; intentionally a no-op for this processor.
#
# @return [nil]
def logrotated
  nil
end

#restart(immediate, config) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 58

# Applies a new configuration and waits for the running loop to go idle.
#
# @param immediate [Boolean] forwarded unchanged to @tm.stop_task
# @param config [Hash] read for :poll_interval (falls back to 1.0 when the
#   value is nil or false) and :logger; also kept whole in @config
def restart(immediate, config)
  @config = config
  @log = config[:logger]
  @poll_interval = config[:poll_interval] || 1.0

  @tm.stop_task(immediate)

  # Inside the finish-flag region, keep waiting on the running flag until it
  # no longer reports set. #run only loops while the finish flag is not set.
  # NOTE(review): presumably set_region holds the flag set for the duration
  # of the block - confirm against BlockingFlag.
  @finish_flag.set_region do
    while @running_flag.set?
      @running_flag.wait
    end
  end
end

#run ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 34

# Worker thread body: starts the task monitor, calls #run_loop repeatedly
# until the finish flag is set, then joins the monitor.
#
# The loop runs inside @running_flag.set_region; #restart waits on that same
# flag. @thread is cleared on every exit path (including exceptions), which
# is what lets #keepalive start a new thread and #join stop looping.
#
# @return [Object] whatever @tm.join returns
def run
  begin
    @tm.start
    @running_flag.set_region do
      run_loop until @finish_flag.set?
    end
    @tm.join
  ensure
    @thread = nil
  end
end

#stop(immediate) ⇒ Object



70
71
72
73
74
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 70

# Requests shutdown of the worker thread.
#
# Logs which kind of stop was requested, forwards the request to the task
# monitor, and sets the finish flag that #run checks before each iteration.
#
# @param immediate [Boolean] truthy for an immediate stop, falsy for graceful
# @return [Object] whatever @finish_flag.set! returns
def stop(immediate)
  message =
    if immediate
      "Stopping worker thread immediately"
    else
      "Stopping worker thread gracefully"
    end
  @log.info(message)

  @tm.stop_task(immediate)
  @finish_flag.set!
end