Class: ProcessExecuter::MonitoredPipe

Inherits:
Object
  • Object
show all
Includes:
TrackOpenInstances
Defined in:
lib/process_executer/monitored_pipe.rb

Overview

Write data sent through a pipe to a destination

When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe.

If the destination raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in #exception.

#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the destination, and (3) the monitoring thread is killed.

Examples:

Collect pipe data into a string

pipe_data = StringIO.new
begin
  pipe = MonitoredPipe.new(pipe_data)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data.string #=> "Hello World"

Collect pipe data into a string AND a file

pipe_data_string = StringIO.new
pipe_data_file = File.open("pipe_data.txt", "w")
begin
  pipe = MonitoredPipe.new(pipe_data_string, pipe_data_file)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data_string.string #=> "Hello World"
File.read("pipe_data.txt") #=> "Hello World"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe

Create a new monitored pipe

Creates a IO.pipe and starts a monitoring thread to read data written to the pipe.

Examples:

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)

Parameters:

  • redirection_destination (Array<#write>)

    as data is read from the pipe, it is written to this destination

  • chunk_size (Integer) (defaults to: 100_000)

    the size of the chunks to read from the pipe



59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/process_executer/monitored_pipe.rb', line 59

def initialize(redirection_destination, chunk_size: 100_000)
  @destination = Destinations.factory(redirection_destination)

  assert_destination_is_compatible_with_monitored_pipe

  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = start_monitoring_thread

  self.class.add_open_instance(self)
end

Instance Attribute Details

#chunk_sizeInteger (readonly)

The size of the chunks to read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.chunk_size #=> 1000

Returns:

  • (Integer)

    the size of the chunks to read from the pipe



181
182
183
# File 'lib/process_executer/monitored_pipe.rb', line 181

def chunk_size
  @chunk_size
end

#destinationArray<ProcessExecuter::Destination::Base> (readonly)

The redirection destination to write data that is read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.destination #=>

Returns:

  • (Array<ProcessExecuter::Destination::Base>)


195
196
197
# File 'lib/process_executer/monitored_pipe.rb', line 195

def destination
  @destination
end

#exceptionException? (readonly)

The exception raised by a destination

If an exception is raised by a destination, it is stored here. Otherwise, it is nil.

Examples:

pipe.exception #=> nil

Returns:

  • (Exception, nil)

    the exception raised by a destination or nil if no exception was raised



263
264
265
# File 'lib/process_executer/monitored_pipe.rb', line 263

def exception
  @exception
end

#filenoInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The file descriptor for the write end of the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.fileno == pipe.to_io.fileno #=> true

Returns:

  • (Integer)

    the file descriptor for the write end of the pipe



139
140
141
# File 'lib/process_executer/monitored_pipe.rb', line 139

def fileno
  pipe_writer.fileno
end

#pipe_readerIO (readonly)

The read end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_reader #=> #<IO:fd 11>

Returns:

  • (IO)


219
220
221
# File 'lib/process_executer/monitored_pipe.rb', line 219

def pipe_reader
  @pipe_reader
end

#pipe_writerIO (readonly)

The write end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_writer #=> #<IO:fd 12>

Returns:

  • (IO)

    the write end of the pipe



230
231
232
# File 'lib/process_executer/monitored_pipe.rb', line 230

def pipe_writer
  @pipe_writer
end

#stateSymbol (readonly)

The state of the pipe

Must be either :open, :closing, or :closed

  • :open - the pipe is open and data can be written to it
  • :closing - the pipe is being closed and data can no longer be written to it
  • :closed - the pipe is closed and data can no longer be written to it

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.state #=> :open
pipe.close
pipe.state #=> :closed

Returns:

  • (Symbol)

    the state of the pipe



250
251
252
# File 'lib/process_executer/monitored_pipe.rb', line 250

def state
  @state
end

#threadThread (readonly)

The thread that monitors the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>

Returns:

  • (Thread)


208
209
210
# File 'lib/process_executer/monitored_pipe.rb', line 208

def thread
  @thread
end

Instance Method Details

#close

This method returns an undefined value.

Set the state to :closing and wait for the state to be set to :closed

The monitoring thread will see that the state has changed and will close the pipe.

Examples:

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.state #=> :open
pipe.write('Hello World')
pipe.close
pipe.state #=> :closed
data_collector.string #=> "Hello World"


89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/process_executer/monitored_pipe.rb', line 89

def close
  mutex.synchronize do
    if state == :open
      @state = :closing
      condition_variable.wait(mutex) while @state != :closed
    end
  end

  thread.join
  destination.close
  self.class.remove_open_instance(self)
end

#to_ioIO

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the write end of the pipe so that data can be written to it

Data written to this end of the pipe will be read by the monitor thread and written to the destination.

This is so we can provide a MonitoredPipe to Process.spawn as a FD

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.to_io.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Returns:

  • (IO)

    the write end of the pipe



121
122
123
# File 'lib/process_executer/monitored_pipe.rb', line 121

def to_io
  pipe_writer
end

#write(data) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes data to the pipe so that it can be read by the monitor thread

Primarily used for testing.

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Parameters:

  • data (String)

    the data to write to the pipe

Returns:

  • (Integer)

    the number of bytes written to the pipe



161
162
163
164
165
166
167
# File 'lib/process_executer/monitored_pipe.rb', line 161

def write(data)
  mutex.synchronize do
    raise IOError, 'closed stream' unless state == :open

    pipe_writer.write(data)
  end
end