Method: Traject::Indexer#process_with
- Defined in:
- lib/traject/indexer.rb
#process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil) ⇒ Object
A light-weight process method meant for programmatic use, generally intended for only a "few" (not milliions) of records.
It does not use instance-configured reader or writer, instead taking a source/reader and destination/writer as arguments to this call.
The reader can be anything that has an #each returning source records. This includes an ordinary array of source records, or any traject Reader.
The writer can be anything with a #put method taking a Traject::Indexer::Context. For convenience, see the Traject::ArrayWriter that just collects output in an array.
Return value of process_with is the writer passed as second arg, for your convenience.
This does much less than the full #process method, to be more flexible and make fewer assumptions:
- Will never use any additional threads (unless writer does). Wrap in your own threading if desired.
- Will not do any standard logging or progress bars, regardless of indexer settings. Log yourself if desired.
- Will not call any
after_processingsteps. Call yourself withindexer.run_after_processing_stepsas desired. - WILL by default call #close on the writer, IF the writer has a #close method.
pass
:close_writer => falseto not do so. - exceptions will just raise out, unless you pass in a rescue: option, value is a proc/lambda
that will receive two args, context and exception. If the rescue proc doesn't re-raise,
process_withwill continue to process subsequent records.
692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 |
# File 'lib/traject/indexer.rb', line 692 def process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil) unless destination || block_given? raise ArgumentError, "Need either a second arg writer/destination, or a block" end settings.fill_in_defaults! position = 0 input_name = Traject::Util.io_name(source) source.each do |record | begin position += 1 context = Context.new( :source_record => record, :source_record_id_proc => source_record_id_proc, :settings => settings, :position => position, :position_in_input => (position if input_name), :logger => logger ) map_to_context!(context) if context.skip? on_skipped.call(context) if on_skipped else destination.put(context) if destination yield(context) if block_given? end rescue StandardError => e if rescue_with rescue_with.call(context, e) else raise e end end end if close_writer && destination.respond_to?(:close) destination.close end return destination end |