Class: Arborist::Monitor::ConnectionBatching::BatchRunner
- Inherits: Object (hierarchy: Object, Arborist::Monitor::ConnectionBatching::BatchRunner)
- Extended by: Loggability
- Defined in: lib/arborist/monitor/connection_batching.rb
Overview
An object that manages batching of connections and gathering results.
Instance Attribute Summary
-
#batch_size ⇒ Object
readonly
The maximum number of connections to have running at any time.
-
#connection_hashes ⇒ Object
readonly
An index of the current batch’s connection hashes by connection.
-
#current_batch ⇒ Object
readonly
The batch of connection hashes that are currently being selected, ordered from oldest to newest.
-
#enum ⇒ Object
The Enumerator that yields connection hashes.
-
#results ⇒ Object
readonly
The results hash, keyed by connection identifier.
-
#start ⇒ Object
The Time the batch runner started.
-
#timeout ⇒ Object
readonly
The connection timeout from the monitor, in seconds.
Instance Method Summary
-
#add_connection(conn_hash) ⇒ Object
Add a new conn_hash to the current batch.
-
#batch_full? ⇒ Boolean
Returns true if the current batch is at capacity.
-
#fill_batch ⇒ Object
Fill the #current_batch if it’s not yet at capacity and there are more connections to be made.
-
#finished? ⇒ Boolean
Returns true if the runner has been run and all connections have been handled.
-
#initialize(enum, batch_size, timeout) ⇒ BatchRunner
constructor
Create a new BatchRunner for the specified enum (an Enumerator).
-
#next_connection ⇒ Object
Fetch the next connection from the Enumerator, unsetting the enumerator and returning nil when it reaches the end.
-
#remove_connection(conn_hash) ⇒ Object
Remove the specified conn_hash from the current batch.
-
#remove_socket(socket) ⇒ Object
Remove the connection hash for the specified socket from the current batch and return it (if it was in the batch).
-
#remove_timedout_connections ⇒ Object
Shift any connections which have timed out off of the current batch and return the timeout of the oldest non-timed-out connection.
-
#run(&block) ⇒ Object
Run the batch runner, yielding to the specified block as each connection becomes ready.
-
#wait_for_ready_connections(wait_seconds) ⇒ Object
Wait at most wait_seconds for one of the sockets in the current batch to become ready.
Constructor Details
#initialize(enum, batch_size, timeout) ⇒ BatchRunner
Create a new BatchRunner for the specified enum (an Enumerator).
# File 'lib/arborist/monitor/connection_batching.rb', line 32
def initialize( enum, batch_size, timeout )
  @enum              = enum
  @results           = {}
  @current_batch     = []
  @connection_hashes = {}
  @start             = nil
  @batch_size        = batch_size || DEFAULT_BATCH_SIZE
  @timeout           = timeout || DEFAULT_TIMEOUT
end
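The listings on this page read `:identifier` and `:conn` from each hash the Enumerator yields, and treat a `:conn` that is an Exception as a failed connection. A minimal sketch of an Enumerator with that shape might look like the following. `FakeSocket`, `open_connection`, `connection_enum`, and the host names are hypothetical stand-ins so the sketch runs without a network; they are not part of Arborist.

```ruby
# Hypothetical stand-in for a real socket; only here so the sketch runs offline.
FakeSocket = Struct.new( :host )

# Hypothetical opener: raises for one host to exercise the error path.
def open_connection( host )
  raise IOError, "connection refused: #{host}" if host == 'db.example'
  FakeSocket.new( host )
end

# Build an Enumerator that creates connections lazily, one per call to #next.
def connection_enum( hosts )
  Enumerator.new do |yielder|
    hosts.each do |identifier, host|
      conn = begin
        open_connection( host )
      rescue IOError => err
        err   # hand the exception along in place of a connection
      end
      yielder << { identifier: identifier, conn: conn }
    end
  end
end

conn_enum = connection_enum( 'web' => 'web.example', 'db' => 'db.example' )
web = conn_enum.next   # :conn is a FakeSocket
db  = conn_enum.next   # :conn is the rescued IOError
```

Because the Enumerator is lazy, no connection is opened until something asks for it, which is what lets a batch size cap the number of connections in flight.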
Instance Attribute Details
#batch_size ⇒ Object (readonly)
The maximum number of connections to have running at any time.
# File 'lib/arborist/monitor/connection_batching.rb', line 70
def batch_size
  @batch_size
end
#connection_hashes ⇒ Object (readonly)
An index of the current batch’s connection hashes, keyed by connection.
# File 'lib/arborist/monitor/connection_batching.rb', line 62
def connection_hashes
  @connection_hashes
end
#current_batch ⇒ Object (readonly)
The batch of connection hashes that are currently being selected, ordered from oldest to newest.
# File 'lib/arborist/monitor/connection_batching.rb', line 58
def current_batch
  @current_batch
end
#enum ⇒ Object
The Enumerator that yields connection hashes.
# File 'lib/arborist/monitor/connection_batching.rb', line 49
def enum
  @enum
end
#results ⇒ Object (readonly)
The results hash, keyed by connection identifier.
# File 'lib/arborist/monitor/connection_batching.rb', line 53
def results
  @results
end
#start ⇒ Object
The Time the batch runner started.
# File 'lib/arborist/monitor/connection_batching.rb', line 66
def start
  @start
end
#timeout ⇒ Object (readonly)
The connection timeout from the monitor, in seconds.
# File 'lib/arborist/monitor/connection_batching.rb', line 74
def timeout
  @timeout
end
Instance Method Details
#add_connection(conn_hash) ⇒ Object
Add a new conn_hash to the current batch. If the conn_hash’s connection is an exception, don’t add it; instead, record an error status for it built from the exception.
# File 'lib/arborist/monitor/connection_batching.rb', line 108
def add_connection( conn_hash )
  if conn_hash[:conn].is_a?( ::Exception )
    self.log.debug "Adding an error result for %{identifier}." % conn_hash
    # The method name after `conn_hash[:conn].` is missing from this listing;
    # #message is assumed here.
    self.results[ conn_hash[:identifier] ] = { error: conn_hash[:conn].message }
  else
    self.log.debug "Added connection for %{identifier} to the batch." % conn_hash
    self.current_batch.push( conn_hash )
    self.connection_hashes[ conn_hash[:conn] ] = conn_hash
  end
end
#batch_full? ⇒ Boolean
Returns true if the current batch is at capacity.
# File 'lib/arborist/monitor/connection_batching.rb', line 85
def batch_full?
  return self.current_batch.length >= self.batch_size
end
#fill_batch ⇒ Object
Fill the #current_batch if it’s not yet at capacity and there are more connections to be made.
# File 'lib/arborist/monitor/connection_batching.rb', line 139
def fill_batch
  # If the enum is not nil and the array isn't full, fetch a new connection
  while self.enum && !self.batch_full?
    self.log.debug "Adding connections to the queue."
    conn_hash = self.next_connection or break
    self.add_connection( conn_hash )
  end
end
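The top-up behaviour can be sketched in isolation as a fixed-size window over an Enumerator. This is a simplified stand-in rather than the library's code: the free-standing `fill_batch` below takes its state as arguments and returns the Enumerator (or nil once it is exhausted), both of which are choices made for this illustration.

```ruby
# Top up `batch` from `enum` until it holds `batch_size` items.
# Returns the enum, or nil once the source is exhausted.
def fill_batch( enum, batch, batch_size )
  while batch.length < batch_size
    begin
      batch.push( enum.next )
    rescue StopIteration
      return nil
    end
  end
  enum
end

enum  = (1..5).each
batch = []

enum = fill_batch( enum, batch, 3 )   # batch is now [1, 2, 3]
batch.shift                           # one "connection" finishes
enum = fill_batch( enum, batch, 3 )   # batch is now [2, 3, 4]
```

Each pass only pulls as many items as there are free slots, so the number of items in flight never exceeds the batch size.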
#finished? ⇒ Boolean
Returns true if the runner has been run and all connections have been handled.
# File 'lib/arborist/monitor/connection_batching.rb', line 79
def finished?
  return self.start && self.enum.nil? && self.current_batch.empty?
end
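Because the condition is a chain of `&&`, the value returned before the runner has started is nil rather than false. Both are falsy, so this only matters to callers that compare against `false` explicitly. The sketch below reproduces the expression in a hypothetical free-standing method, `runner_finished?`, to show the three cases.

```ruby
# Same boolean expression as the documented method, with state passed in.
def runner_finished?( start, enum, current_batch )
  start && enum.nil? && current_batch.empty?
end

not_started = runner_finished?( nil, nil, [] )               # nil: never run
in_progress = runner_finished?( Time.now, [1].each, [] )     # false: enum remains
done        = runner_finished?( Time.now, nil, [] )          # true
```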
#next_connection ⇒ Object
Fetch the next connection from the Enumerator, unsetting the enumerator and returning nil when it reaches the end.
# File 'lib/arborist/monitor/connection_batching.rb', line 92
def next_connection
  conn_hash = self.enum.next
  conn_hash[:start] = Time.now
  conn_hash[:timeout_at] = conn_hash[:start] + self.timeout

  return conn_hash
rescue StopIteration
  self.log.debug "Reached the end of the connections enum."
  self.enum = nil
  return nil
end
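The pattern here is external enumeration: `Enumerator#next` raises StopIteration when there is nothing left, and the rescue turns that into a nil return plus a cleared enumerator. A minimal sketch, assuming a hypothetical `ConnectionSource` class that holds only the enumerator and the timeout:

```ruby
# Hypothetical holder for just the state this method needs.
class ConnectionSource
  attr_reader :enum

  def initialize( enum, timeout )
    @enum    = enum
    @timeout = timeout
  end

  # Return the next hash stamped with :start and :timeout_at, or nil once
  # the Enumerator is exhausted (at which point the enum is dropped).
  def next_connection
    conn_hash = @enum.next
    conn_hash[:start] = Time.now
    conn_hash[:timeout_at] = conn_hash[:start] + @timeout
    conn_hash
  rescue StopIteration
    @enum = nil
    nil
  end
end

source = ConnectionSource.new( [{ identifier: 'web' }].each, 2.0 )
first  = source.next_connection   # stamped hash for 'web'
second = source.next_connection   # nil, and source.enum is now nil
```

Once the enumerator has been cleared, callers need to check it before asking again, which is what the `self.enum &&` guard in #fill_batch does.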
#remove_connection(conn_hash) ⇒ Object
Remove the specified conn_hash from the current batch.
# File 'lib/arborist/monitor/connection_batching.rb', line 121
def remove_connection( conn_hash )
  self.current_batch.delete( conn_hash )
  self.connection_hashes.delete( conn_hash[:conn] )
end
#remove_socket(socket) ⇒ Object
Remove the connection hash for the specified socket from the current batch and return it (if it was in the batch).
# File 'lib/arborist/monitor/connection_batching.rb', line 129
def remove_socket( socket )
  conn_hash = self.connection_hashes.delete( socket )
  self.current_batch.delete( conn_hash )
  return conn_hash
end
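The runner keeps two views of the same data: an Array that preserves age order and a Hash that finds a connection hash from its socket. Every add and remove has to touch both. A small sketch of that bookkeeping, using a hypothetical `BatchIndex` class and plain objects in place of sockets:

```ruby
# Hypothetical container that keeps an ordered Array and a lookup Hash in sync.
class BatchIndex
  attr_reader :current_batch, :connection_hashes

  def initialize
    @current_batch     = []   # oldest first
    @connection_hashes = {}   # socket => conn_hash
  end

  def add( conn_hash )
    @current_batch.push( conn_hash )
    @connection_hashes[ conn_hash[:conn] ] = conn_hash
  end

  # Remove by socket; returns the conn_hash, or nil for an unknown socket.
  def remove_socket( socket )
    conn_hash = @connection_hashes.delete( socket )
    @current_batch.delete( conn_hash )
    conn_hash
  end
end

sock  = Object.new   # stand-in for a socket; used only as a Hash key
index = BatchIndex.new
index.add( identifier: 'web', conn: sock )

found   = index.remove_socket( sock )         # the 'web' hash
missing = index.remove_socket( Object.new )   # nil
```

Removing an unknown socket is harmless in this sketch: `Hash#delete` returns nil, and `Array#delete( nil )` finds nothing to remove.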
#remove_timedout_connections ⇒ Object
Remove any connections which have timed out from the current batch, record a timeout error result for each, and return the number of seconds to wait before the next timeout check (1 if the batch is empty).
# File 'lib/arborist/monitor/connection_batching.rb', line 151
def remove_timedout_connections
  # The batch is ordered oldest-first, so expired connections form a prefix.
  # Time#past? is not core Ruby; it is presumably provided by the library.
  expired = self.current_batch.take_while do |conn_hash|
    conn_hash[ :timeout_at ].past?
  end

  # Computed before the expired entries are removed, so #first may itself be
  # expired; the #abs on return keeps the result non-negative.
  wait_seconds = if self.current_batch.empty?
      1
    else
      self.current_batch.first[:timeout_at] - Time.now
    end

  expired.each do |conn_hash|
    self.remove_connection( conn_hash )
    self.log.debug "Discarding timed-out socket for %{identifier}." % conn_hash

    elapsed = conn_hash[:timeout_at] - conn_hash[:start]
    self.results[ conn_hash[:identifier] ] = {
      error: "Timeout after %0.3fs" % [ elapsed ]
    }
  end

  return wait_seconds.abs
end
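Since every connection gets the same timeout and the batch is ordered oldest to newest, the timed-out entries are always a prefix of the batch, which is why `take_while` is enough. The sketch below illustrates that idea with a hypothetical `partition_expired` helper. It differs from the listing in two labelled ways: it compares against an explicit `now` instead of calling `past?` (which is not a core Ruby method), and it derives the wait from the oldest live entry rather than from the first entry of the unmodified batch.

```ruby
# Split an oldest-first batch into expired and live entries, and work out
# how long to wait until the oldest live entry would time out.
def partition_expired( batch, now )
  expired = batch.take_while {|conn_hash| conn_hash[:timeout_at] < now }
  live    = batch.drop( expired.length )
  wait_seconds = live.empty? ? 1 : live.first[:timeout_at] - now
  [ expired, live, wait_seconds ]
end

now = Time.at( 1_000 )   # fixed time so the example is deterministic
batch = [
  { identifier: 'a', start: now - 3, timeout_at: now - 1 },
  { identifier: 'b', start: now - 1, timeout_at: now + 1 },
  { identifier: 'c', start: now,     timeout_at: now + 2 },
]

expired, live, wait_seconds = partition_expired( batch, now )
# expired holds 'a', live holds 'b' and 'c', wait_seconds is 1.0
```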
#run(&block) ⇒ Object
Run the batch runner, yielding to the specified block as each connection becomes ready. Returns the total elapsed time in seconds.
# File 'lib/arborist/monitor/connection_batching.rb', line 193
def run( &block )
  self.start = Time.now

  until self.finished?
    self.log.debug "Getting the status of %d connections." %
      [ self.current_batch.length ]

    self.fill_batch
    wait_seconds = self.remove_timedout_connections
    ready = self.wait_for_ready_connections( wait_seconds )

    # If the select returns ready sockets
    # Build successful status for each ready socket
    now = Time.now
    ready.each do |sock|
      conn_hash = self.remove_socket( sock ) or
        raise "Ready socket %p was not in the current batch!" % [ sock ]

      identifier, start = conn_hash.values_at( :identifier, :start )
      duration = now - start

      results[ identifier ] = block.call( conn_hash, duration )
    end if ready
  end

  return Time.now - self.start
end
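The block passed to #run receives the connection hash and the elapsed time, and whatever it returns is stored in the results under that connection's identifier. The sketch below isolates that step. `record_ready` is a hypothetical stand-in for the inner loop, and the `{ connected:, duration: }` status shape is an assumption made for illustration, not a format the library requires.

```ruby
# Stand-in for the "ready" branch of the loop: call the block for each
# ready connection hash and store its return value by identifier.
def record_ready( results, ready_hashes, now, &block )
  ready_hashes.each do |conn_hash|
    identifier, start = conn_hash.values_at( :identifier, :start )
    results[ identifier ] = block.call( conn_hash, now - start )
  end
  results
end

now   = Time.at( 2_000 )   # fixed time so the example is deterministic
ready = [ { identifier: 'web', start: now - 0.25 } ]

results = record_ready( {}, ready, now ) do |conn_hash, duration|
  { connected: true, duration: duration }   # hypothetical status shape
end
# results maps 'web' to a status with a duration of 0.25 seconds
```

Connections that fail or time out never reach the block; their entries in the results hash are written by #add_connection and #remove_timedout_connections instead.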
#wait_for_ready_connections(wait_seconds) ⇒ Object
Wait at most wait_seconds for one of the sockets in the current batch to become ready. If any are ready before the wait_seconds have elapsed, returns them as an Array. If wait_seconds goes by without any sockets becoming ready, or if there were no sockets to wait on, returns nil.
# File 'lib/arborist/monitor/connection_batching.rb', line 180
def wait_for_ready_connections( wait_seconds )
  sockets = self.connection_hashes.keys
  ready = nil

  self.log.debug "Selecting on %d sockets." % [ sockets.length ]
  _, ready, _ = IO.select( nil, sockets, nil, wait_seconds ) unless sockets.empty?

  return ready
end
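The sockets are passed as the second argument to `IO.select`, which is the write set: a non-blocking connect reports completion by the socket becoming writable. `IO.select` returns nil on timeout, and destructuring nil leaves every variable nil, which is how the method ends up returning nil in that case. A minimal sketch of the same call, using the write end of a pipe as a stand-in for a connecting socket (`wait_for_writable` is a hypothetical name):

```ruby
# Wait up to wait_seconds for any of the IOs to become writable.
# Returns an Array of writable IOs, or nil on timeout or empty input.
def wait_for_writable( sockets, wait_seconds )
  return nil if sockets.empty?
  _, writable, _ = IO.select( nil, sockets, nil, wait_seconds )
  writable
end

reader, writer = IO.pipe

ready   = wait_for_writable( [writer], 0.1 )   # the empty pipe is writable at once
nothing = wait_for_writable( [], 0.1 )         # nil: nothing to wait on

reader.close
writer.close
```

Skipping the call when the list is empty avoids sleeping for the full timeout with nothing to watch.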