Class: RExec::Connection

Inherits:
Object
Defined in:
lib/rexec/connection.rb

Overview

This class represents an abstract connection to another Ruby process. The interface imposes no structure on how the communication link works, beyond the ability to send and receive objects. You can implement whatever communication idiom you need on top of this library.

Depending on how you set things up, this can connect to a local Ruby process or to a remote Ruby process (for example, via SSH).


Constructor Details

#initialize(input, output, error = nil) ⇒ Connection

Create a new connection. You need to supply a pipe for reading input, a pipe for sending output, and optionally a pipe for errors to be read from.



# File 'lib/rexec/connection.rb', line 48

def initialize(input, output, error = nil)
  @input = input
  @output = output
  @running = true

  @error = error

  @receive_mutex = Mutex.new
  @send_mutex = Mutex.new
end
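The wiring of the three constructor arguments can be sketched with two local pipes. This is a minimal illustration, not the library's own usage: `SketchConnection` is a hypothetical stand-in that copies the constructor and the send/receive logic listed on this page so the snippet runs without the rexec gem installed.

```ruby
# Hypothetical stand-in mirroring the constructor and the send/receive
# methods shown on this page (assumption: used only so the sketch is
# self-contained; with the gem installed you would use RExec::Connection).
class SketchConnection
  def initialize(input, output, error = nil)
    @input = input
    @output = output
    @error = error
    @receive_mutex = Mutex.new
    @send_mutex = Mutex.new
  end

  def send_object(*objects)
    @send_mutex.synchronize do
      objects.each { |o| @output.write(Marshal.dump(o)) }
      @output.flush
    end
  end

  def receive_object
    @receive_mutex.synchronize { Marshal.load(@input) }
  end
end

# Two pipes give a bidirectional link: each side reads from one pipe
# and writes to the other.
a_reads, b_writes = IO.pipe
b_reads, a_writes = IO.pipe

side_a = SketchConnection.new(a_reads, a_writes)
side_b = SketchConnection.new(b_reads, b_writes)

side_a.send_object([:ping, 1])
request = side_b.receive_object
side_b.send_object([:pong, request.last + 1])
reply = side_a.receive_object
```

Note the argument order: the first argument is the pipe this side reads from, the second is the pipe it writes to.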

Class Method Details

.build(process, options) {|cin| ... } ⇒ Object

Yields:

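  • (cin) the process's input pipe, yielded before the "\004" terminator is written and the connection is created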

Raises:

  • (InvalidConnectionError) if the process's input or output is not connected

# File 'lib/rexec/connection.rb', line 31

def self.build(process, options, &block)
  cin = process.input
  cout = process.output
  cerr = process.error
  
  # We require both cin and cout to be connected in order for connection to work
  raise InvalidConnectionError.new("Input (#{cin}) or Output (#{cout}) is not connected!") unless cin and cout
  
  yield cin
  
  cin.puts("\004")
  
  return self.new(cout, cin, cerr)
end
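The listing above writes whatever the block sends to `cin`, then a line containing "\004", and only afterwards uses the same pipe for marshalled objects. How the child consumes that preamble is not shown on this page; the sketch below assumes the child reads up to the "\004" byte, discards the trailing newline, and then switches to `Marshal.load`. A `StringIO` stands in for the pipe.

```ruby
require 'stringio'

# Parent side, as in .build: program text, then the "\004" line,
# then marshalled objects on the same stream.
stream = StringIO.new
stream.puts("puts 'hello'")               # what the block might write to cin
stream.puts("\004")                       # terminator written by .build
stream.write(Marshal.dump(:payload))      # first object sent afterwards
stream.rewind

# Child side (assumed behaviour, not taken from the library):
source = stream.gets("\004").chomp("\004") # everything before the terminator
stream.gets                                # consume the newline added by puts
object = Marshal.load(stream)              # the object stream starts here
```

The preamble must not itself contain a "\004" byte under this assumption, since the reader stops at the first one.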

Instance Method Details

#dump_errors(to = $stderr) ⇒ Object

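Write to the given stream any text that the child process has written to its $stderr. Only output that is already available is read, so this method does not wait for more.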



# File 'lib/rexec/connection.rb', line 109

def dump_errors(to = $stderr)
  if @error and !@error.closed?
    while true
      result = IO.select([@error], [], [], 0)

      break if result == nil

      to.puts @error.readline.chomp
    end
  end
end
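The polling pattern used above, `IO.select` with a timeout of 0, can be tried in isolation. In this minimal sketch a local pipe plays the role of the child's error pipe and a `StringIO` plays the role of the `to` argument; both are stand-ins chosen for illustration.

```ruby
require 'stringio'

reader, writer = IO.pipe
writer.puts "warning: one"
writer.puts "warning: two"

sink = StringIO.new

# A timeout of 0 makes IO.select return immediately: an array when the
# pipe has data to read, nil when nothing is pending.
while IO.select([reader], [], [], 0)
  sink.puts reader.readline.chomp
end
```

The loop ends as soon as no more lines are pending, even though the writer is still open. If the writer had been closed, `IO.select` would report the pipe as readable at end of file and `readline` would raise `EOFError`; this sketch does not handle that case.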

#error ⇒ Object

The pipe used for receiving errors. On the client side this pipe is writable; on the server side it is readable. Avoid using it on the client side and simply write to $stderr instead.



# File 'lib/rexec/connection.rb', line 71

def error
  @error
end

#input ⇒ Object

The pipe used for reading data.



# File 'lib/rexec/connection.rb', line 60

def input
  @input
end

#output ⇒ Object

The pipe used for writing data.



# File 'lib/rexec/connection.rb', line 65

def output
  @output
end

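#receive_object ⇒ Object

Receive an object from the connection. This method is thread-safe and may block. It returns nil when the input reaches end of file, and marks the connection as no longer running. If the received object is an Exception, it is raised rather than returned.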



# File 'lib/rexec/connection.rb', line 122

def receive_object
  object = nil

  @receive_mutex.synchronize do
    begin
      object = Marshal.load(@input)
    rescue EOFError
      object = nil
      @running = false
    end
  end
  
  if object and object.kind_of?(Exception)
    raise object
  end
  
  return object
end
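The three outcomes handled above (a plain object, an exception object, and end of file) can be reproduced with `Marshal` and a local pipe. This is a minimal sketch of the wire behaviour only; it does not use the class itself.

```ruby
reader, writer = IO.pipe

writer.write(Marshal.dump({ :status => "ok" }))
writer.write(Marshal.dump(ArgumentError.new("bad input")))
writer.close # the reader will see end of file after the second object

first = Marshal.load(reader)   # an ordinary object
second = Marshal.load(reader)  # an exception travels as a normal object

# At end of file Marshal.load raises EOFError, which receive_object
# turns into a nil return value.
third = begin
  Marshal.load(reader)
rescue EOFError
  nil
end
```

Each `Marshal.load` call consumes exactly one object from the stream, which is why no extra framing is needed between objects.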

#run(&block) ⇒ Object

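This is a very simple run loop. It yields each object to the block as it is received. Any exception raised by the block is sent back over the connection. The loop ends when a nil object is received or the input reaches end of file.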



# File 'lib/rexec/connection.rb', line 87

def run(&block)
  while @running
    pipes = IO.select([@input])

    if pipes[0].size > 0
      object = receive_object
      
      if object == nil
        @running = false
        return
      end
      
      begin
        yield object
      rescue Exception => ex
        send_object(ex)
      end
    end
  end
end
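The shape of this loop can be sketched without the class, using two local pipes as stand-ins for the connection's input and output. The division below is a hypothetical handler chosen so that one object makes the block fail; the sketch rescues `StandardError` rather than `Exception`, which is narrower than the listing above.

```ruby
inbound_r, inbound_w = IO.pipe    # stands in for the connection's input
outbound_r, outbound_w = IO.pipe  # stands in for the connection's output

[2, 0, 5].each { |n| inbound_w.write(Marshal.dump(n)) }
inbound_w.close

results = []

loop do
  IO.select([inbound_r]) # wait until there is data or end of file

  object = begin
    Marshal.load(inbound_r)
  rescue EOFError
    nil
  end

  break if object.nil? # nil ends the loop, as in #run

  begin
    results << 10 / object # hypothetical handler; raises for 0
  rescue StandardError => ex
    # Send the failure back to the peer instead of stopping the loop.
    outbound_w.write(Marshal.dump(ex))
    outbound_w.flush
  end
end

outbound_w.close
forwarded = Marshal.load(outbound_r)
```

Because nil ends the loop, a peer that sends nil as ordinary data would stop a loop of this shape.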

#running? ⇒ Boolean

Return whether or not the connection is running.

Returns:

  • (Boolean)


# File 'lib/rexec/connection.rb', line 82

def running?
  @running
end

#send_object(*objects) ⇒ Object

Send one or more objects, then flush the output pipe. This method is thread-safe.



# File 'lib/rexec/connection.rb', line 142

def send_object(*objects)
  @send_mutex.synchronize do
    objects.each do |o|
      data = Marshal.dump(o)
      @output.write(data)
    end
    
    @output.flush
  end
end
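The role of the send mutex can be illustrated with several threads writing to one pipe. This is a minimal sketch using a local pipe and a plain `Mutex`, not the class itself; the thread and message counts are arbitrary.

```ruby
reader, writer = IO.pipe
send_mutex = Mutex.new

threads = 4.times.map do |i|
  Thread.new do
    5.times do |j|
      # Holding the mutex for the whole write keeps each marshalled
      # object contiguous on the pipe.
      send_mutex.synchronize do
        writer.write(Marshal.dump([i, j]))
        writer.flush
      end
    end
  end
end

threads.each(&:join)
writer.close

received = []
begin
  loop { received << Marshal.load(reader) }
rescue EOFError
  # All objects have been read.
end
```

The objects arrive in whatever order the threads acquired the mutex, but each one arrives whole.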

#stop ⇒ Object

Stop the connection, and close the output pipe.



# File 'lib/rexec/connection.rb', line 76

def stop
  @running = false
  @output.close
end