Class: PushmiPullyu::PreservationQueue

Inherits:
Object
Defined in:
lib/pushmi_pullyu/preservation_queue.rb

Overview

1) Create a sorted set in Redis (redis.io/topics/data-types) and call it preservation_queue.

2) In GenericFile, add an after_save callback that:

- determines a monotonically increasing "score". Obvious choices are the current time in seconds or milliseconds,
  or an atomic, increasing counter created with something like Redis INCR. It doesn't matter if two different
  noids ever have the same score; it only matters that scores generally increase over time.
- runs zadd preservation_queue score "noid", which adds the noid to the set with the score from above.

3) Pushmi-pullyu pops elements out of the sorted set, lowest score to highest.

A sorted set will only ever contain a noid once, with whatever score it was last given. Because preservation_queue is sorted lowest score to highest, and because scores increase over time, a cascade of jobs/updates will cause a noid to keep “moving back” in the queue until it becomes the least recently updated noid in the queue, at which point it will be popped and preserved. Any further updates will trigger a new AIP build.
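
The "moving back" behaviour described above can be modelled without Redis. The following is a minimal in-memory sketch, not the project's code: ToyQueue, add and pop_lowest are hypothetical names, and a plain Hash stands in for the sorted set.

```ruby
# In-memory model of a Redis sorted set used as a preservation queue.
# ToyQueue and its methods are illustrative names, not part of pushmi_pullyu.
class ToyQueue
  def initialize
    @scores = {} # member => score; a member can appear only once
  end

  # Like ZADD: inserts the member, or overwrites its score if already present.
  def add(score, member)
    @scores[member] = score
  end

  # Removes and returns the member with the lowest score, or nil when empty.
  def pop_lowest
    member, _score = @scores.min_by { |_m, s| s }
    @scores.delete(member) if member
    member
  end
end

queue = ToyQueue.new
queue.add(1, 'noid-a')
queue.add(2, 'noid-b')
queue.add(3, 'noid-a') # noid-a is updated again and moves behind noid-b

first = queue.pop_lowest
second = queue.pop_lowest
third = queue.pop_lowest
```

Although noid-a was added twice, it is popped only once, and only after noid-b, because its later score replaced the earlier one.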

Defined Under Namespace

Classes: ConnectionError

Instance Method Summary

Constructor Details

#initialize(redis_url: 'redis://localhost:6379', pool_opts: { size: 1, timeout: 5 }, poll_interval: 10, age_at_least: 0, queue_name: 'dev:pmpy_queue') ⇒ PreservationQueue

Returns a new instance of PreservationQueue.

Raises:

- (ConnectionError) unless connected? returns true

# File 'lib/pushmi_pullyu/preservation_queue.rb', line 24

def initialize(redis_url: 'redis://localhost:6379',
               pool_opts: { size: 1, timeout: 5 },
               poll_interval: 10,
               age_at_least: 0,
               queue_name: 'dev:pmpy_queue')
  # we use a connection pool even though we're not (currently) threading
  # as it transparently provides for repairing connections if they are closed after long periods of inactivity
  @redis = ConnectionPool.new(pool_opts) do
    Redis.new(url: redis_url)
  end

  raise ConnectionError unless connected?

  @poll_interval = poll_interval
  @age_at_least = age_at_least
  @queue_name = queue_name
end

Instance Method Details

#next_item ⇒ Object

Raises:

- (ConnectionError) unless connected? returns true

# File 'lib/pushmi_pullyu/preservation_queue.rb', line 42

def next_item
  raise ConnectionError unless connected?

  @redis.with do |conn|
    conn.watch(@queue_name) do |rd| # transactional mutation of the set is dependent on the set key
      element, score = rd.zrange(@queue_name, 0, 0, with_scores: true).first

      if element && ((Time.now.to_f - @age_at_least) >= score)
        rd.multi do |tx|
          tx.zrem(@queue_name, element) # remove the top element transactionally
        end
        return element
      else
        rd.unwatch # nothing is ready: the queue is empty or its oldest element is too recent
        return nil
      end
    end
  end
end
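
The age check in next_item compares the element's score against Time.now.to_f, which suggests scores are enqueue times in epoch seconds and age_at_least is a number of seconds. Under that assumption, the condition can be isolated as a small sketch; old_enough? is a hypothetical helper, not a method of PreservationQueue.

```ruby
# Hypothetical helper mirroring the age check in next_item.
# Assumes score is an enqueue time in epoch seconds and
# age_at_least is a duration in seconds.
def old_enough?(score, age_at_least, now: Time.now.to_f)
  (now - age_at_least) >= score
end

now = 1_000.0
fresh = old_enough?(995.0, 10, now: now) # queued 5 seconds ago
aged = old_enough?(980.0, 10, now: now)  # queued 20 seconds ago
```

With age_at_least set to 10, the element queued 5 seconds ago is left in place, while the one queued 20 seconds ago is eligible to be popped. With the default of 0, any element whose score is not in the future qualifies.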

#wait_next_item ⇒ Object



# File 'lib/pushmi_pullyu/preservation_queue.rb', line 62

def wait_next_item
  while PushmiPullyu.continue_polling?
    element = next_item
    return element if element.present?

    sleep @poll_interval
  end
end
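
The polling pattern in wait_next_item can be tried in isolation. The sketch below is a generic version under stated assumptions: poll_until_item, fetch and keep_going are hypothetical names, lambdas replace next_item and PushmiPullyu.continue_polling?, and a plain nil check replaces present?.

```ruby
# Generic poll-until-available loop in the style of wait_next_item.
# poll_until_item, fetch and keep_going are hypothetical names.
def poll_until_item(fetch:, keep_going:, interval: 0.01)
  while keep_going.call
    item = fetch.call
    return item unless item.nil?

    sleep interval
  end
  nil # polling was stopped before any item became available
end

# The fake source is empty twice, then yields an item.
responses = [nil, nil, 'noid-a']
attempts = 0
result = poll_until_item(
  fetch: -> { attempts += 1; responses.shift },
  keep_going: -> { true }
)

# When the stop condition is already false, nothing is fetched.
stopped = poll_until_item(fetch: -> { nil }, keep_going: -> { false })
```

As in wait_next_item, the loop yields nil when polling is switched off before an element arrives, so callers need to handle a nil result.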