Class: Traject::Indexer

Inherits:
Object
Extended by:
QualifiedConstGet
Includes:
Macros::Basic, Macros::Transformation, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/step.rb,
lib/traject/indexer/context.rb,
lib/traject/indexer/settings.rb,
lib/traject/indexer/marc_indexer.rb,
lib/traject/indexer/nokogiri_indexer.rb

Overview

Represents the context of a specific record being indexed, passed to indexing logic blocks.

The source_record_id_proc argument is a lambda that takes one argument (an indexer-specific source record) and returns an ID for it suitable for use in log messages.

Direct Known Subclasses

MarcIndexer, NokogiriIndexer

Defined Under Namespace

Classes: AfterProcessingStep, ConfigLoadError, Context, EachRecordStep, MarcIndexer, NokogiriIndexer, Settings, ToFieldStep

Constant Summary

CompletedStateError =
Class.new(StandardError)
ArityError =
Class.new(ArgumentError)
NamingError =
Class.new(ArgumentError)


Methods included from QualifiedConstGet

qualified_const_get

Methods included from Macros::Transformation

#append, #default, #first_only, #gsub, #prepend, #split, #strip, #transform, #translation_map, #unique

Methods included from Macros::Basic

#literal

Constructor Details

#initialize(arg_settings = {}, &block) ⇒ Indexer

Takes an optional hash or Traject::Indexer::Settings object of settings. Optionally takes a block, which is instance_eval'd in the indexer and is intended for configuration similar to what would be in a config file.



# File 'lib/traject/indexer.rb', line 176

def initialize(arg_settings = {}, &block)
  @writer_class           = nil
  @completed              = false
  @settings               = Settings.new(arg_settings).with_defaults(self.class.default_settings)
  @index_steps            = []
  @after_processing_steps = []

  instance_eval(&block) if block
end
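
The block form can be pictured with a small stand-in class. This is only a sketch of the instance_eval configuration pattern, not traject's implementation: SketchIndexer and its simplified to_field are hypothetical names that just record what was configured.

```ruby
# Hypothetical stand-in for illustration; not the real Traject::Indexer.
class SketchIndexer
  attr_reader :settings, :index_steps

  def initialize(arg_settings = {}, &block)
    @settings    = arg_settings.dup
    @index_steps = []
    # The block runs with `self` set to the new indexer, so bare calls
    # such as `to_field` resolve to methods on this instance.
    instance_eval(&block) if block
  end

  # Simplified: only remember the field name and the logic to run later.
  def to_field(field_name, &logic)
    @index_steps << [field_name, logic]
  end
end

indexer = SketchIndexer.new("log.level" => "debug") do
  to_field("title") { |record, accumulator| accumulator << record[:title] }
end

puts indexer.index_steps.length
puts indexer.settings["log.level"]
```

Because the block is evaluated against the indexer itself, the same configuration text can live in a block or in a separate config file.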

Instance Attribute Details

#logger ⇒ Object



# File 'lib/traject/indexer.rb', line 331

def logger
  @logger ||= create_logger
end

#reader_class ⇒ Object



# File 'lib/traject/indexer.rb', line 716

def reader_class
  unless defined? @reader_class
    reader_class_name = settings["reader_class_name"]

    @reader_class = qualified_const_get(reader_class_name)
  end
  return @reader_class
end

#writer ⇒ Object



# File 'lib/traject/indexer.rb', line 741

def writer
  @writer ||= settings["writer"] || writer!
end

#writer_class ⇒ Object



# File 'lib/traject/indexer.rb', line 725

def writer_class
  writer.class
end

Class Method Details

.default_settings ⇒ Object

Hash is frozen to avoid inheritance-mutability confusion.



# File 'lib/traject/indexer.rb', line 249

def self.default_settings
  @default_settings ||= {
      # Writer defaults
      "writer_class_name"       => "Traject::SolrJsonWriter",
      "solr_writer.batch_size"  => 100,
      "solr_writer.thread_pool" => 1,

      # Threading and logging
      "processing_thread_pool"  => Traject::Indexer::Settings.default_processing_thread_pool,
      "log.batch_size.severity" => "info",

      # how to post-process the accumulator
      "allow_nil_values"        => false,
      "allow_duplicate_values"  => true,

      "allow_empty_fields"      => false
  }.freeze
end
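
A minimal sketch of why a frozen defaults hash avoids inheritance confusion, assuming a subclass wants its own defaults. BaseSketch, ChildSketch, and the "example.setting" key are hypothetical and not part of traject; the point is only that a frozen hash rejects in-place mutation, so a subclass has to build a new hash with a non-mutating merge.

```ruby
# Hypothetical classes for illustration only.
class BaseSketch
  def self.default_settings
    @default_settings ||= { "allow_nil_values" => false }.freeze
  end
end

class ChildSketch < BaseSketch
  # Build a new frozen hash from the parent's defaults instead of
  # mutating the parent's hash in place.
  def self.default_settings
    @default_settings ||= superclass.default_settings.merge("example.setting" => "child").freeze
  end
end

frozen_error = nil
begin
  # In-place mutation of the frozen hash is rejected.
  BaseSketch.default_settings["allow_nil_values"] = true
rescue FrozenError => e
  frozen_error = e
end

puts frozen_error.class
puts ChildSketch.default_settings.inspect
```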

.legacy_marc_mode! ⇒ Object



# File 'lib/traject/indexer.rb', line 295

def self.legacy_marc_mode!
  @@legacy_marc_mode = true
  # include legacy Marc macros
  include Traject::Macros::Marc21

  # Reader defaults
  legacy_settings = {
    "reader_class_name"       => "Traject::MarcReader",
    "marc_source.type"        => "binary",
  }

  default_settings.merge!(legacy_settings)

  self
end

Instance Method Details

#after_processing(aLambda = nil, &block) ⇒ Object

Part of the DSL. Registers logic to be called once at the end of processing a stream of records.



# File 'lib/traject/indexer.rb', line 327

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end

#complete ⇒ Object

Closes the writer (which may flush, save, or finalize buffered records), then calls run_after_processing_steps.



# File 'lib/traject/indexer.rb', line 601

def complete
  writer.close if writer.respond_to?(:close)
  run_after_processing_steps

  # after an indexer has been completed, it is not really usable anymore,
  # as the writer has been closed.
  @completed = true
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/traject/indexer.rb', line 585

def completed?
  @completed
end

#configure(&block) ⇒ Object

Currently this just does an instance_eval, but using it is encouraged in case the underlying implementation changes later, and to make intent clearer.



# File 'lib/traject/indexer.rb', line 188

def configure(&block)
  instance_eval(&block)
end

#create_logger ⇒ Object

Creates a logger according to the log.level, log.format, log.file, and log.error_file settings.



# File 'lib/traject/indexer.rb', line 351

def create_logger

  logger_level  = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger        = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
    when "STDERR"
      logger.adapter :stderr, level: logger_level, format: logger_format
    when "STDOUT"
      logger.adapter :stdout, level: logger_level, format: logger_format
    else
      logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end


  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end

#each_record(aLambda = nil, &block) ⇒ Object

Part of the DSL. Registers logic to be called for each record.



# File 'lib/traject/indexer.rb', line 321

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end

#load_config_file(file_path) ⇒ Object

Pass a string file path, a Pathname, or a File object, for a config file to load into indexer.

Can raise:

  • Errno::ENOENT or Errno::EACCES if file path is not accessible
  • Traject::Indexer::ConfigLoadError if exception is raised evaluating the config. A ConfigLoadError has information in it about original exception, and exactly what config file and line number triggered it.


# File 'lib/traject/indexer.rb', line 201

def load_config_file(file_path)
  File.open(file_path) do |file|
    begin
      self.instance_eval(file.read, file_path.to_s)
    rescue ScriptError, StandardError => e
      raise ConfigLoadError.new(file_path.to_s, e)
    end
  end
end
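
The load-and-wrap behavior can be sketched without traject. In the hedged example below, SketchConfigHost and SketchConfigLoadError are hypothetical stand-ins (the real ConfigLoadError carries more detail); the part that mirrors the listing above is passing the file path as the second argument to instance_eval and re-raising a wrapped error.

```ruby
require "tempfile"

# Hypothetical simplified error class for illustration.
class SketchConfigLoadError < StandardError
  attr_reader :config_file, :original

  def initialize(config_file, original)
    @config_file = config_file
    @original    = original
    super("Error loading #{config_file}: #{original.class}: #{original.message}")
  end
end

# Hypothetical host object standing in for an indexer.
class SketchConfigHost
  attr_reader :fields

  def initialize
    @fields = []
  end

  def to_field(name)
    @fields << name
  end

  def load_config_file(file_path)
    File.open(file_path) do |file|
      begin
        # The second argument makes backtraces name the config file
        # instead of "(eval)".
        instance_eval(file.read, file_path.to_s)
      rescue ScriptError, StandardError => e
        raise SketchConfigLoadError.new(file_path.to_s, e)
      end
    end
  end
end

good = Tempfile.new(["good", ".rb"])
good.write(%(to_field "title"\n))
good.close

bad = Tempfile.new(["bad", ".rb"])
bad.write(%(to_field "author"\nno_such_method_here\n))
bad.close

host = SketchConfigHost.new
host.load_config_file(good.path)

load_error = nil
begin
  host.load_config_file(bad.path)
rescue SketchConfigLoadError => e
  load_error = e
end

puts host.fields.inspect
puts load_error.message
```

A missing or unreadable path is not wrapped: File.open raises Errno::ENOENT or Errno::EACCES before the begin/rescue is entered.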

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



# File 'lib/traject/indexer.rb', line 712

def log_skip(context)
  logger.debug "Skipped record #{context.record_inspect}: #{context.skipmessage}"
end

#logger_format ⇒ Object



# File 'lib/traject/indexer.rb', line 338

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
             when "false" then
               false
             when "" then
               nil
             else
               format
           end
end

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

If the record is marked skip as part of processing, this will return nil.

This is a convenience shortcut for #map_to_context! -- use that one if you want to provide additional context like position, and/or get back the full context.



# File 'lib/traject/indexer.rb', line 393

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings, :source_record_id_proc => source_record_id_proc, :logger => logger)
  map_to_context!(context)
  return context.output_hash unless context.skip?
end

#map_to_context!(context) ⇒ Object

Maps a single record into the given Traject::Indexer::Context.

The context must be passed with a #source_record and #settings, and optionally a #position.

The context will be mutated by this method, most significantly by adding an #output_hash, a hash from field name to array of values in that field.

Pass in a context with #position set if you want that to be available to mapping routines.

Returns the context passed in, as a convenience for chaining.



# File 'lib/traject/indexer.rb', line 428

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    # Set the index step for error reporting
    context.index_step = index_step
    handle_mapping_errors(context) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    # And unset the index step now that we're finished
    context.index_step = nil
  end

  return context
end
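
The step loop and its early exit on skipped records can be sketched in isolation. SketchContext and sketch_map_to_context! are hypothetical simplifications: steps are plain lambdas, and the error handling that the real method wraps around each step is left out.

```ruby
# Hypothetical minimal context; the real Traject::Indexer::Context has more fields.
SketchContext = Struct.new(:source_record, :output_hash, :skipped, :index_step) do
  def skip!
    self.skipped = true
  end

  def skip?
    skipped == true
  end
end

def sketch_map_to_context!(steps, context)
  steps.each do |step|
    # Stop running further steps once a record has been marked skipped.
    break if context.skip?

    context.index_step = step   # available for error reporting
    step.call(context)
    context.index_step = nil
  end
  context
end

sketch_steps = [
  lambda { |ctx| (ctx.output_hash["id"] ||= []) << ctx.source_record[:id] },
  lambda { |ctx| ctx.skip! if ctx.source_record[:suppressed] },
  lambda { |ctx| (ctx.output_hash["title"] ||= []) << ctx.source_record[:title] }
]

kept_ctx    = sketch_map_to_context!(sketch_steps, SketchContext.new({ id: "1", title: "A" }, {}))
skipped_ctx = sketch_map_to_context!(sketch_steps, SketchContext.new({ id: "2", title: "B", suppressed: true }, {}))

puts kept_ctx.output_hash.inspect
puts skipped_ctx.output_hash.inspect
```

The skipped record keeps whatever earlier steps already wrote, which is why callers check skip? before using the output hash.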

#process(io_stream_or_array) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

You can instead give it an array of streams.

Returns false as a signal to the command line to exit with a non-zero code (the reason should be found in the logs). In the listing below, that happens when the writer reports a skipped_record_count greater than zero; otherwise the method returns true. This mechanism starts simple and may become more elaborate later; some way to return non-zero to the command line is needed.

Parameters:

  • io_stream_or_array (#read, Array<#read>)


# File 'lib/traject/indexer.rb', line 495

def process(io_stream_or_array)
  check_uncompleted

  settings.fill_in_defaults!

  count      = 0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Traject::Indexer#process with settings: #{settings.inspect}"

  processing_threads = settings["processing_thread_pool"].to_i
  thread_pool        = Traject::ThreadPool.new(processing_threads)

  logger.info "   Traject::Indexer with #{processing_threads} processing threads, reader: #{reader_class.name} and writer: #{writer.class.name}"

  #io_stream can now be an array of io_streams.
  (io_stream_or_array.kind_of?(Array) ? io_stream_or_array : [io_stream_or_array]).each do |io_stream|
    reader = self.reader!(io_stream)
    input_name = Traject::Util.io_name(io_stream)
    position_in_input = 0

    log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

    reader.each do |record; safe_count, safe_position_in_input |
      count    += 1
      position_in_input += 1

      # have to use a block local var, so the changing `count` one
      # doesn't get caught in the closure. Don't totally get it, but
      # I think it's so.
      safe_count, safe_position_in_input = count, position_in_input

      thread_pool.raise_collected_exception!

      if settings["debug_ascii_progress"].to_s == "true"
        $stderr.write "." if count % settings["solr_writer.batch_size"].to_i == 0
      end

      context = Context.new(
          :source_record => record,
          :source_record_id_proc => source_record_id_proc,
          :settings      => settings,
          :position      => safe_count,
          :input_name    => input_name,
          :position_in_input => safe_position_in_input,
          :logger        => logger
      )


      if log_batch_size && (count % log_batch_size == 0)
        batch_rps   = log_batch_size / (Time.now - batch_start_time)
        overall_rps = count / (Time.now - start_time)
        logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at: #{context.record_inspect}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
        batch_start_time = Time.now
      end

      # We pass context in a block arg to properly 'capture' it, so
      # we don't accidentally share the local var under closure between
      # threads.
      thread_pool.maybe_in_thread_pool(context) do |t_context|
        map_to_context!(t_context)
        if context.skip?
          log_skip(t_context)
        else
          writer.put t_context
        end
      end
    end
  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!

  complete

  elapsed = Time.now - start_time
  avg_rps = (count / elapsed)
  logger.info "finished Traject::Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Traject::Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end
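
The `|record; safe_count, safe_position_in_input|` syntax in the listing above declares block-local variables. The standalone sketch below is not traject code; lambdas stand in for work handed to a thread pool, and it shows how a per-iteration copy differs from a shared outer counter when the work runs later.

```ruby
count    = 0
shared   = []
per_iter = []

%w[a b c].each do |record; safe_count|
  count += 1
  # safe_count is block-local: each iteration gets its own variable.
  safe_count = count

  # Deferred work, standing in for a job queued on a thread pool.
  shared   << lambda { "#{record}:#{count}" }
  per_iter << lambda { "#{record}:#{safe_count}" }
end

# By now the loop has finished and `count` holds its final value.
shared_results   = shared.map(&:call)
per_iter_results = per_iter.map(&:call)

puts shared_results.inspect
puts per_iter_results.inspect
```

Closures over the outer `count` all observe its final value, while closures over the block-local copy keep the value from their own iteration.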

#process_record(record) ⇒ Object Also known as: <<

Takes a single record, maps it, and sends it to the instance-configured writer. No threading, no logging, no error handling. Respects skipped records by not adding them. Returns the Traject::Indexer::Context.

Aliased as #<<



# File 'lib/traject/indexer.rb', line 404

def process_record(record)
  check_uncompleted

  context = Context.new(:source_record => record, :settings => settings, :source_record_id_proc =>  source_record_id_proc, :logger => logger)
  map_to_context!(context)
  writer.put( context ) unless context.skip?

  return context
end

#process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil) ⇒ Object

A light-weight process method meant for programmatic use, generally intended for only a "few" (not millions of) records.

It does not use instance-configured reader or writer, instead taking a source/reader and destination/writer as arguments to this call.

The reader can be anything that has an #each returning source records. This includes an ordinary array of source records, or any traject Reader.

The writer can be anything with a #put method taking a Traject::Indexer::Context. For convenience, see the Traject::ArrayWriter that just collects output in an array.

Return value of process_with is the writer passed as second arg, for your convenience.

This does much less than the full #process method, to be more flexible and make fewer assumptions:

  • Will never use any additional threads (unless writer does). Wrap in your own threading if desired.
  • Will not do any standard logging or progress bars, regardless of indexer settings. Log yourself if desired.
  • Will not call any after_processing steps. Call yourself with indexer.run_after_processing_steps as desired.
  • WILL by default call #close on the writer, IF the writer has a #close method. Pass :close_writer => false to not do so.
  • Exceptions will just raise out, unless you pass in a rescue_with: option whose value is a proc/lambda that will receive two args, context and exception. If the rescue_with proc doesn't re-raise, process_with will continue to process subsequent records.

Examples:

array_writer_instance = indexer.process_with([record1, record2], Traject::ArrayWriter.new)

With a block, in addition to or instead of a writer.


indexer.process_with([record]) do |context|
  do_something_with(context.output_hash)
end

Parameters:

  • source (#each)
  • destination (#put) (defaults to: nil)
  • close_writer (defaults to: true)

    whether the destination should have #close called on it, if it responds to it.

  • rescue_with (Proc) (defaults to: nil)

    to call on errors, taking two args: A Traject::Indexer::Context and an exception. If nil (default), exceptions will be raised out. If set, you can raise or handle otherwise if you like.

  • on_skipped (Proc) (defaults to: nil)

    will be called for any skipped records, with one arg Traject::Indexer::Context



# File 'lib/traject/indexer.rb', line 664

def process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil)
  unless destination || block_given?
    raise ArgumentError, "Need either a second arg writer/destination, or a block"
  end

  settings.fill_in_defaults!

  position = 0
  input_name = Traject::Util.io_name(source)
  source.each do |record |
    begin
      position += 1

      context = Context.new(
          :source_record          => record,
          :source_record_id_proc  => source_record_id_proc,
          :settings               => settings,
          :position               => position,
          :position_in_input      => (position if input_name),
          :logger                 => logger
      )

      map_to_context!(context)

      if context.skip?
        on_skipped.call(context) if on_skipped
      else
        destination.put(context) if destination
        yield(context) if block_given?
      end
    rescue StandardError => e
      if rescue_with
        rescue_with.call(context, e)
      else
        raise e
      end
    end
  end

  if close_writer && destination.respond_to?(:close)
    destination.close
  end

  return destination
end
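
How rescue_with and on_skipped interact with the per-record loop can be sketched with a self-contained stand-in. Everything here is hypothetical: sketch_process_with mirrors only the control flow of the listing above, takes the mapping logic as an explicit lambda, and uses an Array with `<<` in place of a writer's #put.

```ruby
# Hypothetical minimal context for illustration.
SketchCtx = Struct.new(:source_record, :output_hash, :skipped)

def sketch_process_with(source, mapper, destination = nil, rescue_with: nil, on_skipped: nil)
  source.each do |record|
    context = SketchCtx.new(record, {}, false)
    begin
      mapper.call(context)
      if context.skipped
        on_skipped.call(context) if on_skipped
      else
        destination << context.output_hash if destination
        yield(context) if block_given?
      end
    rescue StandardError => e
      # Without a handler the error propagates; with one, processing
      # continues with the next record unless the handler re-raises.
      raise e unless rescue_with
      rescue_with.call(context, e)
    end
  end
  destination
end

sketch_mapper = lambda do |ctx|
  raise ArgumentError, "no title" if ctx.source_record[:title].nil?
  ctx.skipped = true if ctx.source_record[:suppressed]
  ctx.output_hash["title"] = [ctx.source_record[:title]]
end

error_messages = []
skipped_titles = []
sketch_records = [{ title: "A" }, { title: nil }, { title: "B", suppressed: true }, { title: "C" }]

sketch_results = sketch_process_with(
  sketch_records, sketch_mapper, [],
  rescue_with: lambda { |ctx, e| error_messages << e.message },
  on_skipped:  lambda { |ctx| skipped_titles << ctx.source_record[:title] }
)

puts sketch_results.inspect
```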

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using class set in #reader_class, initialized with io_stream passed in



# File 'lib/traject/indexer.rb', line 731

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#run_after_processing_steps ⇒ Object



# File 'lib/traject/indexer.rb', line 610

def run_after_processing_steps
  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue StandardError => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end
end

#settings(new_settings = nil, &block) ⇒ Object

Part of the config file DSL, for writing settings values.

The Indexer's settings consist of a hash-like Traject::Indexer::Settings object. The settings hash is not nested, just one level of configuration settings. Keys are always strings, and by convention use "." for namespacing, e.g. log.file.

The settings method with no arguments returns that Settings object.

With a hash and/or block argument, it can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_eval'd in the context of the Settings object.

indexer.settings("a" => "a", "b" => "b")

indexer.settings do
  provide "b", "new b"   # no effect, because "b" is already set
end

indexer.settings #=> {"a" => "a", "b" => "b"}

Note the #provide method is defined on the Settings object to write to a setting only if it was not previously set. You can also use #store to force overwriting an existing setting.

Even with arguments, Indexer#settings returns the Settings object, so method calls can be chained.



# File 'lib/traject/indexer.rb', line 240

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval(&block) if block_given?

  return @settings
end
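
The difference between provide and store can be seen in a small sketch. SketchSettings is a hypothetical Hash subclass, not the real Settings class; it assumes only the behavior described above, namely that provide writes a key only when it is not already set.

```ruby
# Hypothetical stand-in for a settings object; only the methods discussed here.
class SketchSettings < Hash
  # Write only if the key has not been set yet.
  def provide(key, value)
    store(key, value) unless key?(key)
  end
end

sketch_settings = SketchSettings.new
sketch_settings.merge!("a" => "a", "b" => "b")

sketch_settings.instance_eval do
  provide "b", "ignored"    # "b" is already set, so nothing changes
  provide "c", "c"          # "c" is new, so it is written
  store   "a", "forced a"   # store always overwrites
end

puts sketch_settings.inspect
```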

#source_record_id_proc ⇒ Object

Sub-classes should override this to return a proc object that takes one arg, a source record, and returns an identifier for it that can be used in logged messages. The identifier differs depending on the input record format, which is why it is left to sub-classes.



# File 'lib/traject/indexer.rb', line 281

def source_record_id_proc
  if defined?(@@legacy_marc_mode) && @@legacy_marc_mode
    return @source_record_id_proc ||= lambda do |source_marc_record|
      if ( source_marc_record &&
           source_marc_record.kind_of?(MARC::Record) &&
           source_marc_record['001'] )
        source_marc_record['001'].value
      end
    end
  end

  @source_record_id_proc ||= lambda { |source| nil }
end
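
A sketch of what such an override might look like for records represented as plain hashes. SketchBaseIndexer and SketchHashIndexer are hypothetical classes, and the :id key is an assumption made only for this example.

```ruby
# Hypothetical base class mirroring the default: no identifier is known.
class SketchBaseIndexer
  def source_record_id_proc
    @source_record_id_proc ||= lambda { |source| nil }
  end
end

# Hypothetical subclass for hash records that carry an :id key.
class SketchHashIndexer < SketchBaseIndexer
  def source_record_id_proc
    @source_record_id_proc ||= lambda do |source|
      # Be defensive: log messages should never fail on odd input.
      source[:id].to_s if source.is_a?(Hash) && source[:id]
    end
  end
end

base_id = SketchBaseIndexer.new.source_record_id_proc.call({ id: 42 })
hash_id = SketchHashIndexer.new.source_record_id_proc.call({ id: 42 })
nil_id  = SketchHashIndexer.new.source_record_id_proc.call(nil)

puts hash_id
```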

#to_field(field_name, *procs, &block) ⇒ Object

Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field. The first field_name argument can be a single string, or an array of multiple strings -- in the latter case, the processed values will be added to each field mentioned.



# File 'lib/traject/indexer.rb', line 316

def to_field(field_name, *procs, &block)
  @index_steps << ToFieldStep.new(field_name, procs, block, Traject::Util.extract_caller_location(caller.first))
end
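
The single-name versus array-of-names behavior can be sketched as follows. SketchFieldStep is a hypothetical simplification of a to_field step: it accepts one lambda, collects values into an accumulator, and appends them to every named field. It ignores the extra procs and the settings-driven post-processing of the real step.

```ruby
# Hypothetical simplified step for illustration.
class SketchFieldStep
  def initialize(field_name, logic)
    # Array() turns a single name into a one-element list and leaves
    # an array of names unchanged.
    @field_names = Array(field_name)
    @logic       = logic
  end

  def execute(record, output_hash)
    accumulator = []
    @logic.call(record, accumulator)
    return if accumulator.empty?

    @field_names.each do |name|
      (output_hash[name] ||= []).concat(accumulator)
    end
  end
end

sketch_output = {}
sketch_record = { title: "Ruby Basics", year: 2020 }

SketchFieldStep.new("year", lambda { |rec, acc| acc << rec[:year] })
               .execute(sketch_record, sketch_output)
SketchFieldStep.new(["title", "title_sort"], lambda { |rec, acc| acc << rec[:title] })
               .execute(sketch_record, sketch_output)

puts sketch_output.inspect
```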

#writer! ⇒ Object

Instantiate a Traject Writer, using class set in #writer_class



# File 'lib/traject/indexer.rb', line 736

def writer!
  writer_class = @writer_class || qualified_const_get(settings["writer_class_name"])
  writer_class.new(settings.merge("logger" => logger))
end