Class: Fbe::Iterate
- Inherits: Object
- Defined in: lib/fbe/iterate.rb
Overview
Repository iterator with stateful query execution.
This class provides a DSL for iterating through repositories and executing queries while maintaining state between iterations. It tracks progress using “marker” facts in the factbase and supports features like:
- Stateful iteration with automatic restart capability
- GitHub API quota awareness to prevent rate limit issues
- Configurable repeat counts per repository
- Timeout controls for long-running operations
The iterator executes a query for each repository, passing the previous result as context. If the query returns nil, it restarts from the beginning for that repository. Progress is persisted in the factbase to support resuming after interruptions.
- Author: Yegor Bugayenko
- Copyright: Copyright © 2024-2025 Zerocracy
- License: MIT
Instance Method Summary
- #as(label) ⇒ nil
  Sets the label for tracking iteration state.
- #by(query) ⇒ nil
  Sets the query to execute for each iteration.
- #initialize(fb:, loog:, options:, global:, epoch:, kickoff:) ⇒ Iterate (constructor)
  Creates a new iterator instance.
- #lifetime_unaware ⇒ nil
  Makes the iterator unaware of lifetime limits.
- #over {|Integer, Object| ... } ⇒ nil
  Executes the iteration over all configured repositories.
- #quota_unaware ⇒ nil
  Makes the iterator unaware of GitHub API quota limits.
- #repeats(repeats) ⇒ nil
  Sets the maximum number of iterations per repository.
- #sort_by(prop) ⇒ nil
  Sets the field to sort results by in ascending order.
- #timeout_unaware ⇒ nil
  Makes the iterator unaware of timeout limits.
Constructor Details
#initialize(fb:, loog:, options:, global:, epoch:, kickoff:) ⇒ Iterate
Creates a new iterator instance.
    # File 'lib/fbe/iterate.rb', line 91
    def initialize(fb:, loog:, options:, global:, epoch:, kickoff:)
      @fb = fb
      @loog = loog
      @options = options
      @global = global
      @epoch = epoch
      @kickoff = kickoff
      @label = nil
      @since = 0
      @query = nil
      @sort_by = nil
      @repeats = 1
      @quota_aware = true
      @lifetime_aware = true
      @timeout_aware = true
    end
Instance Method Details
#as(label) ⇒ nil
Sets the label for tracking iteration state.
The label is used to create marker facts in the factbase that track the last processed item for each repository. This enables resuming iteration after interruptions.
    # File 'lib/fbe/iterate.rb', line 198
    def as(label)
      raise 'Label is already set' unless @label.nil?
      raise 'Cannot set "label" to nil' if label.nil?
      raise "Wrong label format '#{label}', use [_a-z][a-zA-Z0-9_]*" unless label.match?(/\A[_a-z][a-zA-Z0-9_]*\z/)
      @label = label
    end
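The label format check can be tried in isolation. The sketch below reuses the regular expression from the listing above; the `valid_label?` helper and the sample labels are hypothetical and are not part of Fbe::Iterate.

```ruby
# Hypothetical helper, not part of Fbe::Iterate: applies the same
# format rule that #as enforces before it stores the label.
def valid_label?(label)
  !label.nil? && label.match?(/\A[_a-z][a-zA-Z0-9_]*\z/)
end

# Sample labels are made up for illustration.
%w[issues_were_scanned _tmp pullsWereChecked Issues 9lives has-dash].each do |label|
  puts "#{label}: #{valid_label?(label)}"
end
```

Labels must start with an underscore or a lowercase letter, so `Issues`, `9lives` and `has-dash` are rejected while the first three are accepted.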
#by(query) ⇒ nil
Sets the query to execute for each iteration.
The query can use two special variables:
- $before: The value from the previous iteration (or initial value)
- $repository: The current repository ID
    # File 'lib/fbe/iterate.rb', line 163
    def by(query)
      raise 'Query is already set' unless @query.nil?
      raise 'Cannot set query to nil' if query.nil?
      @query = query
    end
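As an illustration, a query that asks for the smallest issue number above the last processed one might look like the string below. The fact properties (`what`, `issue`), the `issue-was-opened` value, the `min` term and the `query_variables` helper are assumptions made for this sketch; only `$before` and `$repository` come from the description above.

```ruby
# Hypothetical helper, not part of Fbe::Iterate: lists the $variables
# that a query string mentions, in order of first appearance.
def query_variables(query)
  query.scan(/\$([a-z_]+)/).flatten.uniq
end

# Assumed fact schema: "what", "repository" and "issue" properties.
query = "(agg (and (eq what 'issue-was-opened') (eq repository $repository) (gt issue $before)) (min issue))"

# Both variables are bound by the iterator on every execution.
puts query_variables(query).inspect
```

A check like this can catch a query that forgets `$before`, in which case every iteration would return the same result.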
#lifetime_unaware ⇒ nil
Makes the iterator unaware of lifetime limits.
    # File 'lib/fbe/iterate.rb', line 125
    def lifetime_unaware
      @lifetime_aware = false
    end
#over {|Integer, Object| ... } ⇒ nil
Executes the iteration over all configured repositories.
For each repository, retrieves the last processed value (or uses the initial value from since) and executes the configured query with it. The query receives two parameters: $before (the last processed value) and $repository (GitHub repository ID).
When the query returns a non-nil result, the block is called with the repository ID and query result. The block must return an Integer that will be stored as the new “latest” value for the next iteration.
When the query returns nil, the iteration for that repository restarts from the initial value (set by since), and the block is NOT called.
The method tracks progress using marker facts and supports:
- Automatic restart when query returns nil
- Timeout to prevent infinite loops
- GitHub API quota checking (if enabled)
- State persistence for resuming after interruptions
Processing flow for each repository:
- Read the “latest” value from factbase (or use since if not found)
- Execute the query with $before=latest and $repository=repo_id
- If query returns nil: restart from since value, skip to next repo
- If query returns a value: call the block with (repo_id, query_result)
- Store the block’s return value as the new “latest” for next iteration
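The flow can be modelled without a factbase or GitHub. The sketch below is a simplified stand-in, not the library's implementation: `simulate_over` is a hypothetical name, a plain Hash plays the role of the marker facts, a lambda plays the role of the query, and quota, lifetime and timeout checks are left out.

```ruby
# Hypothetical model of the per-repository flow. "markers" stands in
# for the marker facts and "query" for the factbase query.
def simulate_over(repos, markers, since:, repeats:, query:)
  restarted = []
  seen = Hash.new(0)
  # Read the "latest" value per repository, or fall back to "since".
  before = repos.to_h { |repo| [repo, markers.fetch(repo, since)] }
  starts = before.dup
  loop do
    repos.each do |repo|
      next if restarted.include?(repo) || seen[repo] >= repeats
      nxt = query.call(before[repo], repo)
      if nxt.nil?
        # Nothing left: restart from "since" and do not call the block.
        restarted << repo
        before[repo] = since
      else
        # The block's return value becomes the new "latest".
        before[repo] = yield(repo, nxt)
        raise 'Block must return an Integer' unless before[repo].is_a?(Integer)
      end
      seen[repo] += 1
    end
    break if repos.all? { |r| restarted.include?(r) || seen[r] >= repeats }
  end
  # Persist only the values that changed during this run.
  repos.each { |r| markers[r] = before[r] unless before[r] == starts[r] }
  markers
end

# Made-up data: repository IDs mapped to issue numbers.
issues = { 1 => [10, 20, 30], 2 => [5] }
query = ->(before, repo) { issues[repo].select { |i| i > before }.min }
markers = simulate_over([1, 2], {}, since: 0, repeats: 2, query: query) do |repo, issue|
  puts "repo ##{repo}: issue ##{issue}"
  issue
end
puts markers.inspect
```

With this data, repository 1 advances to issue 20 and gets a marker, while repository 2 runs out of issues on its second pass, restarts from 0 and therefore leaves no marker, because its value equals the one it started with.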
    # File 'lib/fbe/iterate.rb', line 241
    def over
      raise 'Use "as" first' if @label.nil?
      raise 'Use "by" first' if @query.nil?
      seen = {}
      oct = Fbe.octo(loog: @loog, options: @options, global: @global)
      return if Fbe.over?(
        global: @global, options: @options, loog: @loog, epoch: @epoch, kickoff: @kickoff,
        quota_aware: @quota_aware, lifetime_aware: @lifetime_aware, timeout_aware: @timeout_aware
      )
      repos = Fbe.unmask_repos(
        loog: @loog, options: @options, global: @global, quota_aware: @quota_aware
      ).map { |n| oct.repo_id_by_name(n) }
      started = Time.now
      restarted = []
      before =
        repos.to_h do |repo|
          [
            repo,
            @fb.query(
              "(agg (and (eq what 'iterate') (eq where 'github') (eq repository #{repo})) (first #{@label}))"
            ).one&.first || @since
          ]
        end
      starts = before.dup
      values = {}
      loop do
        if Fbe.over?(
          global: @global, options: @options, loog: @loog, epoch: @epoch, kickoff: @kickoff,
          quota_aware: @quota_aware, lifetime_aware: @lifetime_aware, timeout_aware: @timeout_aware
        )
          @loog.info("Time to stop after #{started.ago}")
          break
        end
        repos.each do |repo|
          if Fbe.over?(
            global: @global, options: @options, loog: @loog, epoch: @epoch, kickoff: @kickoff,
            quota_aware: @quota_aware, lifetime_aware: @lifetime_aware, timeout_aware: @timeout_aware
          )
            @loog.info("Won't check repository ##{repo}")
            break
          end
          next if restarted.include?(repo)
          seen[repo] = 0 if seen[repo].nil?
          if seen[repo] >= @repeats
            @loog.debug("We've seen too many (#{seen[repo]}) in #{repo}, let's see next one")
            next
          end
          nxt =
            if @sort_by
              values[repo] ||= @fb.query(@query).each(
                @fb, before: before[repo], repository: repo
              ).map { _1[@sort_by]&.first }.compact.sort.each
              begin
                values[repo].next
              rescue StopIteration
                nil
              end
            else
              @fb.query(@query).one(@fb, before: before[repo], repository: repo)
            end
          before[repo] =
            if nxt.nil?
              @loog.debug("Next element after ##{before[repo]} not suggested, re-starting from ##{@since}: #{@query}")
              restarted << repo
              values.delete(repo) if @sort_by
              @since
            else
              @loog.debug("Next is ##{nxt}, starting from it")
              yield(repo, nxt)
            end
          unless before[repo].is_a?(Integer)
            raise "Iterator must return an Integer, but #{before[repo].class} was returned"
          end
          seen[repo] += 1
        end
        unless seen.any? { |r, v| v < @repeats && !restarted.include?(r) }
          @loog.debug("No more repos to scan (out of #{repos.size}), quitting after #{@kickoff.ago}")
          break
        end
        if restarted.size == repos.size
          @loog.debug("All #{repos.size} repos restarted, quitting after #{@kickoff.ago}")
          break
        end
      end
      @loog.debug("Finished scanning #{repos.size} repos in #{@kickoff.ago}: #{seen.map { |k, v| "#{k}:#{v}" }.joined}")
    rescue Fbe::OffQuota => e
      @loog.info(e.message)
    ensure
      if defined?(repos) && !repos.nil? && defined?(before) && !before.nil? && defined?(starts) && !starts.nil?
        repos.each do |repo|
          next if before[repo] == starts[repo]
          f =
            Fbe.if_absent(fb: @fb, always: true) do |n|
              n.what = 'iterate'
              n.where = 'github'
              n.repository = repo
            end
          Fbe.overwrite(f, @label, before[repo], fb: @fb)
        end
      end
    end
#quota_unaware ⇒ nil
Makes the iterator unaware of GitHub API quota limits.
By default the iterator is quota-aware: it checks quota status before processing each repository and stops gracefully when the quota is exhausted, which prevents API errors and allows resuming later. Calling this method disables that check.
    # File 'lib/fbe/iterate.rb', line 118
    def quota_unaware
      @quota_aware = false
    end
#repeats(repeats) ⇒ nil
Sets the maximum number of iterations per repository.
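Caps how many times the query is executed for each repository during one call to over. Repositories are visited in turn, one query per repository per pass, until each has reached the cap or restarted. The default is 1. Useful for limiting processing scope.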
    # File 'lib/fbe/iterate.rb', line 146
    def repeats(repeats)
      raise 'Cannot set "repeats" to nil' if repeats.nil?
      raise 'The "repeats" must be a positive integer' unless repeats.positive?
      @repeats = repeats
    end
#sort_by(prop) ⇒ nil
Sets the field to sort results by in ascending order.
When set, all matching results will be fetched, sorted by the specified field, and iterated in order. This executes the query once per repository instead of calling one() repeatedly.
    # File 'lib/fbe/iterate.rb', line 180
    def sort_by(prop)
      raise 'Sort field is already set' unless @sort_by.nil?
      raise 'Cannot set sort field to nil' if prop.nil?
      raise 'Sort field must be a String' unless prop.is_a?(String)
      @sort_by = prop
    end
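When a sort field is set, the over method walks a sorted external enumerator and treats StopIteration as "no more values". A minimal sketch of that pattern follows, with plain hashes standing in for facts; the helper names `sorted_values` and `next_or_nil` and the sample data are hypothetical.

```ruby
# Hypothetical helper: take the first value of "prop" from each fact,
# drop facts that lack it, and return a sorted external enumerator.
def sorted_values(facts, prop)
  facts.map { |f| f[prop]&.first }.compact.sort.each
end

# Hypothetical helper: next value, or nil once the enumerator is exhausted.
def next_or_nil(enum)
  enum.next
rescue StopIteration
  nil
end

# Made-up facts; property values are arrays, as in the listing above.
facts = [{ 'issue' => [30] }, { 'issue' => [10] }, { 'title' => ['no issue'] }, { 'issue' => [20] }]
values = sorted_values(facts, 'issue')
4.times { puts next_or_nil(values).inspect }
```

The four calls yield 10, 20, 30 and then nil; in the iterator that nil triggers the restart for the repository.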
#timeout_unaware ⇒ nil
Makes the iterator unaware of timeout limits.
    # File 'lib/fbe/iterate.rb', line 132
    def timeout_unaware
      @timeout_aware = false
    end