Class: Traject::Indexer

Inherits:
Object
Extended by:
QualifiedConstGet
Includes:
Macros::Basic, Macros::Transformation, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/step.rb,
lib/traject/indexer/context.rb,
lib/traject/indexer/settings.rb,
lib/traject/indexer/marc_indexer.rb,
lib/traject/indexer/nokogiri_indexer.rb

Overview

Represents the context of a specific record being indexed, passed to indexing logic blocks

Arg source_record_id_proc is a lambda that takes one arg (indexer-specific source record), and returns an ID for it suitable for use in log messages.

Direct Known Subclasses

MarcIndexer, NokogiriIndexer

Defined Under Namespace

Classes: AfterProcessingStep, ConfigLoadError, Context, EachRecordStep, MarcIndexer, NokogiriIndexer, Settings, ToFieldStep

Constant Summary

CompletedStateError = Class.new(StandardError)
ArityError = Class.new(ArgumentError)
NamingError = Class.new(ArgumentError)

Methods included from QualifiedConstGet

qualified_const_get

Methods included from Macros::Transformation

#append, #default, #first_only, #gsub, #prepend, #split, #strip, #transform, #translation_map, #unique

Methods included from Macros::Basic

#literal

Constructor Details

#initialize(arg_settings = {}, &block) ⇒ Indexer

Takes an optional hash or Traject::Indexer::Settings object of settings. Optionally takes a block, which is instance_eval'd in the indexer and is intended for configuration similar to what would be in a config file.



# File 'lib/traject/indexer.rb', line 176

def initialize(arg_settings = {}, &block)
  @writer_class           = nil
  @completed              = false
  @settings               = Settings.new(arg_settings).with_defaults(self.class.default_settings)
  @index_steps            = []
  @after_processing_steps = []

  self.class.apply_class_configure_block(self)
  instance_eval(&block) if block
end

Instance Attribute Details

#logger ⇒ Object



# File 'lib/traject/indexer.rb', line 355

def logger
  @logger ||= create_logger
end

#reader_class ⇒ Object



# File 'lib/traject/indexer.rb', line 744

def reader_class
  unless defined? @reader_class
    reader_class_name = settings["reader_class_name"]

    @reader_class = qualified_const_get(reader_class_name)
  end
  return @reader_class
end

#writer ⇒ Object



# File 'lib/traject/indexer.rb', line 769

def writer
  @writer ||= settings["writer"] || writer!
end

#writer_class ⇒ Object



# File 'lib/traject/indexer.rb', line 753

def writer_class
  writer.class
end

Class Method Details

.apply_class_configure_block(instance) ⇒ Object



# File 'lib/traject/indexer.rb', line 206

def self.apply_class_configure_block(instance)
  # Make sure we inherit from superclass that has a class-level ivar @class_configure_block
  if self.superclass.respond_to?(:apply_class_configure_block)
    self.superclass.apply_class_configure_block(instance)
  end
  if @class_configure_block
    instance.configure(&@class_configure_block)
  end
end

.configure(&block) ⇒ Object

A class-level configure block is also accepted, and is applied at instantiation before instance-level configuration.

EXPERIMENTAL, implementation may change in ways that affect some uses. https://github.com/traject/traject/pull/213

Note that settings set by 'provide' in a subclass cannot really be overridden by 'provide' in a further subclass. Use self.default_settings instead, with a call to super.



# File 'lib/traject/indexer.rb', line 202

def self.configure(&block)
  @class_configure_block = block
end
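
The superclass-first ordering of class-level configure blocks can be seen in a small stand-in. This is only a sketch: MiniIndexer, ChildIndexer and the step method are hypothetical names, not part of Traject; only the shape of configure and apply_class_configure_block follows the listings in this section.

```ruby
# Hypothetical stand-in, NOT Traject itself: mirrors the class-level
# configure mechanism to show the order in which blocks are applied.
class MiniIndexer
  attr_reader :steps

  def self.configure(&block)
    @class_configure_block = block # class-level ivar, one per class
  end

  def self.apply_class_configure_block(instance)
    # Ancestors first, so subclass configuration is applied later.
    if superclass.respond_to?(:apply_class_configure_block)
      superclass.apply_class_configure_block(instance)
    end
    instance.configure(&@class_configure_block) if @class_configure_block
  end

  def initialize(&block)
    @steps = []
    self.class.apply_class_configure_block(self)
    instance_eval(&block) if block # instance-level configuration runs last
  end

  def configure(&block)
    instance_eval(&block)
  end

  # Hypothetical DSL method that just records its argument.
  def step(name)
    @steps << name
  end
end

class MiniIndexer
  configure { step :base }
end

class ChildIndexer < MiniIndexer
  configure { step :child }
end

order = ChildIndexer.new { step :instance }.steps
puts order.inspect
```

In this sketch the base class block runs first, then the subclass block, then the block passed to new.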

.default_settings ⇒ Object

Hash is frozen to avoid inheritance-mutability confusion.



# File 'lib/traject/indexer.rb', line 274

def self.default_settings
  @default_settings ||= {
      # Writer defaults
      "writer_class_name"       => "Traject::SolrJsonWriter",
      "solr_writer.batch_size"  => 100,
      "solr_writer.thread_pool" => 1,

      # Threading and logging
      "processing_thread_pool"  => Traject::Indexer::Settings.default_processing_thread_pool,
      "log.batch_size.severity" => "info",

      # how to post-process the accumulator
      Traject::Indexer::ToFieldStep::ALLOW_NIL_VALUES => false,
      Traject::Indexer::ToFieldStep::ALLOW_DUPLICATE_VALUES  => true,
      Traject::Indexer::ToFieldStep::ALLOW_EMPTY_FIELDS => false
  }.freeze
end
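
Because the hash is frozen, a subclass cannot add to it in place. One way to follow the advice above (override self.default_settings with a call to super) is sketched below. BaseIndexer, CsvLikeIndexer, "MyWriter" and the "csv.separator" key are hypothetical names used only for illustration.

```ruby
# Hypothetical classes, NOT Traject itself: shows a subclass extending
# a frozen defaults hash without mutating the parent's copy.
class BaseIndexer
  def self.default_settings
    @default_settings ||= {
      "writer_class_name"       => "Traject::SolrJsonWriter",
      "log.batch_size.severity" => "info"
    }.freeze
  end
end

class CsvLikeIndexer < BaseIndexer
  def self.default_settings
    # Hash#merge returns a new hash, so the frozen parent hash is untouched.
    @default_settings ||= super.merge(
      "writer_class_name" => "MyWriter", # hypothetical writer name
      "csv.separator"     => ","         # hypothetical setting key
    ).freeze
  end
end

base_defaults  = BaseIndexer.default_settings
child_defaults = CsvLikeIndexer.default_settings
puts child_defaults.inspect
```

The subclass sees the inherited keys plus its own, and both hashes stay frozen.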

.legacy_marc_mode! ⇒ Object



# File 'lib/traject/indexer.rb', line 319

def self.legacy_marc_mode!
  @@legacy_marc_mode = true
  # include legacy Marc macros
  include Traject::Macros::Marc21

  # Reader defaults
  legacy_settings = {
    "reader_class_name"       => "Traject::MarcReader",
    "marc_source.type"        => "binary",
  }

  default_settings.merge!(legacy_settings)

  self
end

Instance Method Details

#after_processing(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called once at the end of processing a stream of records.



# File 'lib/traject/indexer.rb', line 351

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end

#complete ⇒ Object

Closes the writer (which may flush/save/finalize buffered records), and calls run_after_processing_steps



# File 'lib/traject/indexer.rb', line 629

def complete
  writer.close if writer.respond_to?(:close)
  run_after_processing_steps

  # after an indexer has been completed, it is not really usable anymore,
  # as the writer has been closed.
  @completed = true
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/traject/indexer.rb', line 613

def completed?
  @completed
end

#configure(&block) ⇒ Object

Currently just does an instance_eval, but using it is encouraged in case the underlying implementation changes later, and to make intent clearer.



# File 'lib/traject/indexer.rb', line 189

def configure(&block)
  instance_eval(&block)
end

#create_logger ⇒ Object

Create logger according to settings



# File 'lib/traject/indexer.rb', line 375

def create_logger
  if settings["logger"]
    # none of the other settings matter, we just got a logger
    return settings["logger"]
  end

  logger_level  = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger        = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
    when "STDERR"
      logger.adapter :stderr, level: logger_level, format: logger_format
    when "STDOUT"
      logger.adapter :stdout, level: logger_level, format: logger_format
    else
      logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end


  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end

#each_record(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called for each record



# File 'lib/traject/indexer.rb', line 345

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end

#load_config_file(file_path) ⇒ Object

Pass a string file path, a Pathname, or a File object, for a config file to load into indexer.

Can raise:

  • Errno::ENOENT or Errno::EACCES if file path is not accessible
  • Traject::Indexer::ConfigLoadError if exception is raised evaluating the config. A ConfigLoadError has information in it about original exception, and exactly what config file and line number triggered it.


# File 'lib/traject/indexer.rb', line 226

def load_config_file(file_path)
  File.open(file_path) do |file|
    begin
      self.instance_eval(file.read, file_path.to_s)
    rescue ScriptError, StandardError => e
      raise ConfigLoadError.new(file_path.to_s, e)
    end
  end
end
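
The second argument to instance_eval is what lets an error point back at the config file and line. A minimal sketch of the same pattern follows, assuming a stand-in host class and error class (MiniConfigHost and MiniConfigLoadError are hypothetical, not Traject's ConfigLoadError) and a temporary file as the config.

```ruby
require "tempfile"

# Hypothetical stand-ins, NOT Traject classes.
class MiniConfigLoadError < StandardError
  attr_reader :config_file, :original

  def initialize(config_file, original)
    @config_file = config_file
    @original    = original
    super("Error loading #{config_file}: #{original.class}: #{original.message}")
  end
end

class MiniConfigHost
  def load_config_file(file_path)
    File.open(file_path) do |file|
      begin
        # Passing the path makes backtraces name the config file itself.
        instance_eval(file.read, file_path.to_s)
      rescue ScriptError, StandardError => e
        raise MiniConfigLoadError.new(file_path.to_s, e)
      end
    end
  end
end

# A config whose second line raises.
config = Tempfile.new(["bad_config", ".rb"])
config.write("x = 1\nraise ArgumentError, 'bad setting'\n")
config.close
config_path = config.path

caught = nil
begin
  MiniConfigHost.new.load_config_file(config_path)
rescue MiniConfigLoadError => e
  caught = e
end

puts caught.message
puts caught.original.backtrace.first
config.unlink
```

The wrapped exception keeps the original error, whose backtrace refers to the config file path and the line that raised.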

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



# File 'lib/traject/indexer.rb', line 740

def log_skip(context)
  logger.debug "Skipped record #{context.record_inspect}: #{context.skipmessage}"
end

#logger_format ⇒ Object



# File 'lib/traject/indexer.rb', line 362

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
             when "false" then
               false
             when "" then
               nil
             else
               format
           end
end

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

If the record is marked skip as part of processing, this will return nil.

This is a convenience shortcut for #map_to_context! -- use that one if you want to provide additional context like position, and/or get back the full context.



# File 'lib/traject/indexer.rb', line 421

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings, :source_record_id_proc => source_record_id_proc, :logger => logger)
  map_to_context!(context)
  return context.output_hash unless context.skip?
end

#map_to_context!(context) ⇒ Object

Maps a single record INTO the given Traject::Indexer::Context.

Context must be passed with a #source_record and #settings, and optionally a #position.

Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.

Pass in a context with a set #position if you want that to be available to mapping routines.

Returns the context passed in, as a convenience for chaining etc.



# File 'lib/traject/indexer.rb', line 456

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    # Set the index step for error reporting
    context.index_step = index_step
    handle_mapping_errors(context) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    # And unset the index step now that we're finished
    context.index_step = nil
  end

  return context
end
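
The control flow of the step loop (stop at the first step that marks the record as skipped, and track the current step for error reporting) can be sketched without Traject. MiniContext, run_steps and the lambdas below are hypothetical stand-ins, and the error handling wrapper is left out.

```ruby
# Hypothetical stand-in for the context, NOT Traject::Indexer::Context.
MiniContext = Struct.new(:source_record, :output_hash, :skipped, :index_step) do
  def skip?
    skipped == true
  end
end

# Runs each step against the context, mirroring the loop shape above.
def run_steps(steps, context)
  steps.each do |step|
    break if context.skip?      # later steps never see a skipped record
    context.index_step = step   # recorded for error reporting
    step.call(context)
    context.index_step = nil
  end
  context
end

steps = [
  ->(ctx) { (ctx.output_hash["id"] ||= []) << ctx.source_record[:id] },
  ->(ctx) { ctx.skipped = true if ctx.source_record[:suppressed] },
  ->(ctx) { (ctx.output_hash["title"] ||= []) << ctx.source_record[:title] }
]

kept    = run_steps(steps, MiniContext.new({ id: "1", title: "A" }, {}))
skipped = run_steps(steps, MiniContext.new({ id: "2", title: "B", suppressed: true }, {}))

puts kept.output_hash.inspect
puts skipped.output_hash.inspect
```

In the sketch the skipped record keeps whatever was accumulated before the skip, and the title step never runs for it.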

#process(io_stream_or_array) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

You can instead give it an array of streams.

Returns false as a signal to the command line to return a non-zero exit code (the reason should be found in the logs); in the listing below this happens when the writer reports skipped records. Otherwise returns true. This mechanism is deliberately simple for now and may become more elaborate.

Parameters:

  • (#read, Array<#read>)


# File 'lib/traject/indexer.rb', line 523

def process(io_stream_or_array)
  check_uncompleted

  settings.fill_in_defaults!

  count      = 0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Traject::Indexer#process with settings: #{settings.inspect}"

  processing_threads = settings["processing_thread_pool"].to_i
  thread_pool        = Traject::ThreadPool.new(processing_threads)

  logger.info "   Traject::Indexer with #{processing_threads} processing threads, reader: #{reader_class.name} and writer: #{writer.class.name}"

  #io_stream can now be an array of io_streams.
  (io_stream_or_array.kind_of?(Array) ? io_stream_or_array : [io_stream_or_array]).each do |io_stream|
    reader = self.reader!(io_stream)
    input_name = Traject::Util.io_name(io_stream)
    position_in_input = 0

    log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

    reader.each do |record; safe_count, safe_position_in_input |
      count    += 1
      position_in_input += 1

      # have to use a block local var, so the changing `count` one
      # doesn't get caught in the closure. Don't totally get it, but
      # I think it's so.
      safe_count, safe_position_in_input = count, position_in_input

      thread_pool.raise_collected_exception!

      if settings["debug_ascii_progress"].to_s == "true"
        $stderr.write "." if count % settings["solr_writer.batch_size"].to_i == 0
      end

      context = Context.new(
          :source_record => record,
          :source_record_id_proc => source_record_id_proc,
          :settings      => settings,
          :position      => safe_count,
          :input_name    => input_name,
          :position_in_input => safe_position_in_input,
          :logger        => logger
      )


      if log_batch_size && (count % log_batch_size == 0)
        batch_rps   = log_batch_size / (Time.now - batch_start_time)
        overall_rps = count / (Time.now - start_time)
        logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at: #{context.record_inspect}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
        batch_start_time = Time.now
      end

      # We pass context in a block arg to properly 'capture' it, so
      # we don't accidentally share the local var under closure between
      # threads.
      thread_pool.maybe_in_thread_pool(context) do |t_context|
        map_to_context!(t_context)
        if context.skip?
          log_skip(t_context)
        else
          writer.put t_context
        end
      end
    end
  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!

  complete

  elapsed = Time.now - start_time
  avg_rps = (count / elapsed)
  logger.info "finished Traject::Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Traject::Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end
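
The code comment about block-local variables refers to ordinary Ruby closure behaviour, which can be shown in isolation. In this sketch, plain lambdas stand in for work handed to a thread pool; deferred and the record strings are illustrative only. A variable declared after the semicolon in the block parameter list is fresh for every invocation, while the outer counter is shared by every closure.

```ruby
count    = 0
deferred = []

%w[a b c].each do |record; safe_count|
  count += 1
  # Copy the current value into the block-local variable, which is
  # created anew each time the block is invoked.
  safe_count = count

  # Work that runs later: it sees the final value of the shared `count`
  # but the per-iteration value of `safe_count`.
  deferred << -> { [record, count, safe_count] }
end

results = deferred.map(&:call)
puts results.inspect
```

Each deferred lambda reports the same final count but its own safe_count.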

#process_record(record) ⇒ Object Also known as: <<

Takes a single record, maps it, and sends it to the instance-configured writer. No threading, no logging, no error handling. Respects skipped records by not adding them. Returns the Traject::Indexer::Context.

Aliased as #<<



# File 'lib/traject/indexer.rb', line 432

def process_record(record)
  check_uncompleted

  context = Context.new(:source_record => record, :settings => settings, :source_record_id_proc =>  source_record_id_proc, :logger => logger)
  map_to_context!(context)
  writer.put( context ) unless context.skip?

  return context
end

#process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil) ⇒ Object

A light-weight process method meant for programmatic use, generally intended for only a "few" (not millions of) records.

It does not use instance-configured reader or writer, instead taking a source/reader and destination/writer as arguments to this call.

The reader can be anything that has an #each returning source records. This includes an ordinary array of source records, or any traject Reader.

The writer can be anything with a #put method taking a Traject::Indexer::Context. For convenience, see the Traject::ArrayWriter that just collects output in an array.

Return value of process_with is the writer passed as second arg, for your convenience.

This does much less than the full #process method, to be more flexible and make fewer assumptions:

  • Will never use any additional threads (unless writer does). Wrap in your own threading if desired.
  • Will not do any standard logging or progress bars, regardless of indexer settings. Log yourself if desired.
  • Will not call any after_processing steps. Call yourself with indexer.run_after_processing_steps as desired.
  • WILL by default call #close on the writer, IF the writer has a #close method. Pass :close_writer => false to not do so.
  • Exceptions will just raise out, unless you pass in a rescue_with: option, whose value is a proc/lambda that will receive two args, context and exception. If the rescue proc doesn't re-raise, process_with will continue to process subsequent records.

Examples:

array_writer_instance = indexer.process_with([record1, record2], Traject::ArrayWriter.new)

With a block, in addition to or instead of a writer.


indexer.process_with([record]) do |context|
  do_something_with(context.output_hash)
end

Parameters:

  • source (#each)
  • destination (#put) (defaults to: nil)
  • close_writer (defaults to: true)

    whether the destination should have #close called on it, if it responds to.

  • rescue_with (Proc) (defaults to: nil)

    to call on errors, taking two args: A Traject::Indexer::Context and an exception. If nil (default), exceptions will be raised out. If set, you can raise or handle otherwise if you like.

  • on_skipped (Proc) (defaults to: nil)

    will be called for any skipped records, with one arg Traject::Indexer::Context



# File 'lib/traject/indexer.rb', line 692

def process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil)
  unless destination || block_given?
    raise ArgumentError, "Need either a second arg writer/destination, or a block"
  end

  settings.fill_in_defaults!

  position = 0
  input_name = Traject::Util.io_name(source)
  source.each do |record |
    begin
      position += 1

      context = Context.new(
          :source_record          => record,
          :source_record_id_proc  => source_record_id_proc,
          :settings               => settings,
          :position               => position,
          :position_in_input      => (position if input_name),
          :logger                 => logger
      )

      map_to_context!(context)

      if context.skip?
        on_skipped.call(context) if on_skipped
      else
        destination.put(context) if destination
        yield(context) if block_given?
      end
    rescue StandardError => e
      if rescue_with
        rescue_with.call(context, e)
      else
        raise e
      end
    end
  end

  if close_writer && destination.respond_to?(:close)
    destination.close
  end

  return destination
end

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using class set in #reader_class, initialized with io_stream passed in



# File 'lib/traject/indexer.rb', line 759

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#run_after_processing_steps ⇒ Object



# File 'lib/traject/indexer.rb', line 638

def run_after_processing_steps
  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue StandardError => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end
end

#settings(new_settings = nil, &block) ⇒ Object

Part of the config file DSL, for writing settings values.

The Indexer's settings consist of a hash-like Traject::Indexer::Settings object. The settings hash is not nested hashes, just one level of configuration settings. Keys are always strings, and by convention use "." for namespacing, eg log.file

The settings method with no arguments returns that Settings object.

With a hash and/or block argument, can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_eval'd in the context of the Traject::Indexer::Settings object.

indexer.settings("a" => "a", "b" => "b")

indexer.settings do
  store "b", "new b"
end

indexer.settings #=> {"a" => "a", "b" => "new b"}

The example uses #store, which overwrites even an existing setting. The #provide method, also defined on Traject::Indexer::Settings, writes to a setting only if it was not previously set.

Even with arguments, Indexer#settings returns the Settings object, so method calls can be chained.



# File 'lib/traject/indexer.rb', line 265

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval(&block) if block_given?

  return @settings
end
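
The difference between writing a setting only when it is unset and forcing an overwrite can be sketched with a plain Hash subclass. MiniSettings and MiniHost are hypothetical stand-ins, not the Traject classes; the provide method here is an assumption that implements just the "only if previously not set" behaviour described in this section.

```ruby
# Hypothetical stand-in, NOT Traject::Indexer::Settings.
class MiniSettings < Hash
  # Write only if the key has no value yet.
  def provide(key, value)
    self[key] = value unless key?(key)
  end
  # `store` is inherited from Hash and always overwrites.
end

# Hypothetical host with a settings method shaped like the listing above.
class MiniHost
  def initialize
    @settings = MiniSettings.new
  end

  def settings(new_settings = nil, &block)
    @settings.merge!(new_settings) if new_settings
    @settings.instance_eval(&block) if block_given?
    @settings
  end
end

host = MiniHost.new
host.settings("a" => "a", "b" => "b")

host.settings { provide "b", "ignored" } # "b" is already set, so no change
after_provide = host.settings["b"]

host.settings do
  store "b", "new b"  # overwrites
  provide "c", "c"    # "c" was unset, so it is written
end
after_store = host.settings.to_h

puts after_provide
puts after_store.inspect
```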

#source_record_id_proc ⇒ Object

Sub-classes should override to return a proc object that takes one arg, a source record, and returns an identifier for it that can be used in logged messages. This differs depending on input record format, which is why it is left to sub-classes.



# File 'lib/traject/indexer.rb', line 305

def source_record_id_proc
  if defined?(@@legacy_marc_mode) && @@legacy_marc_mode
    return @source_record_id_proc ||= lambda do |source_marc_record|
      if ( source_marc_record &&
           source_marc_record.kind_of?(MARC::Record) &&
           source_marc_record['001'] )
        source_marc_record['001'].value
      end
    end
  end

  @source_record_id_proc ||= lambda { |source| nil }
end
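
A subclass override might look like the following sketch. BaseLike and HashRecordIndexer are hypothetical classes, and the assumption that source records are hashes with an :id key is made up for the example.

```ruby
# Hypothetical base, NOT Traject::Indexer: only the default proc is mirrored.
class BaseLike
  def source_record_id_proc
    @source_record_id_proc ||= lambda { |source| nil }
  end
end

# Hypothetical subclass whose source records are assumed to be hashes
# carrying an :id key.
class HashRecordIndexer < BaseLike
  def source_record_id_proc
    @source_record_id_proc ||= lambda do |record|
      record[:id].to_s if record.respond_to?(:[]) && record[:id]
    end
  end
end

id_proc    = HashRecordIndexer.new.source_record_id_proc
with_id    = id_proc.call({ id: 42 })
without_id = id_proc.call({ title: "x" })
default_id = BaseLike.new.source_record_id_proc.call({ id: 42 })

puts with_id.inspect
puts without_id.inspect
```

Returning nil when no identifier is available matches the default proc, which always returns nil.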

#to_field(field_name, *procs, &block) ⇒ Object

Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field. The first field_name argument can be a single string, or an array of multiple strings -- in the latter case, the processed values will be added to each field mentioned.



# File 'lib/traject/indexer.rb', line 340

def to_field(field_name, *procs, &block)
  @index_steps << ToFieldStep.new(field_name, procs, block, Traject::Util.extract_caller_location(caller.first))
end
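
The effect of passing an array of field names can be pictured as follows. This is a sketch of the described behaviour only, not Traject's ToFieldStep implementation; add_to_fields is a hypothetical helper.

```ruby
# Hypothetical helper: adds the same values to every named output field.
def add_to_fields(output_hash, field_name, values)
  # Array() lets a single string and an array of strings share one code path.
  Array(field_name).each do |name|
    (output_hash[name] ||= []).concat(values)
  end
  output_hash
end

single = add_to_fields({}, "title", ["A Title"])
multi  = add_to_fields({}, ["title", "title_sort"], ["A Title"])

puts single.inspect
puts multi.inspect
```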

#writer! ⇒ Object

Instantiate a Traject Writer, using class set in #writer_class



# File 'lib/traject/indexer.rb', line 764

def writer!
  writer_class = @writer_class || qualified_const_get(settings["writer_class_name"])
  writer_class.new(settings.merge("logger" => logger))
end