Class: Traject::Indexer

Inherits:
Object
Includes:
Macros::Basic, Macros::Marc21, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/step.rb,
lib/traject/indexer/context.rb,
lib/traject/indexer/settings.rb

Overview

Processes a stream of records: reads input with a configured Reader, maps each record to an output hash according to registered indexing rules, and sends the results to a configured Writer.

Defined Under Namespace

Classes: AfterProcessingStep, ArityError, ConfigLoadError, Context, EachRecordStep, NamingError, Settings, ToFieldStep

Constant Summary

ALLOW_NIL_VALUES =
"allow_nil_values".freeze
ALLOW_EMPTY_FIELDS =
"allow_empty_fields".freeze
ALLOW_DUPLICATE_VALUES =
"allow_duplicate_values".freeze

Constants included from Macros::Marc21

Macros::Marc21::EXTRACT_ALL_MARC_VALID_OPTIONS, Macros::Marc21::EXTRACT_MARC_VALID_OPTIONS, Macros::Marc21::SERIALZED_MARC_VALID_OPTIONS

Instance Attribute Summary

Instance Method Summary

Methods included from Macros::Basic

#literal

Methods included from Macros::Marc21

apply_extraction_options, #extract_all_marc_values, extract_marc, extract_marc_from, first!, #serialized_marc, trim_punctuation

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(arg_settings = {}) ⇒ Indexer

Takes an optional hash or Traject::Indexer::Settings object of settings.



# File 'lib/traject/indexer.rb', line 181

def initialize(arg_settings = {})
  @settings               = Settings.new(arg_settings)
  @index_steps            = []
  @after_processing_steps = []
end

Instance Attribute Details

#logger ⇒ Object



# File 'lib/traject/indexer.rb', line 260

def logger
  @logger ||= create_logger
end

#reader_class ⇒ Object



# File 'lib/traject/indexer.rb', line 522

def reader_class
  unless defined? @reader_class
    @reader_class = qualified_const_get(settings["reader_class_name"])
  end
  return @reader_class
end

#writer ⇒ Object



# File 'lib/traject/indexer.rb', line 545

def writer
  @writer ||= settings["writer"] || writer!
end

#writer_class ⇒ Object



# File 'lib/traject/indexer.rb', line 529

def writer_class
  writer.class
end

Instance Method Details

#add_accumulator_to_context!(accumulator, context) ⇒ Object



# File 'lib/traject/indexer.rb', line 370

def add_accumulator_to_context!(accumulator, context)

  accumulator.compact! unless settings[ALLOW_NIL_VALUES]
  return if accumulator.empty? and not (settings[ALLOW_EMPTY_FIELDS])

  field_name                      = context.index_step.field_name
  context.output_hash[field_name] ||= []

  existing_accumulator = context.output_hash[field_name].concat(accumulator)
  existing_accumulator.uniq! unless settings[ALLOW_DUPLICATE_VALUES]

rescue NameError => e
  msg = "Tried to call add_accumulator_to_context with a non-to_field step"
  msg += context.index_step.inspect
  logger.error msg
  raise ArgumentError.new(msg)
end
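The post-processing controlled by the three ALLOW_* settings can be illustrated in plain Ruby. This is a standalone sketch with a hypothetical `merge_accumulator` helper, not the library's own method:

```ruby
# Sketch of the accumulator post-processing above: drop nils, skip empty
# fields, and de-duplicate, unless the corresponding setting allows them.
def merge_accumulator(output_hash, field_name, accumulator, settings = {})
  accumulator = accumulator.compact unless settings["allow_nil_values"]
  return output_hash if accumulator.empty? && !settings["allow_empty_fields"]

  output_hash[field_name] ||= []
  output_hash[field_name].concat(accumulator)
  output_hash[field_name].uniq! unless settings["allow_duplicate_values"]
  output_hash
end
```

With default settings, nils and duplicates disappear; passing `"allow_duplicate_values" => true` preserves repeats.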

#after_processing(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called once at the end of processing a stream of records.



# File 'lib/traject/indexer.rb', line 256

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
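In a config file this is typically used for teardown or summary logging; a minimal sketch (the message is illustrative):

```ruby
# In a traject config file (instance_eval'd against the Indexer):
after_processing do
  logger.info "All records processed"
end
```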

#create_logger ⇒ Object

Create logger according to settings



# File 'lib/traject/indexer.rb', line 280

def create_logger

  logger_level  = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger        = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
    when "STDERR"
      logger.adapter :stderr, level: logger_level, format: logger_format
    when "STDOUT"
      logger.adapter :stdout, level: logger_level, format: logger_format
    else
      logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end


  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end

#each_record(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called for each record



# File 'lib/traject/indexer.rb', line 250

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
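An each_record block receives the record and context, and is often used for side effects or to skip records; a hedged sketch assuming a MARC record:

```ruby
# In a traject config file:
each_record do |record, context|
  context.skip!("no title") if record["245"].nil?
end
```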

#load_config_file(file_path) ⇒ Object

Pass a string file path, or a File object, for a config file to load into indexer.

Can raise:

  • Errno::ENOENT or Errno::EACCES if file path is not accessible
  • Traject::Indexer::ConfigLoadError if exception is raised evaluating the config. A ConfigLoadError has information in it about original exception, and exactly what config file and line number triggered it.


# File 'lib/traject/indexer.rb', line 195

def load_config_file(file_path)
  File.open(file_path) do |file|
    begin
      self.instance_eval(file.read, file_path)
    rescue ScriptError, StandardError => e
      raise ConfigLoadError.new(file_path, e)
    end
  end
end
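The instance_eval-based loading pattern used here can be demonstrated standalone. ConfigHost and its `step` DSL method are hypothetical stand-ins, not part of traject:

```ruby
# Minimal sketch of instance_eval config loading: DSL methods in the file
# resolve against the host object, and load errors report the config path.
class ConfigHost
  attr_reader :steps

  def initialize
    @steps = []
  end

  def step(name)            # hypothetical DSL method
    @steps << name
  end

  def load_config_string(code, path = "(inline)")
    instance_eval(code, path)
  rescue ScriptError, StandardError => e
    raise "config load error in #{path}: #{e.message}"
  end
end
```

Passing the path as instance_eval's second argument is what lets error backtraces point at the config file, as ConfigLoadError does.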

#log_mapping_errors(context, index_step) ⇒ Object

A wrapper that captures and logs any unexpected errors raised in mapping, along with contextual information on the record and the source-file location of the mapping rule.

Re-raises the error at the moment.

log_mapping_errors(context, index_step) do
  all_sorts_of_stuff # that will have errors logged
end



# File 'lib/traject/indexer.rb', line 398

def log_mapping_errors(context, index_step)
  begin
    yield
  rescue Exception => e
    msg = "Unexpected error on record id `#{context.source_record_id}` at file position #{context.position}\n"
    msg += "    while executing #{index_step.inspect}\n"
    msg += Traject::Util.exception_to_log_message(e)

    logger.error msg
    begin
      logger.debug "Record: " + context.source_record.to_s
    rescue Exception => marc_to_s_exception
      logger.debug "(Could not log record, #{marc_to_s_exception})"
    end

    raise e
  end
end

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



# File 'lib/traject/indexer.rb', line 518

def log_skip(context)
  logger.debug "Skipped record #{context.position}: #{context.skipmessage}"
end

#logger_format ⇒ Object



# File 'lib/traject/indexer.rb', line 267

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
             when "false" then
               false
             when "" then
               nil
             else
               format
           end
end

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

This is a convenience shortcut for #map_to_context! -- use that one if you want to provide additional context like position, and/or get back the full context.



# File 'lib/traject/indexer.rb', line 319

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings)
  map_to_context!(context)
  return context.output_hash
end
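A typical round-trip, sketched under the assumption that `marc_record` is a MARC record object and the traject gem is loaded:

```ruby
indexer = Traject::Indexer.new
indexer.instance_eval do
  to_field "id", extract_marc("001")
end
output = indexer.map_record(marc_record)  # hash of field name => array of values
```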

#map_to_context!(context) ⇒ Object

Maps a single record INTO the second argument, a Traject::Indexer::Context.

Context must be passed with a #source_record and #settings, and optionally a #position.

Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.

Pass in a context with a set #position if you want that to be available to mapping routines.

Returns the context passed in as second arg, as a convenience for chaining etc.



# File 'lib/traject/indexer.rb', line 338

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    # Set the index step for error reporting
    context.index_step = index_step
    accumulator        = log_mapping_errors(context, index_step) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    add_accumulator_to_context!(accumulator, context) if index_step.to_field_step?

    # And unset the index step now that we're finished
    context.index_step = nil
  end

  return context
end
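The pipeline above reduces to: for each registered step, run its logic to fill an accumulator, then merge the accumulator into the output hash. A standalone sketch with a hypothetical Step struct (not the library's ToFieldStep):

```ruby
Step = Struct.new(:field_name, :logic)  # hypothetical stand-in for ToFieldStep

# Run every step against a record, merging results as map_to_context! does.
def run_steps(steps, record)
  output = {}
  steps.each do |step|
    accumulator = []
    step.logic.call(record, accumulator)
    next if accumulator.empty?
    (output[step.field_name] ||= []).concat(accumulator)
  end
  output
end
```

The real method additionally handles skips, error logging, and the ALLOW_* post-processing settings.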

#process(io_stream) ⇒ Object

Processes a stream of records: reading from the configured Reader, mapping according to configured mapping rules, and then writing to the configured Writer.

Returns false as a signal to the command line to exit with a non-zero status (the reason, presumably, can be found in the logs). This particular mechanism starts simple and is open to elaboration; we do need SOME way to return non-zero to the command line.



# File 'lib/traject/indexer.rb', line 427

def process(io_stream)
  settings.fill_in_defaults!

  count      = 0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Indexer#process with settings: #{settings.inspect}"

  reader = self.reader!(io_stream)

  processing_threads = settings["processing_thread_pool"].to_i
  thread_pool        = Traject::ThreadPool.new(processing_threads)

  logger.info "   Indexer with #{processing_threads} processing threads, reader: #{reader.class.name} and writer: #{writer.class.name}"

  log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

  reader.each do |record; position |
    count    += 1

    # have to use a block local var, so the changing `count` one
    # doesn't get caught in the closure. Weird, yeah.
    position = count

    thread_pool.raise_collected_exception!

    if settings["debug_ascii_progress"].to_s == "true"
      $stderr.write "." if count % settings["solr_writer.batch_size"].to_i == 0
    end

    context = Context.new(
        :source_record => record,
        :settings      => settings,
        :position      => position,
        :logger        => logger
    )

    if log_batch_size && (count % log_batch_size == 0)
      batch_rps   = log_batch_size / (Time.now - batch_start_time)
      overall_rps = count / (Time.now - start_time)
      logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at id:#{context.source_record_id}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
      batch_start_time = Time.now
    end

    # We pass context in a block arg to properly 'capture' it, so
    # we don't accidentally share the local var under closure between
    # threads.
    thread_pool.maybe_in_thread_pool(context) do |context|
      map_to_context!(context)
      if context.skip?
        log_skip(context)
      else
        writer.put context
      end

    end

  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!


  writer.close if writer.respond_to?(:close)

  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue Exception => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end

  elapsed = Time.now - start_time
  avg_rps = (count / elapsed)
  logger.info "finished Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end
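End-to-end usage is usually driven by the traject command line, but #process can be called directly. A hedged sketch; the reader/writer settings and file names are illustrative:

```ruby
indexer = Traject::Indexer.new(
  "reader_class_name" => "Traject::MarcReader",
  "writer_class_name" => "Traject::JsonWriter",
  "output_file"       => "out.json"
)
indexer.load_config_file("my_config.rb")
success = indexer.process(File.open("records.mrc"))
exit 1 unless success
```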

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using the class set in #reader_class, initialized with the io_stream passed in.



# File 'lib/traject/indexer.rb', line 535

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#settings(new_settings = nil, &block) ⇒ Object

Part of the config file DSL, for writing settings values.

The Indexer's settings consist of a hash-like Traject::Settings object. The settings are not nested hashes, just one flat level of configuration settings. Keys are always strings, and by convention use "." for namespacing, eg log.file

The settings method with no arguments returns that Settings object.

With a hash and/or block argument, can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_evald in the context of the Traject::Settings object.

indexer.settings("a" => "a", "b" => "b")

indexer.settings do
  provide "b", "new b"
end

indexer.settings #=> {"a" => "a", "b" => "new b"}

Note the #provide method is defined on Traject::Settings to write to a setting only if it was previously unset. You can also use #store to force over-writing even if there is an existing setting.

Even with arguments, Indexer#settings returns the Settings object, so method calls can be chained.



# File 'lib/traject/indexer.rb', line 234

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval &block if block_given?

  return @settings
end
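The provide vs. store distinction can be sketched with a plain hash subclass. This is a standalone illustration, not Traject::Settings itself:

```ruby
# Minimal sketch: provide only fills a key if absent; store always overwrites.
class SettingsSketch < Hash
  def provide(key, value)
    self[key] = value unless has_key?(key)
  end

  def store(key, value)
    self[key] = value
  end
end
```

This is why config files use #provide for defaults that command-line settings should be able to override.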

#to_field(field_name, aLambda = nil, &block) ⇒ Object

Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field.



# File 'lib/traject/indexer.rb', line 245

def to_field(field_name, aLambda = nil, &block)
  @index_steps << ToFieldStep.new(field_name, aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
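The logic can be a macro-generated lambda, a block, or both; a hedged sketch (field names and the literal value are illustrative):

```ruby
# In a traject config file:
to_field "title", extract_marc("245ab")   # macro returns a lambda
to_field "source" do |record, accumulator|
  accumulator << "my_institution"
end
```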

#writer! ⇒ Object

Instantiate a Traject Writer, using the class set in #writer_class.



# File 'lib/traject/indexer.rb', line 540

def writer!
  writer_class = @writer_class || qualified_const_get(settings["writer_class_name"])
  writer_class.new(settings.merge("logger" => logger))
end