Method: Traject::Indexer#process_with

Defined in:
lib/traject/indexer.rb

#process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil) ⇒ Object

A lightweight process method meant for programmatic use, generally intended for only a "few" records (not millions).

It does not use the instance-configured reader or writer; instead it takes a source/reader and a destination/writer as arguments to this call.

The reader can be anything that has an #each yielding source records. This includes an ordinary array of source records, or any traject Reader.
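As a minimal sketch of the "anything with #each" contract, the class below yields one parsed hash per line of JSON. The JsonLinesReader name and the choice of JSON lines as input are assumptions made for illustration; they are not part of traject.

```ruby
require "json"
require "stringio"

# Hypothetical source: the only requirement is an #each that yields
# one source record per iteration.
class JsonLinesReader
  include Enumerable

  def initialize(io)
    @io = io
  end

  def each
    return enum_for(:each) unless block_given?

    @io.each_line do |line|
      line = line.strip
      next if line.empty? # ignore blank lines in the input
      yield JSON.parse(line)
    end
  end
end

reader  = JsonLinesReader.new(StringIO.new(%({"id":"1"}\n\n{"id":"2"}\n)))
records = reader.to_a
```

An object like this could then be passed as the first argument to process_with in place of an array.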

The writer can be anything with a #put method taking a Traject::Indexer::Context. For convenience, see the Traject::ArrayWriter that just collects output in an array.
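A custom destination only needs #put, and optionally #close. The sketch below is illustrative: CollectingWriter and StubContext are hypothetical names, and StubContext merely stands in for Traject::Indexer::Context so the snippet runs without an indexer.

```ruby
# Hypothetical writer: anything responding to #put(context) qualifies.
class CollectingWriter
  attr_reader :hashes, :closed

  def initialize
    @hashes = []
    @closed = false
  end

  # Called once per non-skipped record. The context is expected to
  # respond to #output_hash, as Traject::Indexer::Context does.
  def put(context)
    @hashes << context.output_hash
  end

  # Optional. process_with calls this when close_writer is true.
  def close
    @closed = true
  end
end

# Stand-in for Traject::Indexer::Context, for demonstration only.
StubContext = Struct.new(:output_hash)

writer = CollectingWriter.new
writer.put(StubContext.new({ "title" => ["Example"] }))
writer.close
```

With a real indexer, an instance would be passed as the second argument, for example indexer.process_with(records, CollectingWriter.new).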

For convenience, process_with returns the writer passed as the second argument (nil if none was given).

This does much less than the full #process method, to be more flexible and make fewer assumptions:

  • Will never use any additional threads (unless writer does). Wrap in your own threading if desired.
  • Will not do any standard logging or progress bars, regardless of indexer settings. Log yourself if desired.
  • Will not call any after_processing steps. Call yourself with indexer.run_after_processing_steps as desired.
  • WILL by default call #close on the writer, IF the writer has a #close method. Pass close_writer: false to not do so.
  • Exceptions will just raise out, unless you pass in a rescue_with: option, whose value is a proc/lambda that will receive two args, context and exception. If the rescue proc doesn't re-raise, process_with will continue to process subsequent records.
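The error-handling option can be sketched with two lambdas: one that records failures and lets processing continue, and one that re-raises selected exception types. ErrorContext is a hypothetical stand-in for Traject::Indexer::Context, and the assumption that the context exposes #position is based on the :position argument shown in the method source.

```ruby
# Stand-in for Traject::Indexer::Context, for demonstration only.
ErrorContext = Struct.new(:position, :source_record)

errors = []

# Records the failure and returns normally, so processing would continue.
# The context can be nil if the error occurred before it was built,
# hence the safe navigation.
collect_errors = lambda do |context, exception|
  errors << { position: context&.position, message: exception.message }
end

# Variant that re-raises ArgumentError to abort the run and records
# everything else.
strict = lambda do |context, exception|
  raise exception if exception.is_a?(ArgumentError)

  errors << { position: context&.position, message: exception.message }
end

# With a real indexer this would be passed as:
#   indexer.process_with(records, writer, rescue_with: collect_errors)
collect_errors.call(ErrorContext.new(3, :record), RuntimeError.new("bad field"))
```

Whether to swallow or re-raise is a per-application decision; collecting errors suits small batch jobs where a report at the end is more useful than an early abort.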

Examples:

array_writer_instance = indexer.process_with([record1, record2], Traject::ArrayWriter.new)

With a block, in addition to or instead of a writer:

indexer.process_with([record]) do |context|
  do_something_with(context.output_hash)
end

Parameters:

  • source (#each)
  • destination (#put) (defaults to: nil)
  • close_writer (defaults to: true)

    Whether to call #close on the destination, if it responds to #close.

  • rescue_with (Proc) (defaults to: nil)

    Called on errors with two args: a Traject::Indexer::Context and an exception. If nil (default), exceptions are raised out. If set, the proc may re-raise or handle the error as it likes; if it does not raise, processing continues with the next record.

  • on_skipped (Proc) (defaults to: nil)

    Called for each skipped record, with one arg: a Traject::Indexer::Context.



# File 'lib/traject/indexer.rb', line 692

def process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil)
  unless destination || block_given?
    raise ArgumentError, "Need either a second arg writer/destination, or a block"
  end

  settings.fill_in_defaults!

  position = 0
  input_name = Traject::Util.io_name(source)
  source.each do |record|
    begin
      position += 1

      context = Context.new(
          :source_record          => record,
          :source_record_id_proc  => source_record_id_proc,
          :settings               => settings,
          :position               => position,
          :position_in_input      => (position if input_name),
          :logger                 => logger
      )

      map_to_context!(context)

      if context.skip?
        on_skipped.call(context) if on_skipped
      else
        destination.put(context) if destination
        yield(context) if block_given?
      end
    rescue StandardError => e
      if rescue_with
        rescue_with.call(context, e)
      else
        raise e
      end
    end
  end

  if close_writer && destination.respond_to?(:close)
    destination.close
  end

  return destination
end