Method: Traject::Indexer#process

Defined in:
lib/traject/indexer.rb

#process(io_stream_or_array) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

You instead give it an array of streams, as well.

returns 'false' as a signal to command line to return non-zero exit code for some reason (reason found in logs, presumably). This particular mechanism is open to complexification, starting simple. We do need SOME way to return non-zero to command line.

Parameters:

  • (#read, Array<#read>)


523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
# File 'lib/traject/indexer.rb', line 523

def process(io_stream_or_array)
  check_uncompleted

  settings.fill_in_defaults!

  count      = 0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Traject::Indexer#process with settings: #{settings.inspect}"

  processing_threads = settings["processing_thread_pool"].to_i
  thread_pool        = Traject::ThreadPool.new(processing_threads)

  logger.info "   Traject::Indexer with #{processing_threads} processing threads, reader: #{reader_class.name} and writer: #{writer.class.name}"

  #io_stream can now be an array of io_streams.
  (io_stream_or_array.kind_of?(Array) ? io_stream_or_array : [io_stream_or_array]).each do |io_stream|
    reader = self.reader!(io_stream)
    input_name = Traject::Util.io_name(io_stream)
    position_in_input = 0

    log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

    reader.each do |record; safe_count, safe_position_in_input |
      count    += 1
      position_in_input += 1

      # have to use a block local var, so the changing `count` one
      # doesn't get caught in the closure. Don't totally get it, but
      # I think it's so.
      safe_count, safe_position_in_input = count, position_in_input

      thread_pool.raise_collected_exception!

      if settings["debug_ascii_progress"].to_s == "true"
        $stderr.write "." if count % settings["solr_writer.batch_size"].to_i == 0
      end

      context = Context.new(
          :source_record => record,
          :source_record_id_proc => source_record_id_proc,
          :settings      => settings,
          :position      => safe_count,
          :input_name    => input_name,
          :position_in_input => safe_position_in_input,
          :logger        => logger
      )


      if log_batch_size && (count % log_batch_size == 0)
        batch_rps   = log_batch_size / (Time.now - batch_start_time)
        overall_rps = count / (Time.now - start_time)
        logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at: #{context.record_inspect}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
        batch_start_time = Time.now
      end

      # We pass context in a block arg to properly 'capture' it, so
      # we don't accidentally share the local var under closure between
      # threads.
      thread_pool.maybe_in_thread_pool(context) do |t_context|
        map_to_context!(t_context)
        if context.skip?
          log_skip(t_context)
        else
          writer.put t_context
        end
      end
    end
  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!

  complete

  elapsed = Time.now - start_time
  avg_rps = (count / elapsed)
  logger.info "finished Traject::Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Traject::Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end