Class: PerfectQueue::Multiprocess::ThreadProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/multiprocess/thread_processor.rb

Direct Known Subclasses

ChildProcess

Instance Method Summary collapse

Constructor Details

#initialize(runner, processor_id, config) ⇒ ThreadProcessor

Returns a new instance of ThreadProcessor.



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 23

def initialize(runner, processor_id, config)
  @runner = runner
  @processor_id = processor_id

  @running_flag = BlockingFlag.new
  @finish_flag = BlockingFlag.new

  @tm = TaskMonitor.new(config, method(:child_heartbeat), method(:force_stop))

  restart(false, config)
end

Instance Method Details

#force_stopObject



78
79
80
81
82
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 78

def force_stop
  @log.error "Force stopping processor processor_id=#{@processor_id}"
  @tm.stop_task(true)
  @finish_flag.set!
end

#joinObject



47
48
49
50
51
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 47

def join
  while t = @thread
    t.join
  end
end

#keepaliveObject



53
54
55
56
57
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 53

def keepalive
  unless @thread
    @thread = Thread.new(&method(:run))
  end
end

#logrotatedObject



84
85
86
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 84

def logrotated
  # do nothing
end

#restart(immediate, config) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 59

def restart(immediate, config)
  @poll_interval = config[:poll_interval] || 1.0
  @log = config[:logger]
  @task_prefetch = config[:task_prefetch] || 0
  @config = config

  @tm.stop_task(immediate)

  @finish_flag.set_region do
    @running_flag.wait while @running_flag.set?
  end
end

#runObject



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 35

def run
  @tm.start
  @running_flag.set_region do
    until @finish_flag.set?
      run_loop
    end
  end
  @tm.join
ensure
  @thread = nil
end

#stop(immediate) ⇒ Object



72
73
74
75
76
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 72

def stop(immediate)
  @log.info immediate ? "Stopping thread immediately id=#{@processor_id}" : "Stopping thread gracefully id=#{@processor_id}"
  @tm.stop_task(immediate)
  @finish_flag.set!
end