Class: Adrian::DirectoryQueue

Inherits:
Queue
  • Object
show all
Includes:
Filters
Defined in:
lib/adrian/directory_queue.rb

Direct Known Subclasses

RotatingDirectoryQueue

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Filters

#filter?, #filters

Methods inherited from Queue

#max_age, #pop, #push, #verify_age!

Constructor Details

#initialize(options = {}) ⇒ DirectoryQueue

Note: There is the possibility of an item being consumed by multiple processes when its still in the queue after its lock expires. The reason for allowing this is:

1. It's much simpler than introducing a seperate monitoring process to handle lock expiry.
2. This is an acceptable and rare event. e.g. it only happens when the process working on the item crashes without being able to release the lock


22
23
24
25
26
27
28
29
# File 'lib/adrian/directory_queue.rb', line 22

def initialize(options = {})
  super
  @available_path = options.fetch(:path)
  @reserved_path  = options.fetch(:reserved_path, default_reserved_path)
  @logger         = options[:logger]
  filters << Filters::FileLock.new(:duration => options[:lock_duration], :reserved_path => reserved_path)
  filters << Filters::Delay.new(:duration => options[:delay]) if options[:delay]
end

Instance Attribute Details

#available_pathObject (readonly)

Returns the value of attribute available_path.



15
16
17
# File 'lib/adrian/directory_queue.rb', line 15

def available_path
  @available_path
end

#loggerObject (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/adrian/directory_queue.rb', line 15

def logger
  @logger
end

#reserved_pathObject (readonly)

Returns the value of attribute reserved_path.



15
16
17
# File 'lib/adrian/directory_queue.rb', line 15

def reserved_path
  @reserved_path
end

Class Method Details

.create(options = {}) ⇒ Object



8
9
10
11
12
13
# File 'lib/adrian/directory_queue.rb', line 8

def self.create(options = {})
  queue = new(options)
  FileUtils.mkdir_p(queue.available_path)
  FileUtils.mkdir_p(queue.reserved_path)
  queue
end

Instance Method Details

#include?(value) ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
51
# File 'lib/adrian/directory_queue.rb', line 48

def include?(value)
  item = wrap_item(value)
  items.include?(item)
end

#lengthObject



44
45
46
# File 'lib/adrian/directory_queue.rb', line 44

def length
  available_files.count { |file| File.file?(file) }
end

#pop_itemObject



31
32
33
34
35
# File 'lib/adrian/directory_queue.rb', line 31

def pop_item
  while item = items.shift
    return item if reserve(item)
  end
end

#push_item(value) ⇒ Object



37
38
39
40
41
42
# File 'lib/adrian/directory_queue.rb', line 37

def push_item(value)
  item = wrap_item(value)
  item.move(available_path)
  item.touch
  self
end