Class: Fbe::Iterate

Inherits:
Object
Defined in:
lib/fbe/iterate.rb

Overview

Repository iterator with stateful query execution.

This class provides a DSL for iterating through repositories and executing queries while maintaining state between iterations. It tracks progress using “marker” facts in the factbase and supports features like:

  • Stateful iteration with automatic restart capability

  • GitHub API quota awareness to prevent rate limit issues

  • Configurable repeat counts per repository

  • Timeout controls for long-running operations

The iterator executes a query for each repository, passing the previous result as context. If the query returns nil, it restarts from the beginning for that repository. Progress is persisted in the factbase to support resuming after interruptions.
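The resume behaviour described above can be illustrated without a factbase. This is a minimal sketch under stated assumptions, not the library's implementation: a plain Hash named `markers` stands in for the marker facts, and `ITEMS`, `next_after` and `run` are hypothetical names invented for the illustration.

```ruby
# Hypothetical stand-ins: `markers` plays the role of the marker facts,
# `ITEMS` plays the role of the data the query would search.
ITEMS = { 1001 => [3, 7, 12] }.freeze

# Returns the smallest item greater than `before`, or nil when none is left.
def next_after(repo, before)
  ITEMS.fetch(repo).select { |n| n > before }.min
end

# Runs at most `steps` iterations for one repository, persisting progress
# in `markers` after every processed item.
def run(markers, repo, steps)
  processed = []
  steps.times do
    before = markers.fetch(repo, 0)
    nxt = next_after(repo, before)
    if nxt.nil?
      markers[repo] = 0 # nothing left: restart from the beginning
      break
    end
    processed << nxt
    markers[repo] = nxt
  end
  processed
end

markers = {}
first = run(markers, 1001, 2)  # stops after two items, as if interrupted
second = run(markers, 1001, 5) # picks up where the first run stopped
```

Because progress lives in `markers` rather than in local variables, the second call continues after the last stored value instead of starting over.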

Author

Yegor Bugayenko

Copyright

Copyright © 2024-2025 Zerocracy

License

MIT

Examples:

Processing pull requests with state management

iterator = Fbe::Iterate.new(fb: fb, loog: loog, options: options, global: global)
iterator.as('pull-requests')
iterator.by('(and (eq what "pull_request") (gt number $before))')
iterator.repeats(10)
iterator.quota_aware
iterator.over(timeout: 600) do |repo_id, pr_number|
  # Process pull request
  fetch_and_store_pr(repo_id, pr_number)
  pr_number  # Stored as "latest" and passed as $before to the next query
end

Constructor Details

#initialize(fb:, loog:, options:, global:) ⇒ Iterate

Creates a new iterator instance.

Parameters:

  • fb (Factbase)

    The factbase for storing iteration state

  • loog (Loog)

    The logging facility for debug output

  • options (Judges::Options)

    The options containing repository configuration

  • global (Hash)

    The hash for global caching of API responses



# File 'lib/fbe/iterate.rb', line 85

def initialize(fb:, loog:, options:, global:)
  @fb = fb
  @loog = loog
  @options = options
  @global = global
  @label = nil
  @since = 0
  @query = nil
  @repeats = 1
  @quota_aware = false
end

Instance Method Details

#as(label) ⇒ nil

Sets the label for tracking iteration state.

The label is used to create marker facts in the factbase that track the last processed item for each repository. This enables resuming iteration after interruptions.

Examples:

Set label for issue processing

iterator.as('issue-processor')

Parameters:

  • label (String)

    Unique identifier for this iteration type

Returns:

  • (nil)

    Nothing is returned

Raises:

  • (RuntimeError)

    If label is already set or nil



# File 'lib/fbe/iterate.rb', line 155

def as(label)
  raise 'Label is already set' unless @label.nil?
  raise 'Cannot set "label" to nil' if label.nil?
  @label = label
end

#by(query) ⇒ nil

Sets the query to execute for each iteration.

The query can use two special variables:

  • $before: The value from the previous iteration (or initial value)

  • $repository: The current repository ID

Examples:

Query for issues after a certain ID

iterator.by('(and (eq what "issue") (gt id $before) (eq repo $repository))')
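To make the role of the two variables concrete, the following sketch shows what a query reads like once values are supplied. It is an illustration only: Factbase binds these variables itself, and the `bind` helper below is a hypothetical name that performs naive textual substitution, which is not how the library evaluates queries.

```ruby
# Illustration only: replaces each $name with the matching value from
# `params`. A missing name raises KeyError instead of being left in place.
def bind(query, params)
  query.gsub(/\$(\w+)/) { params.fetch(Regexp.last_match(1).to_sym).to_s }
end

query = '(and (eq what "issue") (gt id $before) (eq repo $repository))'
bound = bind(query, before: 42, repository: 1001)
```

Here `bound` is the same expression with 42 in place of $before and 1001 in place of $repository.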

Parameters:

  • query (String)

    The Factbase query to execute

Returns:

  • (nil)

    Nothing is returned

Raises:

  • (RuntimeError)

    If query is already set or nil



# File 'lib/fbe/iterate.rb', line 138

def by(query)
  raise 'Query is already set' unless @query.nil?
  raise 'Cannot set query to nil' if query.nil?
  @query = query
end

#over(timeout: 2 * 60) {|Integer, Object| ... } ⇒ nil

Executes the iteration over all configured repositories.

For each repository, retrieves the last processed value (or falls back to the initial value, since, which the constructor sets to 0) and executes the configured query with it. The query receives two parameters: $before (the last processed value) and $repository (the GitHub repository ID).

When the query returns a non-nil result, the block is called with the repository ID and query result. The block must return an Integer that will be stored as the new “latest” value for the next iteration.

When the query returns nil, the iteration for that repository restarts from the initial value (set by since), and the block is NOT called.

The method tracks progress using marker facts and supports:

  • Automatic restart when query returns nil

  • Timeout to prevent infinite loops

  • GitHub API quota checking (if enabled)

  • State persistence for resuming after interruptions
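The timeout in the list above is checked between steps rather than enforced by interrupting a running block. The sketch below shows that pattern under stated assumptions: `run_until` is a hypothetical helper, and it uses the monotonic clock, whereas the listed source compares `Time.now` values.

```ruby
# Calls the block repeatedly until more than `timeout` seconds have elapsed.
# The deadline is tested only before each call, so a call that is already
# running is never cut short.
def run_until(timeout)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  count = 0
  loop do
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) - start > timeout
    yield
    count += 1
  end
  count
end

completed = run_until(0.05) { sleep 0.01 }
```

A consequence of this design is that the total running time can exceed the timeout by roughly the duration of one step.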

Processing flow for each repository:

  1. Read the “latest” value from factbase (or use since if not found)

  2. Execute the query with $before=latest and $repository=repo_id

  3. If query returns nil: restart from since value, skip to next repo

  4. If query returns a value: call the block with (repo_id, query_result)

  5. Store the block’s return value as the new “latest” for next iteration
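The five steps above can be sketched for a single repository as follows. This is a simplified model, not the library's code: a Hash named `markers` stands in for the factbase, `query` is any callable taking the previous value and the repository ID, and `step` is a hypothetical name.

```ruby
# One pass of the flow for a single repository.
def step(markers, repo_id, since, query)
  before = markers.fetch(repo_id, since) # 1. read "latest" or fall back
  nxt = query.call(before, repo_id)      # 2. run the query
  if nxt.nil?                            # 3. restart, block is not called
    markers[repo_id] = since
    return :restarted
  end
  after = yield(repo_id, nxt)            # 4. call the block
  raise "Iterator must return an Integer, while #{after.class} returned" unless after.is_a?(Integer)
  markers[repo_id] = after               # 5. store the new "latest"
  :advanced
end

issues = [5, 9]
query = ->(before, _repo) { issues.find { |n| n > before } }
markers = {}
calls = []
results = Array.new(3) do
  step(markers, 77, 0, query) do |repo, number|
    calls << [repo, number]
    number
  end
end
```

The first two passes advance through 5 and 9; the third finds nothing after 9, resets the marker to the initial value, and skips the block.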

Examples:

Process issues incrementally

iterator.over(timeout: 300) do |repo_id, issue_number|
  fetch_and_process_issue(repo_id, issue_number)
  issue_number + 1  # Return next issue number to process
end

Parameters:

  • timeout (Float) (defaults to: 2 * 60)

    Maximum seconds to run (default: 120)

Yields:

  • (Integer, Object)

    Repository ID and the result from query execution

Yield Returns:

  • (Integer)

    The value to store as “latest” for next iteration

Returns:

  • (nil)

    Nothing is returned

Raises:

  • (RuntimeError)

    If block doesn’t return an Integer



# File 'lib/fbe/iterate.rb', line 198

def over(timeout: 2 * 60, &)
  raise 'Use "as" first' if @label.nil?
  raise 'Use "by" first' if @query.nil?
  seen = {}
  oct = Fbe.octo(loog: @loog, options: @options, global: @global)
  if oct.off_quota?
    @loog.debug('We are off GitHub quota, cannot even start, sorry')
    return
  end
  repos = Fbe.unmask_repos(loog: @loog, options: @options, global: @global)
  restarted = []
  start = Time.now
  loop do
    if oct.off_quota?
      @loog.info("We are off GitHub quota, time to stop after #{start.ago}")
      break
    end
    repos.each do |repo|
      if oct.off_quota?
        @loog.debug("We are off GitHub quota, we must skip #{repo}")
        break
      end
      if Time.now - start > timeout
        @loog.info("We are doing this for #{start.ago} already, won't check #{repo}")
        next
      end
      next if restarted.include?(repo)
      seen[repo] = 0 if seen[repo].nil?
      if seen[repo] >= @repeats
        @loog.debug("We've seen too many (#{seen[repo]}) in #{repo}, let's see next one")
        next
      end
      rid = oct.repo_id_by_name(repo)
      before = @fb.query(
        "(agg (and (eq what '#{@label}') (eq where 'github') (eq repository #{rid})) (first latest))"
      ).one
      @fb.query("(and (eq what '#{@label}') (eq where 'github') (eq repository #{rid}))").delete!
      before = before.nil? ? @since : before.first
      nxt = @fb.query(@query).one(@fb, before:, repository: rid)
      after =
        if nxt.nil?
          @loog.debug("Next element after ##{before} not suggested, re-starting from ##{@since}: #{@query}")
          restarted << repo
          @since
        else
          @loog.debug("Next is ##{nxt}, starting from it...")
          yield(rid, nxt)
        end
      raise "Iterator must return an Integer, while #{after.class} returned" unless after.is_a?(Integer)
      f = @fb.insert
      f.where = 'github'
      f.repository = rid
      f.latest =
        if after.nil?
          @loog.debug("After is nil at #{repo}, setting the 'latest' to ##{nxt}")
          nxt
        else
          @loog.debug("After is ##{after} at #{repo}, setting the 'latest' to it")
          after
        end
      f.what = @label
      seen[repo] += 1
    end
    unless seen.any? { |r, v| v < @repeats && !restarted.include?(r) }
      @loog.debug("No more repos to scan (out of #{repos.size}), quitting after #{start.ago}")
      break
    end
    if restarted.size == repos.size
      @loog.debug("All #{repos.size} repos restarted, quitting after #{start.ago}")
      break
    end
    if Time.now - start > timeout
      @loog.info("We are iterating for #{start.ago} already, time to give up")
      break
    end
  end
  @loog.debug("Finished scanning #{repos.size} repos in #{start.ago}: #{seen.map { |k, v| "#{k}:#{v}" }.joined}")
end

#quota_aware ⇒ nil

Makes the iterator aware of GitHub API quota limits.

When enabled, the iterator will check quota status before processing each repository and gracefully stop when the quota is exhausted. This prevents API errors and allows for resuming later.

Examples:

Enable quota awareness

iterator.quota_aware
iterator.over { |repo, item| ... }  # Will stop if quota exhausted

Returns:

  • (nil)

    Nothing is returned



# File 'lib/fbe/iterate.rb', line 107

def quota_aware
  @quota_aware = true
end
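The graceful stop can be modelled with a stub client. In this sketch `FakeQuota` and `scan` are hypothetical names, and only the `off_quota?` predicate mirrors a call that appears in the `over` listing. The check is made unconditionally here, as it is in that listing.

```ruby
# Hypothetical stand-in for the GitHub client: reports exhaustion once a
# fixed budget of requests has been spent.
class FakeQuota
  def initialize(budget)
    @budget = budget
  end

  def spend
    @budget -= 1
  end

  def off_quota?
    @budget <= 0
  end
end

# Processes repositories in order and stops as soon as quota runs out,
# returning the repositories that were handled.
def scan(client, repos)
  done = []
  repos.each do |repo|
    break if client.off_quota?
    client.spend
    done << repo
  end
  done
end

done = scan(FakeQuota.new(2), %w[a/x b/y c/z])
```

With a budget of two requests, the third repository is left untouched and can be picked up by a later run.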

#repeats(repeats) ⇒ nil

Sets the maximum number of iterations per repository.

Limits how many times the query is executed for each repository within a single call to over. Repositories are visited in turn on each pass, and a repository is skipped once it reaches this limit. Useful for limiting processing scope.

Examples:

Process up to 100 items per repository

iterator.repeats(100)

Parameters:

  • repeats (Integer)

    The maximum iterations per repository

Returns:

  • (nil)

    Nothing is returned

Raises:

  • (RuntimeError)

    If repeats is nil or not positive



# File 'lib/fbe/iterate.rb', line 121

def repeats(repeats)
  raise 'Cannot set "repeats" to nil' if repeats.nil?
  raise 'The "repeats" must be a positive integer' unless repeats.positive?
  @repeats = repeats
end
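The effect of the repeat limit on visiting order can be sketched as below. This is a reduced model that mirrors only the `seen` counter from the `over` listing; `schedule` is a hypothetical name, and quota checks, restarts and timeouts are left out.

```ruby
# Visits repositories in turn until each one has been seen `repeats` times,
# and returns the order of visits.
def schedule(repos, repeats)
  raise 'The "repeats" must be a positive integer' unless repeats.positive?
  seen = Hash.new(0)
  order = []
  loop do
    repos.each do |repo|
      next if seen[repo] >= repeats # this repository has had its share
      order << repo
      seen[repo] += 1
    end
    break unless seen.any? { |_, count| count < repeats }
  end
  order
end

order = schedule(%w[a/x b/y], 2)
```

In this model the visits interleave: each pass touches every repository once, and the loop ends when all of them have reached the limit.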