Class: ZkExec::Executor

Inherits:
Object
  • Object
show all
Includes:
Process, ZkExec
Defined in:
lib/zkexec/executor.rb

Constant Summary

Constants included from ZkExec

VERSION

Instance Method Summary collapse

Methods included from ZkExec

#log, #silence!

Constructor Details

#initialize(options) ⇒ Executor



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/zkexec/executor.rb', line 9

# Builds an executor from an options hash, connects to the ZooKeeper cluster,
# and registers a change callback plus a watch for every mirrored node.
#
# @param options [Hash] settings read by key: :exec, :cluster, :health,
#   :health_interval, :health_delay, :mirrors, :alert and :lock
# @raise [RuntimeError] when the client is not connected once ZK.new returns
def initialize(options)
  @cmd             = options[:exec]
  @cluster         = options[:cluster]
  @health          = options[:health]
  @health_interval = options[:health_interval]
  @health_delay    = options[:health_delay]
  # A missing :mirrors option means "nothing to mirror" instead of a
  # NoMethodError on nil when the list is iterated below.
  @mirrors         = options[:mirrors] || []
  @alert           = options[:alert]
  @lock_name       = options[:lock]

  log "connecting to #{@cluster}"
  @zk = ZK.new(@cluster, :thread => :per_callback)
  raise "timeout connecting to #{@cluster}" unless @zk.connected?
  log "connected"

  # re-establish watches
  @on_connected = @zk.on_connected do
    @mirrors.each { |(_local, remote)| watch(remote) }
  end

  # The restart lock only exists when a lock name was configured.
  @restart_lock = @lock_name && @zk.exclusive_locker(@lock_name)
  @local_lock = Mutex.new

  # Each mirror entry is destructured as a (local, remote) pair.
  @mirrors.each do |(local, remote)|
    log "registering callback on #{remote}"
    @zk.register(remote) do |event|
      if event.changed?
        log "#{remote} changed"
        copy(local, remote)
        kill_to_refork
      else
        # Any other event type: only set the watch again.
        watch(remote)
      end
    end
    watch(remote)
  end
end

Instance Method Details

#execute ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/zkexec/executor.rb', line 164

# Forks the configured command and blocks until the child exits. The loop
# forks again whenever @should_refork is true once the wait returns.
#
# @return [nil] when the last reaped child exited with status 0
# @raise [RuntimeError] "command failed", after calling alert, otherwise
def execute
  @should_refork = true

  while @should_refork
    start_health_thread
    log("forking #{@cmd}")
    @child = fork do
      exec(@cmd)
    end
    log("forked #{@child}")
    # Cleared only after the fork; the loop condition re-reads it after wait.
    @should_refork = false
    wait(@child)
  end

  # $? carries the status of the child reaped by the wait above.
  return if $?.exitstatus == 0

  alert
  raise "command failed"
end

#run ⇒ Object



160
161
162
# File 'lib/zkexec/executor.rb', line 160

# Starts #execute on a new thread and returns without waiting for it.
#
# @return [Thread] the thread running #execute
def run
  Thread.new do
    execute
  end
end