Module: Qwirk::Batch::FileWorker

Includes:
Qwirk::BaseWorker
Defined in:
lib/qwirk/batch/file_worker.rb

Overview

Batch worker which reads records from files and queues them up for a separate worker (Qwirk::Adapter::JMS::ReplyWorker) to process. For instance, a worker of this type might look as follows:

# Example batch worker built on the FileWorker mixin.
class MyBatchWorker
  include Qwirk::Batch::FileWorker

  # File-handling options; :glob, :age and :max_outstanding_records are described in the
  # option list below. NOTE(review): :fail_threshold and :save_period are not covered by
  # the visible part of that list.
  file :glob => '/home/batch_files/input/**', :age => 1.minute, :max_outstanding_records => 100, :fail_threshold => 0.8, :save_period => 30.seconds
  # NOTE(review): presumably selects how records are marshaled (the initializer falls back
  # to :ruby when no marshal type is set) - confirm.
  marshal :string
end

The following options can be used for configuring the class

file:
  :glob => <glob_path>
    The path where files will be processed from.  Files will be renamed with a .processing extension while they are being processed
    and to a .completed extension when processing is completed.
  :age => <duration>
    How old a file must be before it will be processed.  This is to prevent files that are in the middle of being uploaded from being acquired.
  :poll_time => <duration>
    How often the glob is queried for new files.
  :max_outstanding_records => <integer>
    This is how many outstanding records can be queued at a time.
  :

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary

Attributes included from Qwirk::BaseWorker

#config, #index

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Qwirk::BaseWorker

#to_s, worker_classes

Class Method Details

.default_acquire_file_strategyObject



70
71
72
# File 'lib/qwirk/batch/file_worker.rb', line 70

# Returns the global default acquire_file_strategy, i.e. whatever was last stored in
# @@default_acquire_file_strategy. #initialize falls back to this value when the
# worker's file options carry no :acquire_strategy.
def self.default_acquire_file_strategy
  @@default_acquire_file_strategy
end

.default_acquire_file_strategy=(default_strategy) ⇒ Object

Set the global default acquire_file_strategy for an organization



66
67
68
# File 'lib/qwirk/batch/file_worker.rb', line 66

# Stores +default_strategy+ as the global default acquire_file_strategy.
# The value is kept as given; #initialize later calls +new+ on it with the file
# options, so it is expected to be a class (or another object responding to +new+).
def self.default_acquire_file_strategy=(default_strategy)
  @@default_acquire_file_strategy = default_strategy
end

.default_file_status_strategyObject



88
89
90
# File 'lib/qwirk/batch/file_worker.rb', line 88

# Returns the global default file status strategy in its configuration form:
# the stored value is passed through file_status_strategy_to_sym, so a built-in
# backend comes back as its symbol and anything else comes back unchanged.
def self.default_file_status_strategy
  stored_strategy = @@default_file_status_strategy
  file_status_strategy_to_sym(stored_strategy)
end

.default_file_status_strategy=(default_strategy) ⇒ Object

Set the global default file_status_strategy for an organization



84
85
86
# File 'lib/qwirk/batch/file_worker.rb', line 84

# Sets the global default file status strategy.
# +default_strategy+ is first resolved by file_status_strategy_from_sym, so a
# symbol such as :active_record or :mongoid is stored as its strategy class,
# while any non-symbol value is stored as given.
def self.default_file_status_strategy=(default_strategy)
  resolved_strategy = file_status_strategy_from_sym(default_strategy)
  @@default_file_status_strategy = resolved_strategy
end

.default_parse_file_strategyObject



79
80
81
# File 'lib/qwirk/batch/file_worker.rb', line 79

# Returns the global default parse_file_strategy, i.e. whatever was last stored in
# @@default_parse_file_strategy. #initialize falls back to this value when the
# worker's file options carry no :parse_strategy.
def self.default_parse_file_strategy
  @@default_parse_file_strategy
end

.default_parse_file_strategy=(default_strategy) ⇒ Object

Set the global default parse_file_strategy for an organization



75
76
77
# File 'lib/qwirk/batch/file_worker.rb', line 75

# Stores +default_strategy+ as the global default parse_file_strategy.
# The value is kept as given; #initialize later calls +new+ on it with the file
# options, so it is expected to be a class (or another object responding to +new+).
def self.default_parse_file_strategy=(default_strategy)
  @@default_parse_file_strategy = default_strategy
end

.file_status_strategy_from_sym(strategy) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/qwirk/batch/file_worker.rb', line 92

# Resolves a file status strategy given either as a symbol or as an object.
#
# Non-symbol values are returned untouched. The symbols :active_record and
# :mongoid lazily require the matching backend file and return its BatchJob
# class; any other symbol raises a RuntimeError.
def self.file_status_strategy_from_sym(strategy)
  return strategy unless strategy.kind_of?(Symbol)

  case strategy
  when :active_record
    require 'qwirk/batch/active_record'
    Qwirk::Batch::ActiveRecord::BatchJob
  when :mongoid
    require 'qwirk/batch/mongoid'
    Qwirk::Batch::Mongoid::BatchJob
  else
    raise "Invalid symbol for file_status_strategy=#{strategy}"
  end
end

.file_status_strategy_to_sym(strategy) ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/qwirk/batch/file_worker.rb', line 108

# Inverse of file_status_strategy_from_sym: maps a built-in BatchJob class back
# to its configuration symbol (:active_record or :mongoid). Any other value is
# returned unchanged.
#
# The backends are only required on demand by file_status_strategy_from_sym, so
# each constant is checked with defined? before it is compared; a backend that
# was never loaded cannot be the given strategy and must not raise NameError.
def self.file_status_strategy_to_sym(strategy)
  if defined?(Qwirk::Batch::ActiveRecord::BatchJob) && strategy == Qwirk::Batch::ActiveRecord::BatchJob
    :active_record
  elsif defined?(Qwirk::Batch::Mongoid::BatchJob) && strategy == Qwirk::Batch::Mongoid::BatchJob
    :mongoid
  else
    strategy
  end
end

.included(base) ⇒ Object



60
61
62
63
# File 'lib/qwirk/batch/file_worker.rb', line 60

# Mixin hook invoked with the class that includes this module.
# Runs Qwirk::BaseWorker's own included hook for that class first, then adds
# this module's ClassMethods to it as class-level methods.
def self.included(base)
  Qwirk::BaseWorker.included base
  base.extend ClassMethods
end

Instance Method Details

#initialize(opts = {}) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/qwirk/batch/file_worker.rb', line 130

# Builds the worker from its class-level configuration plus per-instance overrides.
#
# @param opts [Hash] per-instance overrides (also forwarded to super)
# @option opts :queue_name       overrides the class-level queue_name
# @option opts :reply_queue_name overrides the class-level reply_queue_name
# @raise [RuntimeError] when no queue name can be determined, when the class has
#   no file options, or when no status strategy is configured
def initialize(opts={})
  super
  @marshal_type     = (self.class.marshal_type || :ruby).to_s
  @marshaler        = MarshalStrategy.find(@marshal_type)
  @stopped          = false
  # Last resort: derive the queue name from the class name minus a trailing "File"
  # (e.g. "OrdersFile" -> "Orders"). The name is read from the class, matching the
  # error message below; an anonymous class simply yields no match.
  @queue_name       = opts[:queue_name] || self.class.queue_name || self.class.name.to_s[/(.*)File$/, 1]
  raise "queue_name not specified in #{self.class.name}" unless @queue_name
  @reply_queue_name = opts[:reply_queue_name] || self.class.reply_queue_name || "#{@queue_name}Reply"

  raise "file_options not set for #{self.class.name}" unless self.class.file_options
  # Work on a copy: the strategy keys are deleted below, and removing them from the
  # class-level hash would make later instances silently fall back to the defaults.
  file_options = self.class.file_options.dup
  acquire_strategy_class = file_options.delete(:acquire_strategy) || FileWorker.default_acquire_file_strategy
  parse_strategy_class   = file_options.delete(:parse_strategy)   || FileWorker.default_parse_file_strategy
  # The status strategy falls back to the default *status* strategy (not the parse one).
  status_strategy        = file_options.delete(:status_strategy)  || FileWorker.default_file_status_strategy
  raise 'No status_strategy defined' unless status_strategy
  # A symbol (:active_record / :mongoid) is resolved to its strategy class here.
  status_strategy_class  = FileWorker.file_status_strategy_from_sym(status_strategy)
  @acquire_file_strategy = acquire_strategy_class.new(file_options)
  @parse_file_strategy   = parse_strategy_class.new(file_options)
  @file_status_strategy  = status_strategy_class.new(file_options)
  @max_outstanding_records = file_options[:max_outstanding_records] || 10
end

#joinObject



175
176
177
# File 'lib/qwirk/batch/file_worker.rb', line 175

# Waits for the worker's thread to finish by calling join on +thread+.
# NOTE(review): +thread+ is not defined in the visible code (#start only assigns
# @reply_thread) - presumably it is provided elsewhere; confirm.
def join
  thread.join
end

#startObject



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/qwirk/batch/file_worker.rb', line 152

# Main loop: keeps asking the acquire strategy for a file and processes each one
# until acquire_file returns nil/false.
#
# For every file the parse strategy is opened, a reply thread running
# reply_event_loop is started, the strategy's record_total is stored in
# @record_total and process_file is called. The parse strategy is closed in an
# ensure block, so it is closed even when process_file raises.
def start
  #TODO: look for current job
  while file = @acquire_file_strategy.acquire_file do
    # NOTE(review): open and the thread start sit outside the begin/ensure below, so a
    # failure while starting the thread would leave the parse strategy open - confirm.
    @parse_file_strategy.open(file)
    # A fresh reply thread is created per file and @reply_thread is overwritten each time.
    # NOTE(review): the thread is not joined in this method - confirm reply_event_loop
    # ends on its own once the file is done.
    @reply_thread = Thread.new do
      # Names the current thread through the Java Thread API (presumably running on JRuby).
      # NOTE(review): `worker` is not defined in the visible code - confirm it resolves.
      java.lang.Thread.current_thread.name = "Qwirk worker (reply): #{worker}"
      reply_event_loop
    end
    begin
      @record_total = @parse_file_strategy.record_total
      process_file
    ensure
      @parse_file_strategy.close
    end
  end
end

#statusObject



179
180
181
# File 'lib/qwirk/batch/file_worker.rb', line 179

# Placeholder that always raises a RuntimeError naming the worker's class;
# per the message, including classes are expected to override it.
def status
  worker_class = self.class.name
  raise RuntimeError, "Need to override status method in #{worker_class}"
end

#stopObject



170
171
172
173
# File 'lib/qwirk/batch/file_worker.rb', line 170

# Flags the worker as stopped and forwards the stop request to the acquire
# strategy; the strategy's return value is returned to the caller.
# NOTE(review): presumably this makes acquire_file stop handing out files so the
# loop in #start can end - confirm against the acquire strategy.
def stop
  @stopped = true
  acquire_strategy = @acquire_file_strategy
  acquire_strategy.stop
end