Class: Mongo::Server::PushMonitor Private

Inherits:
Object
Extended by:
Forwardable
Includes:
BackgroundThread
Defined in:
lib/mongo/server/push_monitor.rb,
lib/mongo/server/push_monitor/connection.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

A monitor that uses server-pushed ismaster requests.

When a Monitor handshakes with a 4.4+ server, it creates an instance of PushMonitor. The PushMonitor then runs server-pushed ismaster commands (that is, awaited and exhausted ismaster) so that it receives topology changes from the server as quickly as possible. The Monitor itself continues to check the server in order to calculate round-trip times and to perform immediate checks requested by the application.

Since:

  • 2.0.0

Defined Under Namespace

Classes: Connection

Constant Summary

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary

Instance Method Summary

Methods included from BackgroundThread

#run!, #running?

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Constructor Details

#initialize(monitor, topology_version, monitoring, **options) ⇒ PushMonitor

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of PushMonitor.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 32

def initialize(monitor, topology_version, monitoring, **options)
  if topology_version.nil?
    raise ArgumentError, 'Topology version must be provided but it was nil'
  end
  @monitor = monitor
  @topology_version = topology_version
  @monitoring = monitoring
  @options = options
  @lock = Mutex.new
end

Instance Attribute Details

#monitor ⇒ Server (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the server that is being monitored.

Returns:

  • (Server)

    The server that is being monitored.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 44

def monitor
  @monitor
end

#monitoring ⇒ Monitoring (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the monitoring.

Returns:

  • (Monitoring)

    The monitoring.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 50

def monitoring
  @monitoring
end

#options ⇒ Hash (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns push monitor options.

Returns:

  • (Hash)

    Push monitor options.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 53

def options
  @options
end

#topology_version ⇒ TopologyVersion (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the most recently received topology version.

Returns:

  • (TopologyVersion)

    Most recently received topology version.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 47

def topology_version
  @topology_version
end

Instance Method Details

#do_work ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 83

def do_work
  @lock.synchronize do
    return if @stop_requested
  end

  result = monitoring.publish_heartbeat(server, awaited: true) do
    ismaster
  end
  new_description = monitor.run_sdam_flow(result, awaited: true)
  # When ismaster fails due to a fail point, the response does not
  # include topology version. In this case we need to keep our existing
  # topology version so that we can resume monitoring.
  # The spec does not appear to directly address this case but
  # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-ismaster
  # says that topologyVersion should only be updated from successful
  # ismaster responses.
  if new_description.topology_version
    @topology_version = new_description.topology_version
  end
rescue Mongo::Error => exc
  msg = "Error running awaited ismaster on #{server.address}"
  Utils.warn_monitor_exception(msg, exc,
    logger: options[:logger],
    log_prefix: options[:log_prefix],
    bg_error_backtrace: options[:bg_error_backtrace],
  )
end
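
The topology version handling in do_work can be illustrated in isolation. The following is a minimal sketch, not the driver's implementation: Description is a hypothetical stand-in for a server description, and next_topology_version is a helper name invented here. It shows that a response without a topology version (for example, a failed ismaster) leaves the existing version in place.

```ruby
# Hypothetical stand-in for a server description; not the driver's class.
Description = Struct.new(:topology_version)

# Returns the topology version to keep after one ismaster round trip.
# A description without a topology version leaves the current one in place.
def next_topology_version(current, new_description)
  new_description.topology_version || current
end

current = { 'processId' => 'abc', 'counter' => 3 }
failed  = Description.new(nil)
ok      = Description.new({ 'processId' => 'abc', 'counter' => 4 })

kept    = next_topology_version(current, failed)  # existing version retained
updated = next_topology_version(current, ok)      # newer version adopted
```

Keeping the old version means the next awaited request can still carry a valid topologyVersion, so monitoring can resume after a transient failure.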

#ismaster ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 111

def ismaster
  @lock.synchronize do
    if @connection && @connection.pid != Process.pid
      log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid})")
      @connection.disconnect!
      @connection = nil
    end
  end

  @lock.synchronize do
    unless @connection
      @server_pushing = false
      connection = PushMonitor::Connection.new(server.address, options)
      connection.connect!
      @connection = connection
    end
  end

  resp_msg = begin
    unless @server_pushing
      write_ismaster
    end
    read_response
  rescue Mongo::Error
    @lock.synchronize do
      @connection.disconnect!
      @connection = nil
    end
    raise
  end
  @server_pushing = resp_msg.flags.include?(:more_to_come)
  # The first document of the response is the ismaster result.
  resp_msg.documents.first
end
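
The way ismaster decides whether to write a request before reading can be sketched without a server. This is an illustrative model only: Reply and StreamState are hypothetical names, and the replies are canned data rather than real wire-protocol messages. A request is written only when the previous reply did not carry the :more_to_come flag.

```ruby
# Hypothetical stand-in for a wire-protocol reply; not the driver's class.
Reply = Struct.new(:flags, :document)

# Tracks whether a request must be written before the next read.
class StreamState
  attr_reader :writes

  def initialize(replies)
    @replies = replies.dup
    @server_pushing = false
    @writes = 0
  end

  # Performs one check: "write" only when the server is not already pushing,
  # then consume the next reply and remember whether more replies will follow.
  def check
    @writes += 1 unless @server_pushing
    reply = @replies.shift
    @server_pushing = reply.flags.include?(:more_to_come)
    reply.document
  end
end

replies = [
  Reply.new([:more_to_come], { 'ok' => 1 }),
  Reply.new([:more_to_come], { 'ok' => 1 }),
  Reply.new([], { 'ok' => 1 }),               # server ends the stream
  Reply.new([:more_to_come], { 'ok' => 1 }),
]
state = StreamState.new(replies)
4.times { state.check }
writes = state.writes
```

In this run, four checks need only two writes: one to open the stream and one after the server stops pushing.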

#read_response ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 155

def read_response
  if timeout = options[:connect_timeout]
    if timeout < 0
      raise Error::SocketTimeoutError, "Requested to read with a negative timeout: #{timeout}"
    elsif timeout > 0
      timeout += options[:heartbeat_frequency] || Monitor::DEFAULT_HEARTBEAT_INTERVAL
    end
  end
  # We set the timeout twice: once passed into read_socket which applies
  # to each individual read operation, and again around the entire read.
  Timeout.timeout(timeout, Error::SocketTimeoutError, "Failed to read an awaited ismaster response in #{timeout} seconds") do
    @lock.synchronize { @connection }.read_response(socket_timeout: timeout)
  end
end
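
The timeout arithmetic in read_response can be looked at on its own. The sketch below assumes a default heartbeat interval of 10 seconds purely for illustration, and awaited_read_timeout is a helper name invented here; it raises ArgumentError rather than the driver's error class so that it stays self-contained.

```ruby
# Assumed default for illustration; the driver's constant may differ.
DEFAULT_HEARTBEAT_INTERVAL = 10

# Computes the read timeout for an awaited ismaster from the options hash.
# nil means no timeout configured; 0 is passed through unchanged.
def awaited_read_timeout(options)
  timeout = options[:connect_timeout]
  return nil if timeout.nil?
  raise ArgumentError, "negative timeout: #{timeout}" if timeout < 0
  return timeout if timeout.zero?
  # The server may hold the response for up to one heartbeat interval,
  # so that interval is added on top of the connect timeout.
  timeout + (options[:heartbeat_frequency] || DEFAULT_HEARTBEAT_INTERVAL)
end

with_default  = awaited_read_timeout({ connect_timeout: 5 })
with_explicit = awaited_read_timeout({ connect_timeout: 5, heartbeat_frequency: 2 })
unset         = awaited_read_timeout({})
zero          = awaited_read_timeout({ connect_timeout: 0 })
```

Ruby's Timeout.timeout runs its block without a time limit when given nil or 0, which is why those two values need no adjustment.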

#start! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 58

def start!
  @lock.synchronize do
    super
  end
end

#stop! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 64

def stop!
  @lock.synchronize do
    @stop_requested = true
    if @connection
      # Interrupt any in-progress exhausted ismaster reads by
      # disconnecting the connection.
      @connection.send(:socket).close
    end
  end
  super.tap do
    @lock.synchronize do
      if @connection
        @connection.disconnect!
        @connection = nil
      end
    end
  end
end
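
The idea of interrupting an in-progress read by closing the underlying IO from another thread can be demonstrated with a plain pipe. This is a sketch of the general Ruby behavior, not of the driver's socket handling; a pipe is used here only because it needs no network, and behavior with real sockets may vary by platform.

```ruby
reader, writer = IO.pipe

worker = Thread.new do
  begin
    reader.read(1)   # blocks: nothing is ever written to the pipe
    :returned
  rescue IOError
    :interrupted     # raised when the stream is closed underneath the read
  end
end

sleep 0.1            # give the worker a chance to block in read
reader.close         # closing from another thread interrupts the read
outcome = worker.value
writer.close
```

If the close happens before the worker reaches the read, the read on the closed stream also raises IOError, so the worker ends in the same state either way.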

#write_ismaster ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0



# File 'lib/mongo/server/push_monitor.rb', line 145

def write_ismaster
  payload = Monitor::Connection::ISMASTER_OP_MSG.merge(
    topologyVersion: topology_version.to_doc,
    maxAwaitTimeMS: monitor.heartbeat_interval * 1000,
  )

  req_msg = Protocol::Msg.new([:exhaust_allowed], {}, payload)
  @lock.synchronize { @connection }.write_bytes(req_msg.serialize.to_s)
end
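
The payload built by write_ismaster is an ordinary hash merge. In the sketch below, BASE_COMMAND is a hypothetical placeholder for the driver's ISMASTER_OP_MSG constant (its real contents may differ), and awaited_payload is a helper name invented here. It shows the heartbeat interval, given in seconds, being converted to milliseconds for maxAwaitTimeMS.

```ruby
# Hypothetical base command; the driver's ISMASTER_OP_MSG may differ.
BASE_COMMAND = { isMaster: 1, '$db' => 'admin' }.freeze

# Builds an awaited ismaster payload without mutating the base command.
def awaited_payload(topology_version_doc, heartbeat_interval_seconds)
  BASE_COMMAND.merge(
    topologyVersion: topology_version_doc,
    maxAwaitTimeMS: heartbeat_interval_seconds * 1000,
  )
end

payload = awaited_payload({ 'processId' => 'abc', 'counter' => 1 }, 10)
```

Because Hash#merge returns a new hash, the frozen base command can be shared safely between calls.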