Class: Arborist::Monitor::ConnectionBatching::BatchRunner

Inherits:
Object
  • Object
show all
Extended by:
Loggability
Defined in:
lib/arborist/monitor/connection_batching.rb

Overview

An object that manages batching of connections and gathering results.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(enum, batch_size, timeout) ⇒ BatchRunner

Create a new BatchRunner for the specified enum (an Enumerator), running at most batch_size connections at a time, each with the given timeout in seconds.



32
33
34
35
36
37
38
39
40
# File 'lib/arborist/monitor/connection_batching.rb', line 32

# Set up a runner that pulls connection hashes from +enum+.
#
# enum::       the source of connection hashes
# batch_size:: the most connections to keep in the batch at once;
#              DEFAULT_BATCH_SIZE is used when this is nil or false
# timeout::    the per-connection timeout; DEFAULT_TIMEOUT is used when this
#              is nil or false
def initialize( enum, batch_size, timeout )
  # Where the work comes from, and when it started (unset until the run begins)
  @enum  = enum
  @start = nil

  # Bookkeeping for the connections in flight and the outcomes gathered so far
  @current_batch     = []
  @connection_hashes = {}
  @results           = {}

  # Tunables, falling back to the defaults when none was supplied
  @batch_size = batch_size || DEFAULT_BATCH_SIZE
  @timeout    = timeout || DEFAULT_TIMEOUT
end

Instance Attribute Details

#batch_size ⇒ Object (readonly)

The maximum number of connections to have running at any time.



70
71
72
# File 'lib/arborist/monitor/connection_batching.rb', line 70

# Reader for the maximum number of connections allowed in the batch at once.
def batch_size
  return @batch_size
end

#connection_hashes ⇒ Object (readonly)

An index of the current batch’s connection hashes by connection.



62
63
64
# File 'lib/arborist/monitor/connection_batching.rb', line 62

# Reader for the index that maps each connection in the current batch to its
# connection hash.
def connection_hashes
  return @connection_hashes
end

#current_batch ⇒ Object (readonly)

The batch of connection hashes that are currently being selected, ordered from oldest to newest.



58
59
60
# File 'lib/arborist/monitor/connection_batching.rb', line 58

# Reader for the connection hashes currently being waited on.
def current_batch
  return @current_batch
end

#enum ⇒ Object

The Enumerator that yields connection hashes



49
50
51
# File 'lib/arborist/monitor/connection_batching.rb', line 49

# Reader for the source that connection hashes are fetched from.
def enum
  return @enum
end

#results ⇒ Object (readonly)

The results hash



53
54
55
# File 'lib/arborist/monitor/connection_batching.rb', line 53

# Reader for the results gathered so far.
def results
  return @results
end

#start ⇒ Object

The Time the batch runner started.



66
67
68
# File 'lib/arborist/monitor/connection_batching.rb', line 66

# Reader for the time at which the runner was started.
def start
  return @start
end

#timeout ⇒ Object (readonly)

The connection timeout from the monitor, in seconds



74
75
76
# File 'lib/arborist/monitor/connection_batching.rb', line 74

# Reader for the timeout applied to each connection.
def timeout
  return @timeout
end

Instance Method Details

#add_connection(conn_hash) ⇒ Object

Add a new conn_hash to the current batch. If the conn_hash's connection is an exception, don't add it to the batch; instead, record an error status for it built from the exception.



108
109
110
111
112
113
114
115
116
117
# File 'lib/arborist/monitor/connection_batching.rb', line 108

# Register +conn_hash+ with the runner.
#
# A hash whose :conn is a live connection is appended to the current batch and
# indexed by that connection. A hash whose :conn is an Exception never enters
# the batch; an error result holding the exception's message is stored under
# the hash's :identifier instead.
def add_connection( conn_hash )
  conn = conn_hash[:conn]

  unless conn.is_a?( ::Exception )
    self.log.debug "Added connection for %{identifier} to the batch." % conn_hash
    self.current_batch.push( conn_hash )
    return self.connection_hashes[ conn ] = conn_hash
  end

  # The connection attempt already failed, so there is nothing to wait on.
  self.log.debug "Adding an error result for %{identifier}." % conn_hash
  self.results[ conn_hash[:identifier] ] = { error: conn.message }
end

#batch_full? ⇒ Boolean

Returns true if the current batch is at capacity.

Returns:

  • (Boolean)


85
86
87
# File 'lib/arborist/monitor/connection_batching.rb', line 85

# Predicate: does the current batch already hold at least #batch_size
# connections?
def batch_full?
  in_flight = self.current_batch.length
  return in_flight >= self.batch_size
end

#fill_batch ⇒ Object

Fill the #current_batch if it’s not yet at capacity and there are more connections to be made.



139
140
141
142
143
144
145
146
# File 'lib/arborist/monitor/connection_batching.rb', line 139

# Top up the current batch: keep fetching connections and adding them until
# the batch is full, the enum has been unset, or no further connection is
# available.
def fill_batch
  # Stop as soon as there is no enum left to pull from or no room in the batch
  until !self.enum || self.batch_full?
    self.log.debug "Adding connections to the queue."

    conn_hash = self.next_connection
    break unless conn_hash

    self.add_connection( conn_hash )
  end
end

#finished? ⇒ Boolean

Returns true if the runner has been run and all connections have been handled.

Returns:

  • (Boolean)


79
80
81
# File 'lib/arborist/monitor/connection_batching.rb', line 79

# Predicate: has the runner been started, had its enum unset, and emptied its
# current batch?
#
# When the runner has not been started the (falsy) value of #start is returned
# as-is rather than a strict +false+.
def finished?
  started = self.start
  return started unless started

  # Still more connections to fetch
  return false unless self.enum.nil?

  return self.current_batch.empty?
end

#next_connection ⇒ Object

Fetch the next connection from the Enumerator, unsetting the enumerator and returning nil when it reaches the end.



92
93
94
95
96
97
98
99
100
101
102
# File 'lib/arborist/monitor/connection_batching.rb', line 92

# Pull the next connection hash off the enum and stamp it with the time it was
# fetched (:start) and the time at which it should be considered timed out
# (:timeout_at, i.e. :start plus #timeout).
#
# Returns the stamped hash. When the enum is exhausted, the enum is unset and
# nil is returned instead.
def next_connection
  self.enum.next.tap do |conn_hash|
    # Taken after the fetch, so the clock starts once the hash is in hand
    fetched_at = Time.now

    conn_hash[:start]      = fetched_at
    conn_hash[:timeout_at] = fetched_at + self.timeout
  end
rescue StopIteration
  self.log.debug "Reached the end of the connections enum."
  self.enum = nil
  nil
end

#remove_connection(conn_hash) ⇒ Object

Remove the specified conn_hash from the current batch.



121
122
123
124
# File 'lib/arborist/monitor/connection_batching.rb', line 121

# Drop +conn_hash+ from the current batch and from the per-connection index.
# Returns whatever was stored in the index under the hash's :conn, or nil if
# it was not indexed.
def remove_connection( conn_hash )
  self.current_batch.delete( conn_hash )

  socket = conn_hash[:conn]
  return self.connection_hashes.delete( socket )
end

#remove_socket(socket) ⇒ Object

Remove the connection hash for the specified socket from the current batch and return it (if it was in the batch).



129
130
131
132
133
134
# File 'lib/arborist/monitor/connection_batching.rb', line 129

# Look up the connection hash indexed under +socket+, drop it from both the
# index and the current batch, and hand it back. Returns nil when the socket
# was not indexed.
def remove_socket( socket )
  self.connection_hashes.delete( socket ).tap do |conn_hash|
    self.current_batch.delete( conn_hash )
  end
end

#remove_timedout_connections ⇒ Object

Shift any connections which have timed out off of the current batch and return the timeout of the oldest non-timed-out connection.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/arborist/monitor/connection_batching.rb', line 151

# Remove the leading run of connections in the current batch whose
# :timeout_at has passed, storing a timeout error result for each under its
# :identifier, and work out how long the caller may wait before the next
# deadline falls due.
#
# Only the head of the batch is examined: scanning stops at the first
# connection that has not yet timed out.
#
# Returns the non-negative number of seconds until the oldest remaining
# connection times out, or 1 if the batch is empty afterwards.
def remove_timedout_connections
  # One clock snapshot, so the expiry test and the wait calculation agree.
  now = Time.now

  expired = self.current_batch.take_while do |conn_hash|
    conn_hash[ :timeout_at ] < now
  end

  expired.each do |conn_hash|
    self.remove_connection( conn_hash )
    self.log.debug "Discarding timed-out socket for %{identifier}." % conn_hash

    elapsed = conn_hash[:timeout_at] - conn_hash[:start]
    self.results[ conn_hash[:identifier] ] = {
      error: "Timeout after %0.3fs" % [ elapsed ]
    }
  end

  # The wait has to be measured from the oldest connection that is still
  # live, so it is computed only once the expired ones are out of the batch.
  # Measuring from an expired head would give the time since its deadline
  # rather than the time until the next one.
  oldest = self.current_batch.first
  return 1 unless oldest

  remaining = oldest[:timeout_at] - now
  return remaining < 0 ? 0 : remaining
end

#run(&block) ⇒ Object

Run the batch runner, yielding to the specified block as each connection becomes ready.



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/arborist/monitor/connection_batching.rb', line 193

# Drive the runner until it reports itself finished. Each pass tops up the
# batch, discards connections that have timed out, and then waits for sockets
# in the batch to become ready.
#
# The block is called once per ready socket with that connection's hash and
# the number of seconds between its :start and the moment the wait returned;
# the block's return value is stored in #results under the connection's
# :identifier.
#
# Raises a RuntimeError if a socket reported as ready is not in the batch.
# Returns the number of seconds the whole run took.
def run( &block )
  self.start = Time.now

  until self.finished?
    self.log.debug "Getting the status of %d connections." %
      [ self.current_batch.length ]

    self.fill_batch
    max_wait = self.remove_timedout_connections
    ready_sockets = self.wait_for_ready_connections( max_wait )

    # Every socket from one wait is measured against the same instant
    finished_at = Time.now

    # Nothing became ready this time around; go check the deadlines again
    next unless ready_sockets

    ready_sockets.each do |socket|
      conn_hash = self.remove_socket( socket )
      raise "Ready socket %p was not in the current batch!" % [ socket ] unless conn_hash

      identifier = conn_hash[:identifier]
      began_at   = conn_hash[:start]
      duration   = finished_at - began_at

      self.results[ identifier ] = block.call( conn_hash, duration )
    end
  end

  return Time.now - self.start
end

#wait_for_ready_connections(wait_seconds) ⇒ Object

Wait at most wait_seconds for one of the sockets in the current batch to become ready. If any are ready before the wait_seconds have elapsed, returns them as an Array. If wait_seconds goes by without any sockets becoming ready, or if there were no sockets to wait on, returns nil.



180
181
182
183
184
185
186
187
188
# File 'lib/arborist/monitor/connection_batching.rb', line 180

# Block for up to +wait_seconds+ until at least one socket indexed in
# #connection_hashes is reported writable by IO.select.
#
# Returns the Array of writable sockets, or nil if the wait elapsed with none
# ready or there were no sockets to wait on.
def wait_for_ready_connections( wait_seconds )
  sockets = self.connection_hashes.keys
  self.log.debug "Selecting on %d sockets." % [ sockets.length ]

  # With nothing to select on there is nothing to wait for
  return nil if sockets.empty?

  # The sockets are passed as IO.select's second (write) set, so only the
  # second element of the result matters; a timed-out select returns nil.
  selected = IO.select( nil, sockets, nil, wait_seconds )
  return selected && selected[1]
end