Class: Qwirk::Batch::AcquireFileStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/qwirk/batch/acquire_file_strategy.rb

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ AcquireFileStrategy

Returns a new instance of AcquireFileStrategy.



4
5
6
7
8
9
10
11
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 4

# Configures the strategy from an options collection indexed by symbol keys.
#
# Keys read:
#   :glob      - required; stored as-is and later handed to Dir.glob
#   :poll_time - converted with to_f; 10.0 is used when the value is nil/false
#   :age       - converted with to_i; 60 is used when the value is nil/false.
#                Uploads (e.g. over FTP) may still be writing a file, so by
#                default a file must be at least 60 seconds old to be picked up.
#
# @param options [#[]] settings as described above (presumably a Hash)
# @raise [RuntimeError] when options[:glob] is nil or false
def initialize(options)
  @glob = options[:glob]
  @glob || raise("file_options glob value not set")

  poll_setting = options[:poll_time] || 10.0
  @poll_time   = poll_setting.to_f

  age_setting = options[:age] || 60
  @age        = age_setting.to_i

  # Polling continues until this flag is flipped.
  @stopped = false
end

Instance Method Details

#complete_file(file) ⇒ Object



33
34
35
36
37
38
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 33

# Renames a file that is being processed so that it is flagged as completed:
# the trailing '.processing' suffix is swapped for '.completed'.
#
# @param file [String] path that ends in '.processing'
# @return [String] the path the file was renamed to
# @raise [RuntimeError] when the path does not end in '.processing'
#   (errors raised by File.rename itself propagate unchanged)
def complete_file(file)
  # Anchor on the whole string (\A..\z) and let '.' span newlines (/m) so the
  # captured prefix is always the complete original path; a line-based match
  # would capture only the last line of a path containing a newline and the
  # file would be renamed to the wrong location.
  matched = file.match(/\A(.*)\.processing\z/m) || raise("#{file} is not currently being processed")
  new_file = matched[1] + '.completed'
  File.rename(file, new_file)
  new_file
end

#mark_file_as_processing(file) ⇒ Object



27
28
29
30
31
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 27

# Claims a file by renaming it with a '.processing' suffix appended.
#
# @param file [String] path of the file to claim
# @return the renamed path (file + '.processing')
def mark_file_as_processing(file)
  (file + '.processing').tap { |claimed_path| File.rename(file, claimed_path) }
end

#next_file ⇒ Object

Returns the next file or nil if stopped



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 14

# Polls the glob pattern until a file is ready or the strategy is stopped.
#
# A file is ready when its name does not end in '.processing' or '.completed'
# and its modification time is more than @age seconds in the past. Between
# scans the calling thread records itself in @sleep_thread and sleeps for
# @poll_time seconds.
#
# @return [String, nil] the first ready path found, or nil once @stopped is set
def next_file
  until @stopped
    Dir.glob(@glob).each do |file|
      next if file =~ /\.(processing|completed)$/
      begin
        return file if Time.now - File.mtime(file) > @age
      rescue Errno::ENOENT
        # The entry disappeared between the glob and the mtime lookup (or is a
        # dangling symlink); skip it rather than abort the whole poll loop.
        next
      end
    end
    # Remember which thread is about to sleep so it can be woken early.
    @sleep_thread = Thread.current
    sleep @poll_time
  end
  nil
end

#stop ⇒ Object



40
41
42
43
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 40

# Flags the strategy as stopped and, when a polling thread has been recorded
# in @sleep_thread, wakes it so it does not wait out the rest of its sleep.
#
# Thread#wakeup raises ThreadError for a thread that has already finished, so
# the wakeup is attempted only on a live thread, and a thread dying between
# the liveness check and the wakeup is tolerated.
#
# @return [Thread, nil] the woken thread, or nil when there was nothing to wake
def stop
  @stopped = true
  sleeper = @sleep_thread
  sleeper.wakeup if sleeper && sleeper.alive?
rescue ThreadError
  nil
end