Class: NewRelic::Agent::PipeChannelManager::Pipe

Inherits:
Object
  • Object
show all
Defined in:
lib/new_relic/agent/pipe_channel_manager.rb

Overview

Expected initial sequence of events for Pipe usage:

  1. Pipe is created in parent process (read and write ends open)

  2. Parent process forks

  3. An after_fork hook is invoked in the child

  4. From after_fork hook, child closes read end of pipe, and writes a ready marker on the pipe (after_fork_in_child).

  5. The parent receives the ready marker, and closes the write end of the pipe in response (after_fork_in_parent).

After this sequence of steps, an exit (whether clean or not) of the child will result in the pipe being marked readable again, and giving an EOF marker (nil) when read. Note that closing of the unused ends of the pipe in the parent and child processes is essential in order for the EOF to be correctly triggered. The ready marker mechanism is used because there’s no easy hook for after_fork in the parent process.

This class provides message framing (separation of individual messages), but not serialization. Serialization / deserialization is the responsibility of clients.

Message framing works like this:

Each message sent across the pipe is preceded by a length tag that specifies the length of the message that immediately follows, in bytes. The length tags are serialized as unsigned big-endian long values, (4 bytes each). This means that the maximum theoretical message size is 4 GB - much larger than we’d ever need or want for this application.

Constant Summary collapse

READY_MARKER =
'READY'
NUM_LENGTH_BYTES =
4

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializePipe

Returns a new instance of Pipe.



64
65
66
67
68
69
70
71
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 64

def initialize
  @out, @in = IO.pipe
  if defined?(::Encoding::ASCII_8BIT)
    @in.set_encoding(::Encoding::ASCII_8BIT)
  end
  @last_read = Process.clock_gettime(Process::CLOCK_REALTIME)
  @parent_pid = $$
end

Instance Attribute Details

#inObject

Returns the value of attribute in.



61
62
63
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 61

def in
  @in
end

#last_readObject (readonly)

Returns the value of attribute last_read.



62
63
64
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 62

def last_read
  @last_read
end

#outObject

Returns the value of attribute out.



61
62
63
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 61

def out
  @out
end

#parent_pidObject (readonly)

Returns the value of attribute parent_pid.



62
63
64
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 62

def parent_pid
  @parent_pid
end

Instance Method Details

#after_fork_in_childObject



115
116
117
118
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 115

def after_fork_in_child
  @out.close unless @out.closed?
  write(READY_MARKER)
end

#after_fork_in_parentObject



120
121
122
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 120

def after_fork_in_parent
  @in.close unless @in.closed?
end

#closeObject



73
74
75
76
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 73

def close
  @out.close unless @out.closed?
  @in.close unless @in.closed?
end

#closed?Boolean

Returns:

  • (Boolean)


124
125
126
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 124

def closed?
  @out.closed? && @in.closed?
end

#deserialize_message_length(data) ⇒ Object



82
83
84
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 82

def deserialize_message_length(data)
  data.unpack('L>').first
end

#eof?Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 111

def eof?
  !@out.closed? && @out.eof?
end

#readObject



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 92

def read
  @in.close unless @in.closed?
  @last_read = Process.clock_gettime(Process::CLOCK_REALTIME)
  length_bytes = @out.read(NUM_LENGTH_BYTES)
  if length_bytes
    message_length = deserialize_message_length(length_bytes)
    if message_length
      @out.read(message_length)
    else
      length_hex = length_bytes.bytes.map { |b| b.to_s(16) }.join(' ')
      NewRelic::Agent.logger.error("Failed to deserialize message length from pipe. Bytes: [#{length_hex}]")
      nil
    end
  else
    NewRelic::Agent.logger.error('Failed to read bytes for length from pipe.')
    nil
  end
end

#serialize_message_length(data) ⇒ Object



78
79
80
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 78

def serialize_message_length(data)
  [data.bytesize].pack('L>')
end

#write(data) ⇒ Object



86
87
88
89
90
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 86

def write(data)
  @out.close unless @out.closed?
  @in << serialize_message_length(data)
  @in << data
end