Class: Fluent::StatsitePlugin::ChildProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/statsite/child_process.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parser, respawns = 0, log = $log) ⇒ ChildProcess

Returns a new instance of ChildProcess.



6
7
8
9
10
11
12
13
14
# File 'lib/fluent/plugin/statsite/child_process.rb', line 6

# Builds a ChildProcess wrapper; no process is spawned here.
#
# @param parser [#call] invoked by #run as +parser.call(io)+ with the
#   child's pipe
# @param respawns [Integer] respawn budget; 0 disables respawning (see
#   #try_respawn for how the value is consumed)
# @param log [Object] logger; defaults to the global +$log+
def initialize(parser, respawns = 0, log = $log)
  # Collaborators handed in by the caller.
  @parser, @respawns, @log = parser, respawns, log
  # Guards spawning and killing of the child (used by #start, #shutdown
  # and #try_respawn).
  @mutex = Mutex.new
  # Nothing is running yet: no pid, no reader thread, no shutdown state.
  @pid = @thread = @finished = nil
end

Instance Attribute Details

#finished ⇒ Object

Returns the value of attribute finished.



4
5
6
# File 'lib/fluent/plugin/statsite/child_process.rb', line 4

# Reader for the @finished flag. Within this class the flag is nil after
# #initialize, false after #start, and true once #shutdown has been called;
# #run consults it to tell a requested stop from an unexpected child exit.
#
# @return [Boolean, nil] the current value of @finished
def finished
  @finished
end

Instance Method Details

#kill_child(join_wait) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/statsite/child_process.rb', line 27

# Stops the child process and waits for its reader thread to finish.
#
# Sends SIGTERM, waits up to +join_wait+ seconds for @thread to end, and if
# the thread is still running escalates to SIGKILL and waits without a
# timeout.
#
# @param join_wait [Numeric, nil] seconds to wait for @thread after SIGTERM
# @return [Thread, nil] the joined thread when SIGKILL was needed, otherwise
#   nil
def kill_child(join_wait)
  # Nothing has been spawned yet (e.g. #shutdown called before #start):
  # there is no process to signal and no thread to join.
  return if @pid.nil? || @thread.nil?

  begin
    Process.kill(:TERM, @pid)
  rescue SystemCallError
    # Covers Errno::ECHILD, Errno::ESRCH and Errno::EPERM.
    # Errno::ESRCH 'No such process', ignore:
    # child process killed by signal chained from fluentd process
  end

  # @thread successfully shut down within the grace period
  return if @thread.join(join_wait)

  begin
    Process.kill(:KILL, @pid)
  rescue SystemCallError
    # ignore if successfully killed by :TERM in the meantime
  end
  @thread.join
end

#run ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/statsite/child_process.rb', line 85

# Thread body: hands the child's pipe to the parser, then reaps the child.
#
# Any StandardError raised by the parser is logged (message plus backtrace)
# rather than propagated. Afterwards the child identified by @pid is always
# waited for; if #shutdown was not requested (@finished is falsy) the exit
# is reported as unexpected, along with a respawn notice when @respawns is
# non-zero.
def run
  @parser.call(@io)
rescue => failure
  @log.error "statsite thread unexpectedly failed with an error.", :command=>@command, :error=>failure.to_s
  @log.warn_backtrace failure.backtrace
ensure
  # Blocks until the child has exited; only the status half is needed.
  status = Process.waitpid2(@pid).last
  if !@finished
    @log.error "statsite process unexpectedly exited.", :command=>@command, :ecode=>status.to_i
    @log.warn "statsite child process will respawn for next input data (respawns #{@respawns})." if @respawns != 0
  end
end

#shutdown ⇒ Object



46
47
48
49
50
51
# File 'lib/fluent/plugin/statsite/child_process.rb', line 46

# Marks this instance as intentionally finished, then stops the child.
#
# The flag is raised before taking the lock so that #run, which checks
# @finished after the child exits, does not report the exit as unexpected.
#
# @return [Object] whatever #kill_child returns
def shutdown
  @finished = true
  @mutex.synchronize { kill_child(60) } # TODO wait time
end

#start(command) ⇒ Object



16
17
18
19
20
21
22
23
24
25
# File 'lib/fluent/plugin/statsite/child_process.rb', line 16

# Spawns the child process and the thread that services it.
#
# Under the mutex: opens a bidirectional pipe to +command+, records the pipe
# and the child's pid, turns on sync mode for the pipe, and starts a thread
# executing #run. Finally resets @finished to false.
#
# @param command [String, Array] command passed to IO.popen; also kept in
#   @command for later respawns and log messages
# @return [false] the value assigned to @finished
def start(command)
  @command = command
  @mutex.synchronize do
    @io = IO.popen(command, "r+")
    @pid = @io.pid
    @io.sync = true
    @thread = Thread.new { run }
  end
  @finished = false
end

#try_respawn ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/statsite/child_process.rb', line 67

# Replaces the current child process with a freshly spawned one, if the
# respawn budget allows it.
#
# @respawns == 0 means no respawn is permitted. A positive value is
# decremented on every respawn; a negative value is never decremented, so
# it never reaches 0.
#
# @return [Boolean] true when a new child was spawned, false when
#   respawning is not permitted
def try_respawn
  # Cheap check without the lock; re-checked below once the lock is held.
  return false if @respawns == 0
  @mutex.synchronize do
    return false if @respawns == 0

    kill_child(5) # TODO wait time

    # Release the previous pipe explicitly before replacing it; otherwise its
    # descriptors stay open until the old IO object is garbage collected.
    begin
      @io.close if @io && !@io.closed?
    rescue IOError, SystemCallError
      # The old pipe is already unusable; nothing more to release.
    end

    @io = IO.popen(@command, "r+")
    @pid = @io.pid
    @io.sync = true
    @thread = Thread.new(&method(:run))

    @respawns -= 1 if @respawns > 0
  end
  @log.warn "statsite child process successfully respawned.", :command => @command, :respawns => @respawns
  true
end

#write(chunk) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/statsite/child_process.rb', line 53

# Writes a chunk to the child's pipe, respawning the child on a broken pipe.
#
# On Errno::EPIPE a warning is logged and #try_respawn is attempted: if it
# succeeds the write is retried against the new child; otherwise the same
# exception is re-raised.
#
# @param chunk [#write_to] object that writes itself to the given IO
# @return [Object] whatever +chunk.write_to+ returns
# @raise [Errno::EPIPE] when the pipe is broken and no respawn is possible
def write(chunk)
  chunk.write_to(@io)
rescue Errno::EPIPE => broken_pipe
  # Broken pipe (child process unexpectedly exited)
  @log.warn "statsite Broken pipe, child process maybe exited.", :command => @command
  # Re-raise so the caller can retry #write with another ChildProcess
  # instance (when num_children > 1).
  raise broken_pipe unless try_respawn
  retry # retry chunk#write_to with child respawned
end