Class: Traject::Indexer
- Inherits: Object
  - Object
    - Traject::Indexer
- Includes: Macros::Basic, Macros::Marc21, QualifiedConstGet
- Defined in: lib/traject/indexer.rb, lib/traject/indexer/settings.rb
Overview
A Hash of settings for a Traject::Indexer, which is also passed along to the other objects the Traject::Indexer interacts with.
It is enhanced with a few features from Hashie, for instance to make it string/symbol indifferent.
#provide(key, value) is added. It works like settings[key] ||= value, setting the key only if it is not already set, except that, unlike ||=, an existing nil or false counts as already set.
It also has a ‘defaults’ system, meant to play along with configuration file ‘provide’ statements. There is a built-in hash of defaults, each of which is lazily filled in when its key is accessed and not yet set (nil can count as set, though!). Until a default has been lazily filled in, #provide can still set that key. You can also call fill_in_defaults! to fill all defaults in, if you know all configuration files have been loaded and want to fill them in for inspection.
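The difference between #provide and ||=, and the lazy defaults, can be sketched in a few lines. This is a minimal stand-in written for illustration only: SketchSettings, its DEFAULTS hash, and the example keys and URL are all hypothetical, and it is not Traject's actual Settings implementation (it also omits the string/symbol indifference).

```ruby
# Hypothetical stand-in for illustration; NOT Traject's Settings class.
class SketchSettings < Hash
  # Assumed built-in defaults, for this sketch only.
  DEFAULTS = { "log.level" => "info" }.freeze

  # Set key only if it has never been set. An explicit nil or false
  # counts as "already set", unlike `settings[key] ||= value`.
  def provide(key, value)
    store(key, value) unless key?(key)
  end

  # Lazily fill in a default the first time an unset key is read.
  def [](key)
    store(key, DEFAULTS[key]) if !key?(key) && DEFAULTS.key?(key)
    super
  end

  # Eagerly fill in every default that is still unset.
  def fill_in_defaults!
    DEFAULTS.each { |key, value| provide(key, value) }
    self
  end
end

settings = SketchSettings.new
settings["solr.url"] = nil
settings.provide("solr.url", "http://example.org/solr") # ignored: nil counts as set
settings.provide("log.level", "debug")                  # default not yet filled in, so this wins
```

In this sketch, a fresh SketchSettings that is never given "log.level" would return the default "info" on first read.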
Defined Under Namespace
Classes: ArityError, Context, LastNamedStep, NamingError, Settings
Instance Attribute Summary
Instance Method Summary
- #create_logger ⇒ Object
  Create logger according to settings.
- #each_record(aLambda = nil, &block) ⇒ Object
- #id_string(record) ⇒ Object
  Get a printable id from record for error logging.
- #initialize(arg_settings = {}) ⇒ Indexer
  Constructor. Takes an optional hash or Traject::Indexer::Settings object of settings.
- #last_named_step ⇒ Object
- #log_mapping_errors(context, index_step, aProc) ⇒ Object
  Just a wrapper that captures and records any unexpected errors raised in mapping, along with contextual information on the record and the location in the source file of the mapping rule.
- #log_skip(context) ⇒ Object
  Log that the current record is being skipped, using data in context.position and context.skipmessage.
- #logger_argument ⇒ Object
  Just calculates the arg that’s gonna be given to Yell.new or SomeLogger.new.
- #logger_options ⇒ Object
  Second arg to Yell.new, options hash, calculated from settings.
- #map_record(record) ⇒ Object
  Processes a single record according to indexing rules set up in this indexer.
- #map_to_context!(context) ⇒ Object
  Maps a single record INTO the context argument, a Traject::Indexer::Context.
- #process(io_stream) ⇒ Object
  Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.
- #reader!(io_stream) ⇒ Object
  Instantiate a Traject Reader, using class set in #reader_class, initialized with io_stream passed in.
- #settings(new_settings = nil, &block) ⇒ Object
  The Indexer’s settings are a hash of key/values – not nested, just one level – of configuration settings.
- #to_field(field_name, aLambda = nil, &block) ⇒ Object
  Used to define an indexing mapping.
- #verify_each_record_arguments(aLambda, block) ⇒ Object
  Verify the procs sent to each_record to make sure it’s all kosher.
- #verify_field_name(field_name) ⇒ Object
  Verify that the field name is good, and throw a useful error if not.
- #verify_to_field_arguments(field_name, aLambda, block) ⇒ Object
  Verify the various, increasingly-complex things that can be sent to to_field to make sure it’s all kosher.
- #writer! ⇒ Object
  Instantiate a Traject Writer, using class set in #writer_class.
Methods included from Macros::Basic
Methods included from Macros::Marc21
#extract_all_marc_values, #extract_marc, first!, #serialized_marc, trim_punctuation
Methods included from QualifiedConstGet
Constructor Details
Instance Attribute Details
#logger ⇒ Object
    # File 'lib/traject/indexer.rb', line 106
    def logger
      @logger ||= create_logger
    end
#reader_class ⇒ Object
    # File 'lib/traject/indexer.rb', line 370
    def reader_class
      unless defined? @reader_class
        @reader_class = qualified_const_get(settings["reader_class_name"])
      end
      return @reader_class
    end
#writer_class ⇒ Object
    # File 'lib/traject/indexer.rb', line 377
    def writer_class
      unless defined? @writer_class
        @writer_class = qualified_const_get(settings["writer_class_name"])
      end
      return @writer_class
    end
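Both accessors memoize with `unless defined?` rather than `||=`. The general Ruby difference between the two idioms is sketched below: `defined?` also caches a nil or false result, while `||=` recomputes it on every call. The Lookup class and its method names are hypothetical and exist only for this illustration.

```ruby
# Hypothetical class for illustration; not part of Traject.
class Lookup
  attr_reader :calls

  def initialize
    @calls = 0
  end

  # Stand-in for an expensive lookup that legitimately returns nil.
  def expensive
    @calls += 1
    nil
  end

  # Re-runs the lookup on every call, because a cached nil is falsy.
  def with_or_equals
    @a ||= expensive
  end

  # Runs the lookup once, because @b is defined even when it holds nil.
  def with_defined
    @b = expensive unless defined?(@b)
    @b
  end
end

or_equals = Lookup.new
2.times { or_equals.with_or_equals }
or_equals_calls = or_equals.calls   # 2: recomputed each time

guarded = Lookup.new
2.times { guarded.with_defined }
guarded_calls = guarded.calls       # 1: computed once
```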
Instance Method Details
#create_logger ⇒ Object
Create logger according to settings
    # File 'lib/traject/indexer.rb', line 141
    def create_logger
      # log everything to STDERR or specified logfile
      logger = Yell.new( logger_argument, logger_options )
      # ADDITIONALLY log error and higher to....
      if settings["log.error_file"]
        logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
      end

      return logger
    end
#each_record(aLambda = nil, &block) ⇒ Object
    # File 'lib/traject/indexer.rb', line 170
    def each_record(aLambda = nil, &block)
      verify_each_record_arguments(aLambda, block)
      @index_steps << {
        :lambda => aLambda,
        :block => block,
        :type => :each_record,
        :source_location => Traject::Util.extract_caller_location(caller.first)
      }
    end
#id_string(record) ⇒ Object
get a printable id from record for error logging. Maybe override this for a future XML version.
    # File 'lib/traject/indexer.rb', line 397
    def id_string(record)
      record && record['001'] && record['001'].value.to_s
    end
#last_named_step ⇒ Object
    # File 'lib/traject/indexer.rb', line 452
    def last_named_step
      return LastNamedStep.new(@index_steps)
    end
#log_mapping_errors(context, index_step, aProc) ⇒ Object
Just a wrapper that captures and records any unexpected errors raised in mapping, along with contextual information on the record and the location in the source file of the mapping rule.
Currently re-raises the error after logging it.
    log_mapping_errors(context, index_step, some_lambda) do
      all_sorts_of_stuff # that will have errors logged
    end
    # File 'lib/traject/indexer.rb', line 264
    def log_mapping_errors(context, index_step, aProc)
      begin
        yield
      rescue Exception => e
        msg = "Unexpected error on record id `#{id_string(context.source_record)}` at file position #{context.position}\n"

        conf = context.field_name ? "to_field '#{context.field_name}'" : "each_record"

        msg += " while executing #{conf} defined at #{index_step[:source_location]}\n"
        msg += Traject::Util.(e)

        logger.error msg
        logger.debug "Record: " + context.source_record.to_s

        raise e
      end
    end
#log_skip(context) ⇒ Object
Log that the current record is being skipped, using data in context.position and context.skipmessage
    # File 'lib/traject/indexer.rb', line 366
    def log_skip(context)
      logger.debug "Skipped record #{context.position}: #{context.skipmessage}"
    end
#logger_argument ⇒ Object
Just calculates the arg that’s gonna be given to Yell.new or SomeLogger.new
    # File 'lib/traject/indexer.rb', line 114
    def logger_argument
      specified = settings["log.file"] || "STDERR"

      case specified
      when "STDOUT" then STDOUT
      when "STDERR" then STDERR
      else specified
      end
    end
#logger_options ⇒ Object
Second arg to Yell.new, options hash, calculated from settings
    # File 'lib/traject/indexer.rb', line 126
    def logger_options
      # formatter, default is fairly basic
      format = settings["log.format"] || "%d %5L %m"
      format = case format
      when "false" then false
      when "" then nil
      else format
      end

      level = settings["log.level"] || "info"

      {:format => format, :level => level}
    end
#map_record(record) ⇒ Object
Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)
This is a convenience shortcut for #map_to_context!. Use that method instead if you want to provide additional context such as position, and/or get back the full context.
    # File 'lib/traject/indexer.rb', line 188
    def map_record(record)
      context = Context.new(:source_record => record, :settings => settings)
      map_to_context!(context)
      return context.output_hash
    end
#map_to_context!(context) ⇒ Object
Maps a single record INTO the context argument, a Traject::Indexer::Context.
Context must be passed with a #source_record and #settings, and optionally a #position.
Context will be mutated by this method, most significantly by adding an #output_hash, a hash from field name to array of values in that field.
Pass in a context with #position set if you want the position to be available to mapping routines.
Returns the context passed in, as a convenience for chaining etc.
    # File 'lib/traject/indexer.rb', line 206
    def map_to_context!(context)
      @index_steps.each do |index_step|
        # Don't bother if we're skipping this record
        break if context.skip?

        if index_step[:type] == :to_field

          accumulator = []
          context.field_name = index_step[:field_name]

          # Might have a lambda arg AND a block, we execute in order,
          # with same accumulator.
          [index_step[:lambda], index_step[:block]].each do |aProc|
            if aProc
              log_mapping_errors(context, index_step, aProc) do
                if aProc.arity == 2
                  aProc.call(context.source_record, accumulator)
                else
                  aProc.call(context.source_record, accumulator, context)
                end
              end
            end
          end
          (context.output_hash[context.field_name] ||= []).concat accumulator unless accumulator.empty?
          context.field_name = nil

        elsif index_step[:type] == :each_record

          # one or two arg
          [index_step[:lambda], index_step[:block]].each do |aProc|
            if aProc
              log_mapping_errors(context, index_step, aProc) do
                if aProc.arity == 1
                  aProc.call(context.source_record)
                else
                  aProc.call(context.source_record, context)
                end
              end
            end
          end

        else
          raise ArgumentError.new("An @index_step we don't know how to deal with: #{@index_step}")
        end
      end

      return context
    end
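The to_field branch above can be reduced to a standalone miniature: each step gets a fresh accumulator, its procs are called with two or three arguments depending on arity, and the field is added to the output only when something was accumulated. The sketch below is an illustration under assumptions, not Traject code: Step, run_steps, and the hash-based record and context are hypothetical stand-ins for the index step hashes, MARC records, and Traject::Indexer::Context.

```ruby
# Hypothetical miniature of the to_field dispatch; not Traject code.
Step = Struct.new(:field_name, :procs)

def run_steps(steps, record, context)
  output = {}
  steps.each do |step|
    accumulator = []
    step.procs.compact.each do |a_proc|
      if a_proc.arity == 2
        a_proc.call(record, accumulator)           # (record, accumulator)
      else
        a_proc.call(record, accumulator, context)  # (record, accumulator, context)
      end
    end
    # Fields whose accumulator stayed empty never appear in the output.
    (output[step.field_name] ||= []).concat(accumulator) unless accumulator.empty?
  end
  output
end

record  = { "title" => "Moby Dick", "author" => nil }
context = { position: 1 }

steps = [
  Step.new("title",    [->(rec, acc) { acc << rec["title"] }]),
  Step.new("position", [->(_rec, acc, ctx) { acc << ctx[:position] }]),
  Step.new("author",   [->(rec, acc) { acc << rec["author"] if rec["author"] }])
]

output = run_steps(steps, record, context)
# output is {"title" => ["Moby Dick"], "position" => [1]}; "author" is absent
```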
#process(io_stream) ⇒ Object
Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.
Returns false as a signal to the command line to exit with a non-zero code for some reason (the reason is presumably found in the logs); otherwise returns true. This mechanism starts simple and is open to becoming more complex. We do need SOME way to return non-zero to the command line.
    # File 'lib/traject/indexer.rb', line 291
    def process(io_stream)
      settings.fill_in_defaults!

      count = 0
      start_time = batch_start_time = Time.now
      logger.info "beginning Indexer#process with settings: #{settings.inspect}"

      reader = self.reader!(io_stream)
      writer = self.writer!

      thread_pool = Traject::ThreadPool.new(settings["processing_thread_pool"].to_i)

      logger.info " with reader: #{reader.class.name} and writer: #{writer.class.name}"

      reader.each do |record; position|
        count += 1

        # have to use a block local var, so the changing `count` one
        # doesn't get caught in the closure. Weird, yeah.
        position = count

        thread_pool.raise_collected_exception!

        if settings["debug_ascii_progress"].to_s == "true"
          $stderr.write "." if count % settings["solrj_writer.batch_size"] == 0
        end

        if settings["log.batch_progress"] && (count % settings["log.batch_progress"].to_i == 0)
          batch_rps = settings["log.batch_progress"].to_i / (Time.now - batch_start_time)
          overall_rps = count / (Time.now - start_time)
          logger.info "Traject::Indexer#process, read #{count} records at id:#{id_string(record)}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall"
          batch_start_time = Time.now
        end

        # we have to use this weird lambda to properly "capture" the count, instead
        # of having it be bound to the original variable in a non-threadsafe way.
        # This is confusing, I might not be understanding things properly, but that's where i am.
        #thread_pool.maybe_in_thread_pool &make_lambda(count, record, writer)
        thread_pool.maybe_in_thread_pool do
          context = Context.new(:source_record => record, :settings => settings, :position => position)
          map_to_context!(context)
          if context.skip?
            log_skip(context)
          else
            writer.put context
          end
        end

      end
      $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

      logger.debug "Shutting down #processing mapper threadpool..."
      thread_pool.shutdown_and_wait
      logger.debug "#processing mapper threadpool shutdown complete."

      thread_pool.raise_collected_exception!

      writer.close if writer.respond_to?(:close)

      elapsed = Time.now - start_time
      avg_rps = (count / elapsed)
      logger.info "finished Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

      if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
        logger.error "Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
        return false
      end

      return true
    end
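The `|record; position|` block-local variable in #process exists so that work deferred to the thread pool sees the position of its own record rather than the latest value of `count`. The plain-Ruby sketch below shows the underlying closure behaviour with lambdas instead of a thread pool; the array of symbols is a hypothetical stand-in for a reader.

```ruby
count  = 0
shared = []  # lambdas that close over the single outer `count`
local  = []  # lambdas that close over a per-iteration `position`

# `; position` declares position as local to the block, so each
# iteration gets its own variable even if an outer one existed.
[:a, :b, :c].each do |record; position|
  count += 1
  position = count
  shared << -> { count }
  local  << -> { position }
end

shared_values = shared.map(&:call)  # [3, 3, 3]: all see the final count
local_values  = local.map(&:call)   # [1, 2, 3]: each kept its own position
```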
#reader!(io_stream) ⇒ Object
Instantiate a Traject Reader, using class set in #reader_class, initialized with io_stream passed in
    # File 'lib/traject/indexer.rb', line 386
    def reader!(io_stream)
      return reader_class.new(io_stream, settings.merge("logger" => logger))
    end
#settings(new_settings = nil, &block) ⇒ Object
The Indexer’s settings are a hash of key/values – not nested, just one level – of configuration settings. Keys are strings.
The settings method with no arguments returns that hash.
With a hash and/or block argument, can be used to set new key/values. Each call merges onto the existing settings hash.
    indexer.settings("a" => "a", "b" => "b")
    indexer.settings do
      store "b", "new b"
    end
    indexer.settings #=> {"a" => "a", "b" => "new b"}
Even when called with arguments, it returns the settings hash, so calls can be chained.
    # File 'lib/traject/indexer.rb', line 98
    def settings(new_settings = nil, &block)
      @settings.merge!(new_settings) if new_settings

      @settings.instance_eval &block if block

      return @settings
    end
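The reason a bare `store` works inside the settings block is that the block is run with instance_eval, so self is the settings hash itself. A runnable miniature of just that logic is sketched below. SketchIndexer is a hypothetical stand-in that uses a plain Hash, whereas the real indexer holds a Traject::Indexer::Settings object.

```ruby
# Hypothetical stand-in; mirrors only the merge!/instance_eval logic above.
class SketchIndexer
  def initialize
    @settings = {}
  end

  def settings(new_settings = nil, &block)
    @settings.merge!(new_settings) if new_settings
    # Inside the block, self is the hash, so `store` is Hash#store.
    @settings.instance_eval(&block) if block
    @settings
  end
end

indexer = SketchIndexer.new
indexer.settings("a" => "a", "b" => "b")
indexer.settings { store "b", "new b" }
result = indexer.settings
# result is {"a" => "a", "b" => "new b"}
```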
#to_field(field_name, aLambda = nil, &block) ⇒ Object
Used to define an indexing mapping.
    # File 'lib/traject/indexer.rb', line 157
    def to_field(field_name, aLambda = nil, &block)

      verify_to_field_arguments(field_name, aLambda, block)

      @index_steps << {
        :field_name => field_name.to_s,
        :lambda => aLambda,
        :block => block,
        :type => :to_field,
        :source_location => Traject::Util.extract_caller_location(caller.first)
      }
    end
#verify_each_record_arguments(aLambda, block) ⇒ Object
Verify the procs sent to each_record to make sure it’s all kosher.
    # File 'lib/traject/indexer.rb', line 433
    def verify_each_record_arguments(aLambda, block)
      unless aLambda or block
        raise ArgumentError.new("Missing Argument: each_record must take a block/lambda as an argument (#{last_named_step.})")
      end

      [aLambda, block].each do |proc|
        # allow negative arity, meaning variable/optional, trust em on that.
        # but for positive arity, we need 1 or 2 args
        if proc
          unless proc.is_a?(Proc)
            raise NamingError.new("argument to each_record must be a block/lambda, not a #{proc.class} (#{last_named_step.})")
          end
          if (proc.arity == 0 || proc.arity > 2)
            raise ArityError.new("block/proc given to each_record needs 1 or 2 arguments: #{proc} (#{last_named_step.})")
          end
        end
      end
    end
#verify_field_name(field_name) ⇒ Object
Verify that the field name is good, and throw a useful error if not
    # File 'lib/traject/indexer.rb', line 405
    def verify_field_name(field_name)
      if field_name.nil? || !field_name.is_a?(String) || field_name.empty?
        raise NamingError.new("to_field requires the field name (String) as the first argument (#{last_named_step.})")
      end
    end
#verify_to_field_arguments(field_name, aLambda, block) ⇒ Object
Verify the various, increasingly-complex things that can be sent to to_field to make sure it’s all kosher.
“Modification” takes place for zero-argument blocks that return a lambda
    # File 'lib/traject/indexer.rb', line 417
    def verify_to_field_arguments(field_name, aLambda, block)

      verify_field_name(field_name)

      [aLambda, block].each do |proc|
        # allow negative arity, meaning variable/optional, trust em on that.
        # but for positive arity, we need 2 or 3 args
        if proc && (proc.arity == 0 || proc.arity == 1 || proc.arity > 3)
          raise ArityError.new("error parsing field '#{field_name}': block/proc given to to_field needs 2 or 3 (or variable) arguments: #{proc} (#{last_named_step.})")
        end
      end
    end
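The arity check above relies on how Ruby reports Proc#arity: a fixed parameter list gives a non-negative count, while optional or splat parameters give a negative number, which the check lets through. The sketch below applies the same condition to a few callables. The helper name acceptable_to_field_arity? is hypothetical and only restates the condition from the listing.

```ruby
two_args      = ->(record, accumulator) {}
three_args    = ->(record, accumulator, context) {}
optional_args = ->(record, accumulator, context = nil) {}
splat_block   = proc { |*args| }
no_args       = -> {}
one_arg       = ->(record) {}

# Hypothetical helper restating the to_field condition: reject arity
# 0, 1, or more than 3; accept 2, 3, or any negative (variable) arity.
def acceptable_to_field_arity?(callable)
  arity = callable.arity
  !(arity == 0 || arity == 1 || arity > 3)
end

arities = [two_args, three_args, optional_args, splat_block, no_args, one_arg].map(&:arity)
# arities is [2, 3, -3, -1, 0, 1]

accepted = [two_args, three_args, optional_args, splat_block].all? { |c| acceptable_to_field_arity?(c) }
rejected = [no_args, one_arg].none? { |c| acceptable_to_field_arity?(c) }
```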
#writer! ⇒ Object
Instantiate a Traject Writer, using class set in #writer_class
    # File 'lib/traject/indexer.rb', line 391
    def writer!
      return writer_class.new(settings.merge("logger" => logger))
    end