Class: ZkExec::Executor
- Inherits:
-
Object
- Object
- ZkExec::Executor
- Includes:
- Process, ZkExec
- Defined in:
- lib/zkexec/executor.rb
Constant Summary
Constants included from ZkExec
Instance Method Summary collapse
- #execute ⇒ Object
-
#initialize(options) ⇒ Executor
constructor
A new instance of Executor.
- #run ⇒ Object
Methods included from ZkExec
Constructor Details
#initialize(options) ⇒ Executor
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/zkexec/executor.rb', line 9 def initialize() @cmd = [:exec] @cluster = [:cluster] @health = [:health] @health_interval = [:health_interval] @health_delay = [:health_delay] @mirrors = [:mirrors] @alert = [:alert] @lock_name = [:lock] log "connecting to #{@cluster}" @zk = ZK.new(@cluster, :thread => :per_callback) raise "timeout connecting to #{@cluster}" unless @zk.connected? log "connected" # re-establish watches @on_connected ||= @zk.on_connected do @mirrors.each do |(local, remote)| watch(remote) end end @restart_lock = @lock_name && @zk.exclusive_locker(@lock_name) @local_lock = Mutex.new @mirrors.each do |(local, remote)| log "registering callback on #{remote}" @zk.register(remote) do |event| if event.changed? log "#{remote} changed" copy(local, remote) kill_to_refork else watch(remote) end end watch(remote) end end |
Instance Method Details
#execute ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/zkexec/executor.rb', line 164 def execute @should_refork = true while @should_refork start_health_thread log "forking #{@cmd}" @child = fork { exec @cmd } log "forked #{@child}" @should_refork = false wait @child end if $?.exitstatus != 0 alert raise "command failed" end end |
#run ⇒ Object
160 161 162 |
# File 'lib/zkexec/executor.rb', line 160 def run Thread.new { execute } end |