Class: NewRelic::Agent::PipeChannelManager::Listener

Inherits:
Object
  • Object
show all
Defined in:
lib/new_relic/agent/pipe_channel_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeListener

Returns a new instance of Listener.



140
141
142
143
144
145
146
147
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 140

def initialize
  @started = nil
  @pipes = {}
  @pipes_lock = Mutex.new

  @timeout = 360
  @select_timeout = 60
end

Instance Attribute Details

#pipesObject

This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.



138
139
140
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 138

def pipes
  @pipes
end

#select_timeoutObject

This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.



138
139
140
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 138

def select_timeout
  @select_timeout
end

#threadObject (readonly)

Returns the value of attribute thread.



130
131
132
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 130

def thread
  @thread
end

#timeoutObject

This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.



138
139
140
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 138

def timeout
  @timeout
end

Instance Method Details

#close_all_pipesObject



218
219
220
221
222
223
224
225
226
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 218

def close_all_pipes
  @pipes_lock.synchronize do
    @pipes.each do |id, pipe|
      # Needs else branch coverage
      pipe.close if pipe # rubocop:disable Style/SafeNavigation
    end
    @pipes = {}
  end
end

#register_pipe(id) ⇒ Object



153
154
155
156
157
158
159
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 153

def register_pipe(id)
  @pipes_lock.synchronize do
    @pipes[id] = Pipe.new
  end

  wakeup
end

#startObject



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 161

def start
  return if @started == true

  @started = true
  @thread = NewRelic::Agent::Threading::AgentThread.create('Pipe Channel Manager') do
    now = nil
    loop do
      clean_up_pipes

      pipes_to_listen_to = @pipes_lock.synchronize do
        @pipes.values.map { |pipe| pipe.out } + [wake.out]
      end

      if now
        NewRelic::Agent.record_metric(
          'Supportability/Listeners',
          Process.clock_gettime(Process::CLOCK_REALTIME) - now
        )
      end

      if ready = IO.select(pipes_to_listen_to, [], [], @select_timeout)
        now = Process.clock_gettime(Process::CLOCK_REALTIME)

        ready_pipes = ready[0]
        ready_pipes.each do |pipe|
          merge_data_from_pipe(pipe) unless pipe == wake.out
        end

        begin
          wake.out.read_nonblock(1) if ready_pipes.include?(wake.out)
        rescue IO::WaitReadable
          NewRelic::Agent.logger.error('Issue while reading from the ready pipe')
          NewRelic::Agent.logger.error("Ready pipes: #{ready_pipes.map(&:to_s)}, wake.out pipe: #{wake.out}")
        end
      end

      break unless should_keep_listening?
    end
  end
  sleep(0.001) # give time for the thread to spawn
end

#started?Boolean

Returns:

  • (Boolean)


232
233
234
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 232

def started?
  @started
end

#stopObject



209
210
211
212
213
214
215
216
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 209

def stop
  return unless @started == true

  stop_listener_thread
  close_all_pipes
  @wake.close
  @wake = nil
end

#stop_listener_threadObject



203
204
205
206
207
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 203

def stop_listener_thread
  @started = false
  wakeup
  @thread.join
end

#wakeObject



228
229
230
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 228

def wake
  @wake ||= Pipe.new
end

#wakeupObject



149
150
151
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 149

def wakeup
  wake.in << '.'
end