Class: PerfectQueue::Multiprocess::ThreadProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/multiprocess/thread_processor.rb

Direct Known Subclasses

ChildProcess

Instance Method Summary collapse

Constructor Details

#initialize(runner, processor_id, config) ⇒ ThreadProcessor

Returns a new instance of ThreadProcessor.



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 23

def initialize(runner, processor_id, config)
  @runner = runner
  @processor_id = processor_id

  @running_flag = BlockingFlag.new
  @finish_flag = BlockingFlag.new

  @tm = TaskMonitor.new(config, method(:child_heartbeat), method(:force_stop))

  restart(false, config)
end

Instance Method Details

#force_stopObject



76
77
78
79
80
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 76

def force_stop
  @log.error "Force stopping processor processor_id=#{@processor_id}"
  @tm.stop_task(true)
  @finish_flag.set!
end

#joinObject



45
46
47
48
49
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 45

def join
  while t = @thread
    t.join
  end
end

#keepaliveObject



51
52
53
54
55
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 51

def keepalive
  unless @thread
    @thread = Thread.new(&method(:run))
  end
end

#logrotatedObject



82
83
84
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 82

def logrotated
  # do nothing
end

#restart(immediate, config) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 57

def restart(immediate, config)
  @poll_interval = config[:poll_interval] || 1.0
  @log = config[:logger]
  @task_prefetch = config[:task_prefetch] || 0
  @config = config

  @tm.stop_task(immediate)

  @finish_flag.set_region do
    @running_flag.wait while @running_flag.set?
  end
end

#runObject



35
36
37
38
39
40
41
42
43
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 35

def run
  @tm.start
  @running_flag.set_region do
    run_loop
  end
  @tm.join
ensure
  @thread = nil
end

#stop(immediate) ⇒ Object



70
71
72
73
74
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 70

def stop(immediate)
  @log.info immediate ? "Stopping thread immediately id=#{@processor_id}" : "Stopping thread gracefully id=#{@processor_id}"
  @tm.stop_task(immediate)
  @finish_flag.set!
end