Class: OrbitalQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/orbitalqueue.rb

Defined Under Namespace

Classes: ItemDestructed, QueueError, QueueObject, QueueRemoveError, QueueUnexisting

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dir, create = false) ⇒ OrbitalQueue

Create queue master in presented dir.

dir

Queue directory

create

If true is given, creates the queue directory when it is missing



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/orbitalqueue.rb', line 29

def initialize dir, create=false
  @queue_dir = dir

  %w:.checkout .defer .retry .archive:.each do |subdir|
    unless File.exist?(File.join(dir, subdir))
      if create
        require 'fileutils'
        FileUtils.mkdir_p(File.join(dir, subdir))
      else
        raise QueueUnexisting.new("Queue directory #{dir} does not exist.")
      end
    end
  end
end

Class Method Details

.resume(dir) ⇒ Object

Return deferred item to queue



21
22
23
# File 'lib/orbitalqueue.rb', line 21

def self.resume dir
  self.new(dir).resume
end

Instance Method Details

#archive(queue_id, data, archiveinfo_additional = {}) ⇒ Object

Archive current queue relative data and call destruct. This method should be called from QueueObject.



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/orbitalqueue.rb', line 143

def archive queue_id, data, archiveinfo_additional={} # :nodoc:
  archiveinfo = archiveinfo_additional.merge({
    archived_at: Time.now.to_i
  })

  retry_data = load_retryobj queue_id

  archive_data = {
    archiveinfo: archiveinfo,
    retry_data: retry_data,
    data: data
  }

  File.open(File.join(@queue_dir, ".archive", (["archive", archiveinfo[:archived_at], queue_id].join("-") + ".marshal")), "w") {|f| Marshal.dump archive_data, f}

  destruct queue_id
end

#complete(queue_id) ⇒ Object

Remove checked out queue item.



121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/orbitalqueue.rb', line 121

def complete queue_id
  begin
    checkout_file = File.join(@queue_dir, ".checkout", (queue_id + ".marshal"))
    retry_file = File.join(@queue_dir, ".retry", (queue_id + ".marshal"))
    File.delete(checkout_file)
    File.delete(retry_file) if File.exist?(retry_file)
  rescue SystemCallError => e
    raise QueueRemoveError, "Failed to complete queue #{queue_id}: #{e.class}"
  end

  queue_id
end

#defer(queue_id, time_at = nil, max_count = nil) ⇒ Object

Mark queue item as deferred.

:call-seq:

defer(queue_id, time_at, max_count=nil) -> retry_data | nil
defer() {|retry_data| ... }             -> retry_data | nil


166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/orbitalqueue.rb', line 166

def defer queue_id, time_at=nil, max_count=nil
  retry_data = load_retryobj queue_id
  retry_data[:count] += 1
  if block_given?
    yield retry_data
    retry_data[:until] = retry_data[:until].to_i
  else
    unless time_at
      raise ArgumentError, "time_at is required when no block is given."
    end

    if max_count && retry_data[:count] > max_count
      destruct queue_id
    end
    retry_data[:until] = time_at.to_i
  end

  dump_retryobj queue_id, retry_data

  checkout_path = File.join(@queue_dir, ".checkout", (queue_id) + ".marshal")
  defer_path = File.join(@queue_dir, ".defer", (queue_id) + ".marshal")
  File.rename checkout_path, defer_path

  retry_data
rescue ItemDestructed
  nil
end

#destruct(queue_id) ⇒ Object

Delete all related files with queue_id, and raise ItemDectructed exception.

Raises:



135
136
137
138
139
# File 'lib/orbitalqueue.rb', line 135

def destruct queue_id
  queue_files = Dir.glob([@queue_dir, "**", (queue_id + ".marshal")].join("/"), File::FNM_DOTMATCH)
  File.delete(*queue_files) unless queue_files.empty?
  raise ItemDestructed, "#{queue_id} is destructed."
end

#eachObject

Iterate each queue item data.



105
106
107
108
109
110
# File 'lib/orbitalqueue.rb', line 105

def each
  while item = pop
    yield item.data
    item.complete
  end
end

#each_itemObject

Iterate each queue item.



113
114
115
116
117
118
# File 'lib/orbitalqueue.rb', line 113

def each_item
  while item = pop
    yield item
    item.complete unless item.deferred?
  end
end

#popObject

Pop data from queue. Popped queue items are placed in the checkout directory. After processing is complete, #complete must be called to remove the item from the queue.

If block is given, complete automatically after yield.

:call-seq:

pop()               -> queue_object
pop() {|data| ... } -> queue_id


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/orbitalqueue.rb', line 64

def pop
  queue_data = nil
  queue_id = nil
  queue_files = Dir.children(@queue_dir)
  queue_files.each do |qf|
    next if qf[0] == "."
    begin
      File.rename(File.join(@queue_dir, qf), File.join(@queue_dir, ".checkout", qf))
    rescue Errno::ENOENT
      next
    end

    data = Marshal.load(File.read File.join(@queue_dir, ".checkout", File.basename(qf)))
    queue_id = File.basename qf, ".marshal"

    queue_data = OrbitalQueue::QueueObject.new(self, data, queue_id)
    break
  end

  if queue_data && block_given?
    yield queue_data.data
    complete queue_id
  else
    queue_data
  end
end

#pop!Object

Pop data and remove it from queue.

:call-seq:

pop!() -> queue_object


95
96
97
98
99
100
101
102
# File 'lib/orbitalqueue.rb', line 95

def pop!
  queue_item = pop
  if queue_item
    queue_item.complete
  end

  queue_item
end

#push(data) ⇒ Object

Push data to queue.



45
46
47
48
49
50
51
52
53
54
# File 'lib/orbitalqueue.rb', line 45

def push data
  queue_id = sprintf("%d-%d-%s", Time.now.to_i ,$$ , SecureRandom.hex(3))
  queue_filepath = File.join(@queue_dir, (queue_id + ".marshal"))

  File.open(queue_filepath, "w") do |f|
    Marshal.dump data, f
  end

  queue_id
end

#resumeObject

Return deferred item to queue.



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/orbitalqueue.rb', line 195

def resume
  now = Time.now.to_i
  deferred_files = Dir.children(File.join(@queue_dir, ".retry"))
  deferred_files.each do |fn|
    retry_path = File.join(@queue_dir, ".retry", fn)
    retry_data = Marshal.load File.read retry_path

    if retry_data[:until] < now
      queue_path = File.join(@queue_dir, fn)
      defer_path = File.join(@queue_dir, ".defer", fn)
      File.rename(defer_path, queue_path)
    end
  end

  nil
end