Class: WorkQueue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/scout/work_queue/worker.rb

Constant Summary collapse

EXIT_STATUS =
246
SIGNAL =
'ABRT'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ignore_ouput = false) ⇒ Worker

Returns a new instance of Worker.



7
8
9
# File 'lib/scout/work_queue/worker.rb', line 7

def initialize(ignore_ouput = false)
  @ignore_output = ignore_ouput
end

Instance Attribute Details

#ignore_ouputObject

Returns the value of attribute ignore_ouput.



6
7
8
# File 'lib/scout/work_queue/worker.rb', line 6

def ignore_ouput
  @ignore_ouput
end

#pidObject

Returns the value of attribute pid.



6
7
8
# File 'lib/scout/work_queue/worker.rb', line 6

def pid
  @pid
end

#queue_idObject

Returns the value of attribute queue_id.



6
7
8
# File 'lib/scout/work_queue/worker.rb', line 6

def queue_id
  @queue_id
end

Class Method Details

.join(workers) ⇒ Object



77
78
79
80
81
82
83
84
85
86
# File 'lib/scout/work_queue/worker.rb', line 77

def self.join(workers)
  workers = [workers] unless Array === workers
  begin
    while pid = Process.wait 
      status = $?
        worker = workers.select{|w| w.pid == pid }.first
    end
  rescue Errno::ECHILD
  end
end

Instance Method Details

#abortObject



63
64
65
66
67
68
69
70
# File 'lib/scout/work_queue/worker.rb', line 63

def abort
  begin
    Log.medium "Aborting worker #{worker_id}"
    Process.kill SIGNAL, @pid
  rescue Errno::ECHILD 
  rescue Errno::ESRCH
  end
end

#joinObject



72
73
74
75
# File 'lib/scout/work_queue/worker.rb', line 72

def join
  Log.low "Joining worker #{worker_id}"
  Process.waitpid @pid
end

#process(input, output = nil, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/scout/work_queue/worker.rb', line 32

def process(input, output = nil, &block)
  run do
    begin
      if output
        Open.purge_pipes(input.sread, output.swrite)
      else
        Open.purge_pipes(input.sread)
      end

      while obj = input.read
        if DoneProcessing === obj
          output.write DoneProcessing.new
          raise obj 
        end
        res = block.call obj
        output.write res unless ignore_ouput || res == :ignore 
      end
    rescue DoneProcessing
    rescue Interrupt
    rescue Exception
      begin
        output.write WorkerException.new($!, Process.pid)
        exit EXIT_STATUS
      rescue
        exit -1
      end
    end
    exit 0
  end
end

#runObject



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/scout/work_queue/worker.rb', line 19

def run
  @pid = Process.fork do
    Signal.trap(SIGNAL) do
      Kernel.exit! EXIT_STATUS
    end
    Signal.trap('INT') do
      Kernel.exit! -1
    end
    Log.low "Worker start #{worker_id}"
    yield
  end
end

#worker_idObject



15
16
17
# File 'lib/scout/work_queue/worker.rb', line 15

def worker_id
  [worker_short_id, queue_id] * "->"
end

#worker_short_idObject



11
12
13
# File 'lib/scout/work_queue/worker.rb', line 11

def worker_short_id
  [object_id, pid].compact * "@"
end