Class: Dplyr::RemoteTask

Inherits:
Object
  • Object
show all
Includes:
Dply::Logger
Defined in:
lib/dplyr/remote_task.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Dply::Logger

#debug?, #logger, stderr, #stderr

Constructor Details

#initialize(host, task, id_size: nil) ⇒ RemoteTask

Returns a new instance of RemoteTask.



12
13
14
15
16
17
# File 'lib/dplyr/remote_task.rb', line 12

def initialize(host, task, id_size: nil)
  @host = host
  @task = task
  @messages = []
  @id_size = id_size || @host_info[:id].size
end

Instance Attribute Details

#exit_statusObject (readonly)

Returns the value of attribute exit_status.



10
11
12
# File 'lib/dplyr/remote_task.rb', line 10

def exit_status
  @exit_status
end

#messagesObject (readonly)

Returns the value of attribute messages.



10
11
12
# File 'lib/dplyr/remote_task.rb', line 10

def messages
  @messages
end

Instance Method Details

#addrObject



42
43
44
# File 'lib/dplyr/remote_task.rb', line 42

def addr
  @host.fetch :addr
end

#dirObject



38
39
40
# File 'lib/dplyr/remote_task.rb', line 38

def dir
  @host.fetch :dir
end

#envObject



87
88
89
# File 'lib/dplyr/remote_task.rb', line 87

def env
  @env ||= "PATH=/usr/sbin:/usr/local/sbin:$PATH #{roles_env}"
end

#get_exit_status(pid) ⇒ Object



71
72
73
74
# File 'lib/dplyr/remote_task.rb', line 71

def get_exit_status(pid)
  pid, status = Process.waitpid2(pid)
  return status
end

#output_templateObject



76
77
78
79
80
81
# File 'lib/dplyr/remote_task.rb', line 76

def output_template
  @output_template ||= begin
    id_template = "%-#{@id_size}s".bold.grey
    template = "#{id_template}  %s"
  end
end

#pty_read(file, id) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/dplyr/remote_task.rb', line 54

def pty_read(file, id)
  file.each do |line|
    if line =~ /\Adply_msg\|/
      receive_message line
    else
      printf output_template, id, line
    end
  end
rescue EOFError,Errno::ECONNRESET, Errno::EPIPE, Errno::EIO => e
end

#receive_message(msg_str) ⇒ Object



65
66
67
68
69
# File 'lib/dplyr/remote_task.rb', line 65

def receive_message(msg_str)
  msg = msg_str.partition("|")[2].strip
  return if msg.empty?
  @messages << msg
end

#remote_cmdObject



26
27
28
29
30
31
32
# File 'lib/dplyr/remote_task.rb', line 26

def remote_cmd
  if logger.debug?
    %(#{ssh} -l #{user} #{addr} '#{env} drake --remote --debug -d #{dir} #{@task} 2>&1')
  else
    %(#{ssh} -l #{user} #{addr} '#{env} drake --remote -d #{dir} #{@task} 2>&1' 2>/dev/null)
  end
end

#reset!Object



83
84
85
# File 'lib/dplyr/remote_task.rb', line 83

def reset!
  @output_template = nil
end

#rolesObject



46
47
48
# File 'lib/dplyr/remote_task.rb', line 46

def roles
  @host.fetch :roles
end

#roles_envObject



91
92
93
94
95
# File 'lib/dplyr/remote_task.rb', line 91

def roles_env
  return "" if not roles.size > 0
  roles_str = roles.join(",")
  "DPLY_ROLES=#{roles_str} DPLY_PERSIST_ROLES=1"
end

#runObject



19
20
21
22
23
24
# File 'lib/dplyr/remote_task.rb', line 19

def run
  reset!
  r, w, pid = PTY.spawn(remote_cmd)
  pty_read(r, @host[:id])
  @exit_status = get_exit_status pid
end

#sshObject



50
51
52
# File 'lib/dplyr/remote_task.rb', line 50

def ssh
  "ssh -tt -o BatchMode=yes -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
end

#userObject



34
35
36
# File 'lib/dplyr/remote_task.rb', line 34

def user
  @host.fetch :user
end