Class: ProcessExecuter::MonitoredPipe
- Inherits:
-
Object
- Object
- ProcessExecuter::MonitoredPipe
- Includes:
- TrackOpenInstances
- Defined in:
- lib/process_executer/monitored_pipe.rb
Overview
Write data sent through a pipe to a destination
When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe.
If the destination raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in #exception.
#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the destination, and (3) the monitoring thread is killed.
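The pipe-plus-monitoring-thread pattern described in this overview can be illustrated with a small standard-library sketch. This is not the ProcessExecuter implementation: the class name `SketchPipe`, its `chunk_size` default, and the use of `readpartial` are assumptions made for illustration only, and the sketch signals shutdown by closing the write end rather than through a state variable.

```ruby
require 'stringio'

# Hypothetical, simplified stand-in for the pattern described above.
# It is NOT the ProcessExecuter::MonitoredPipe implementation.
class SketchPipe
  attr_reader :exception

  def initialize(destination, chunk_size: 1024)
    @destination = destination
    @chunk_size = chunk_size
    @reader, @writer = IO.pipe
    @exception = nil
    @thread = Thread.new { monitor }
  end

  def write(data)
    @writer.write(data)
  end

  # Close the write end so the monitor sees EOF, then wait for it to drain.
  def close
    @writer.close unless @writer.closed?
    @thread.join
    @reader.close unless @reader.closed?
  end

  private

  # Copy data from the pipe to the destination, one chunk at a time.
  def monitor
    loop do
      @destination.write(@reader.readpartial(@chunk_size))
    end
  rescue EOFError
    # The writer was closed and all buffered data has been read.
  rescue StandardError => e
    @exception = e
  end
end

destination = StringIO.new
pipe = SketchPipe.new(destination)
pipe.write('hello ')
pipe.write('world')
pipe.close
result = destination.string
```

Calling `close` before reading `destination.string` matters here: until the monitoring thread has been joined, some data may still be sitting in the pipe.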
Instance Attribute Summary
-
#chunk_size ⇒ Integer
readonly
The size of the chunks to read from the pipe.
-
#destination ⇒ Array<ProcessExecuter::Destination::Base>
readonly
The redirection destination to write data that is read from the pipe.
-
#exception ⇒ Exception?
readonly
The exception raised by a destination.
-
#fileno ⇒ Integer
readonly
private
The file descriptor for the write end of the pipe.
-
#pipe_reader ⇒ IO
readonly
The read end of the pipe.
-
#pipe_writer ⇒ IO
readonly
The write end of the pipe.
-
#state ⇒ Symbol
readonly
The state of the pipe.
-
#thread ⇒ Thread
readonly
The thread that monitors the pipe.
Instance Method Summary
-
#close
Set the state to :closing and wait for the state to be set to :closed.
-
#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe
constructor
Create a new monitored pipe.
-
#to_io ⇒ IO
private
Return the write end of the pipe so that data can be written to it.
-
#write(data) ⇒ Integer
private
Writes data to the pipe so that it can be read by the monitor thread.
Constructor Details
#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe
Create a new monitored pipe
Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.
# File 'lib/process_executer/monitored_pipe.rb', line 59
def initialize(redirection_destination, chunk_size: 100_000)
  @destination = Destinations.factory(redirection_destination)
  assert_destination_is_compatible_with_monitored_pipe
  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = start_monitoring_thread
  self.class.add_open_instance(self)
end
Instance Attribute Details
#chunk_size ⇒ Integer (readonly)
The size of the chunks to read from the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 181
def chunk_size
  @chunk_size
end
#destination ⇒ Array<ProcessExecuter::Destination::Base> (readonly)
The redirection destination to write data that is read from the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 195
def destination
  @destination
end
#exception ⇒ Exception? (readonly)
The exception raised by a destination
If an exception is raised by a destination, it is stored here. Otherwise, it is nil.
# File 'lib/process_executer/monitored_pipe.rb', line 263
def exception
  @exception
end
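As a rough illustration of how an error raised by a destination might be captured on a background thread and kept for later inspection, consider the sketch below. `FailingDestination` and the `captured` variable are hypothetical names, and the code uses only the standard library rather than ProcessExecuter itself.

```ruby
# Hypothetical destination whose #write always fails.
class FailingDestination
  def write(_data)
    raise IOError, 'destination unavailable'
  end
end

reader, writer = IO.pipe
captured = nil

monitor = Thread.new do
  FailingDestination.new.write(reader.readpartial(1024))
rescue StandardError => e
  # Save the error so the caller can inspect it after the thread exits.
  captured = e
ensure
  reader.close
end

writer.write('data')
monitor.join
writer.close
```

After `monitor.join` returns, `captured` holds the error raised by the destination, which is the role the #exception attribute plays in this class.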
#fileno ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The file descriptor for the write end of the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 139
def fileno
  pipe_writer.fileno
end
#pipe_reader ⇒ IO (readonly)
The read end of the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 219
def pipe_reader
  @pipe_reader
end
#pipe_writer ⇒ IO (readonly)
The write end of the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 230
def pipe_writer
  @pipe_writer
end
#state ⇒ Symbol (readonly)
The state of the pipe
Must be either :open, :closing, or :closed
:open - the pipe is open and data can be written to it
:closing - the pipe is being closed and data can no longer be written to it
:closed - the pipe is closed and data can no longer be written to it
# File 'lib/process_executer/monitored_pipe.rb', line 250
def state
  @state
end
#thread ⇒ Thread (readonly)
The thread that monitors the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 208
def thread
  @thread
end
Instance Method Details
#close
This method returns an undefined value.
Set the state to :closing and wait for the state to be set to :closed.
The monitoring thread will see that the state has changed and will close the pipe.
# File 'lib/process_executer/monitored_pipe.rb', line 89
def close
  mutex.synchronize do
    if state == :open
      @state = :closing
      condition_variable.wait(mutex) while @state != :closed
    end
  end
  thread.join
  destination.close
  self.class.remove_open_instance(self)
end
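The :closing to :closed handshake relies on a Mutex and a ConditionVariable. The monitoring thread's side of that handshake is not shown on this page, so the worker below is an assumption about how such a thread could behave, written with local variables and the standard library only.

```ruby
mutex = Mutex.new
condition = ConditionVariable.new
state = :open
transitions = []

# Worker standing in for the monitoring thread (an assumption, see above):
# it polls the state and, once it sees :closing, marks it :closed and signals.
worker = Thread.new do
  loop do
    done = mutex.synchronize do
      if state == :closing
        state = :closed
        transitions << state
        condition.signal
        true
      else
        false
      end
    end
    break if done

    sleep 0.01
  end
end

# Closer side, mirroring the shape of #close above.
mutex.synchronize do
  if state == :open
    state = :closing
    transitions << state
    # Re-check the state in a loop to guard against spurious wakeups.
    condition.wait(mutex) while state != :closed
  end
end
worker.join
```

Because the closer holds the mutex from the moment it sets :closing until `wait` releases it, the worker cannot signal before the closer is waiting, so the wakeup is not lost.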
#to_io ⇒ IO
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the write end of the pipe so that data can be written to it
Data written to this end of the pipe will be read by the monitor thread and written to the destination.
This method exists so that a MonitoredPipe can be passed to Process.spawn as a file descriptor redirection target.
# File 'lib/process_executer/monitored_pipe.rb', line 121
def to_io
  pipe_writer
end
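To see why exposing #to_io is enough for Process.spawn, the following sketch passes a plain wrapper object as a redirection target. `WriterWrapper` is a hypothetical class, not part of ProcessExecuter, and the child command (the current Ruby interpreter located via `RbConfig.ruby`) is chosen only to keep the example portable.

```ruby
require 'rbconfig'

# Hypothetical wrapper, not part of ProcessExecuter: it only exposes #to_io.
class WriterWrapper
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
wrapper = WriterWrapper.new(writer)

# Process.spawn converts a non-IO redirection target by calling #to_io on it.
pid = Process.spawn(RbConfig.ruby, '-e', 'print "hello"', out: wrapper)
Process.wait(pid)

writer.close # close the parent's copy so the reader reaches EOF
captured = reader.read
reader.close
```

The parent must close its own copy of the write end before reading to EOF, which is one reason a class like this one manages both ends of the pipe on the caller's behalf.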
#write(data) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes data to the pipe so that it can be read by the monitor thread
Primarily used for testing.
# File 'lib/process_executer/monitored_pipe.rb', line 161
def write(data)
  mutex.synchronize do
    raise IOError, 'closed stream' unless state == :open
    pipe_writer.write(data)
  end
end