Class: Traject::Indexer

Inherits:
Object
Includes:
Macros::Basic, Macros::Marc21, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/settings.rb

Overview

This class does indexing for traject: Getting input records from a Reader class, mapping the input records to an output hash, and then sending the output hash off somewhere (usually Solr) with a Writer class.

Traject config files are instance_evald in an Indexer object, so self in a config file is an Indexer, and any Indexer methods can be called.

However, certain Indexer methods exist mainly for the purpose of being called in config files; these methods are part of the expected Domain-Specific Language ("DSL") for config files, and will ordinarily form the bulk or entirety of config files:

  • #settings
  • #to_field
  • #each_record
  • #after_processing
  • #logger (rarely used in config files, but in some cases to set up custom logging config)

## Readers and Writers

The Indexer has a modularized architecture for readers and writers, for where source records come from (reader), and where output is sent to (writer).

A Reader is any class that:

  1) Has a two-argument initializer taking an IO stream and a Settings hash.
  2) Responds to the usual Ruby #each, yielding one source record per iteration. (Including Enumerable is probably a good idea too.)
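As a rough illustration of the reader contract, here is a minimal sketch of a reader that treats each non-blank line of the IO stream as one source record. The class name `LineReader` and the `line_reader.strip` setting are hypothetical, invented for this example; they are not part of traject.

```ruby
require "stringio"

# Hypothetical reader: every non-blank line of the input is one source record.
class LineReader
  include Enumerable

  # Two-argument initializer (IO stream, settings hash), per the reader contract.
  def initialize(input_stream, settings)
    @input_stream = input_stream
    @settings     = settings
  end

  # Yields one source record per iteration.
  def each
    return enum_for(:each) unless block_given?

    @input_stream.each_line do |line|
      line = line.chomp
      # "line_reader.strip" is a made-up setting for this sketch.
      line = line.strip if @settings["line_reader.strip"] == "true"
      next if line.empty?
      yield line
    end
  end
end

reader  = LineReader.new(StringIO.new("first\n\n  second  \n"), { "line_reader.strip" => "true" })
records = reader.to_a
```

A class shaped like this could presumably be selected through the "reader_class_name" setting, as long as it is loadable under the given name.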

The default reader is the Traject::MarcReader, whose behavior is further customized by several settings in the Settings hash. JRuby users with specialized needs may want to look at the gem traject-marc4j_reader.

Alternate readers can be set directly with the #reader_class= method, or with the "reader_class_name" Setting, a String name of a class meeting the reader contract.

A Writer is any class that:

  1) Has a one-argument initializer taking a Settings hash. (The logger is provided to the Writer in settings["logger"].)
  2) Responds to a one-argument #put method, where the argument is a Traject::Indexer::Context containing an #output_hash hash of mapped keys/values. The writer should write them to the appropriate place.
  3) Responds to a #close method, called when we're done.
  4) Optionally implements a #skipped_record_count method, returning an integer count of records that were skipped due to errors (and presumably logged).
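The writer contract can be sketched in the same spirit. The example below collects output hashes in memory rather than sending them anywhere. `MemoryWriter` and `FakeContext` are hypothetical names used only here; `FakeContext` stands in for Traject::Indexer::Context. The mutex is an assumption on my part, added because #process may call #put from worker threads.

```ruby
# Hypothetical writer: keeps output hashes in an array instead of writing them out.
class MemoryWriter
  attr_reader :records, :skipped_record_count

  # One-argument initializer taking the settings hash.
  def initialize(settings)
    @settings             = settings
    @records              = []
    @skipped_record_count = 0
    @closed               = false
    @mutex                = Mutex.new # guards shared state if #put is called from threads
  end

  # Receives a context; only its #output_hash is used in this sketch.
  def put(context)
    hash = context.output_hash
    @mutex.synchronize do
      if hash.nil? || hash.empty?
        @skipped_record_count += 1 # treat empty output as "skipped" for illustration
      else
        @records << hash
      end
    end
  end

  # Called once when processing is done.
  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Stand-in for Traject::Indexer::Context, for this sketch only.
FakeContext = Struct.new(:output_hash)

writer = MemoryWriter.new({})
writer.put(FakeContext.new({ "title" => ["A title"] }))
writer.put(FakeContext.new({}))
writer.close
```

Counting empty output hashes as skipped is just one possible policy; a real writer would more likely count records it failed to send.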

Traject packages one Solr writer: traject/solr_json_writer, which sends in JSON format and works under both Ruby and JRuby, but only with Solr version >= 3.2. To index to an older Solr installation, you'll need to use JRuby and install the gem traject-solrj_writer, which uses the solrj .jar underneath.

You can set alternate writers by setting a Class object directly with the #writer_class method, or by the 'writer_class_name' Setting, with a String name of class meeting the Writer contract. There are several that ship with traject itself:

  • traject/json_writer (Traject::JsonWriter) -- write newline-delimited json files.
  • traject/yaml_writer (Traject::YamlWriter) -- write pretty yaml file; very human-readable
  • traject/debug_writer (Traject::DebugWriter) -- write a tab-delimited file where each line consists of the id, field, and value(s).
  • traject/delimited_writer and traject/csv_writer -- write character-delimited files (default is tab-delimited) or comma-separated-value files.

## Creating and Using an Indexer programmatically

Normally the Traject::Indexer is created and used by a Traject::Command object. However, you can also create and use a Traject::Indexer programmatically, for embedding in your own Ruby software. (Note that you will get the best performance only under JRuby.)

 indexer = Traject::Indexer.new

You can load a config file from disk, using standard ruby instance_eval. One benefit of loading one or more ordinary traject config files saved separately on disk is that these config files could also be used with the standard traject command line.

 indexer.load_config_file(path_to_config)

This may raise if the file is not readable. If the config file can't be evaluated, it will raise a Traject::Indexer::ConfigLoadError with contextual information useful for reporting to the developer.

You can also instead, or in addition, write configuration inline using standard ruby instance_eval:

indexer.instance_eval do
   to_field "something", literal("something")
   # etc
end

Or even load configuration from an existing lambda/proc object:

config = proc do
  to_field "something", literal("something")
end
indexer.instance_eval &config

It is least confusing to provide settings after you load config files, so you can determine if your settings should be defaults (taking effect only if not provided in earlier config), or should force themselves, potentially overwriting earlier config:

 indexer.settings do
    # default, won't overwrite if already set by earlier config
    provide "solr.url", "http://example.org/solr"
    provide "reader_class_name", "Traject::MarcReader"

    # or force over any previous config
    store "solr.url", "http://example.org/solr"
 end

Once your indexer is set up, you could use it to transform individual input records to output hashes. This method ignores any readers and writers and won't use thread pools; it just maps. Under a standard MARC setup, record should be a MARC::Record:

 output_hash = indexer.map_record(record)

Or you could process an entire stream of input records from the configured reader, to the configured writer, as the traject command line does:

 indexer.process(io_stream)
 # or, eg:
 File.open("path/to/input") do |file|
   indexer.process(file)
 end

At present, you can only call #process once on an indexer. Let us know if that's a problem; we could enhance this.

Please do let us know if there is some part of this API that is inconvenient for you; we'd like to know your use case and improve things.

Defined Under Namespace

Classes: AfterProcessingStep, ArityError, ConfigLoadError, Context, EachRecordStep, NamingError, Settings, ToFieldStep

Constant Summary

Constants included from Macros::Marc21

Macros::Marc21::EXTRACT_ALL_MARC_VALID_OPTIONS, Macros::Marc21::EXTRACT_MARC_VALID_OPTIONS, Macros::Marc21::SERIALZED_MARC_VALID_OPTIONS

Methods included from Macros::Basic

#literal

Methods included from Macros::Marc21

apply_extraction_options, #extract_all_marc_values, extract_marc, extract_marc_from, first!, #serialized_marc, trim_punctuation

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(arg_settings = {}) ⇒ Indexer

Takes an optional hash or Traject::Indexer::Settings object of settings.



# File 'lib/traject/indexer.rb', line 176

def initialize(arg_settings = {})
  @settings = Settings.new(arg_settings)
  @index_steps = []
  @after_processing_steps = []
end

Instance Attribute Details

#logger ⇒ Object



# File 'lib/traject/indexer.rb', line 255

def logger
  @logger ||= create_logger
end

#reader_class ⇒ Object



# File 'lib/traject/indexer.rb', line 482

def reader_class
  unless defined? @reader_class
    @reader_class = qualified_const_get(settings["reader_class_name"])
  end
  return @reader_class
end

#writer ⇒ Object



# File 'lib/traject/indexer.rb', line 505

def writer
  @writer ||= settings["writer"] || writer!
end

#writer_class ⇒ Object



# File 'lib/traject/indexer.rb', line 489

def writer_class
  writer.class
end

Instance Method Details

#after_processing(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called once at the end of processing a stream of records.



# File 'lib/traject/indexer.rb', line 251

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end

#create_logger ⇒ Object

Create logger according to settings



# File 'lib/traject/indexer.rb', line 271

def create_logger

  logger_level  = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
  when "STDERR"
    logger.adapter :stderr, level: logger_level, format: logger_format
  when "STDOUT"
    logger.adapter :stdout, level: logger_level, format: logger_format
  else
    logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end


  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end

#each_record(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called for each record



# File 'lib/traject/indexer.rb', line 245

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first) )
end

#load_config_file(file_path) ⇒ Object

Pass a string file path, or a File object, for a config file to load into indexer.

Can raise:

  • Errno::ENOENT or Errno::EACCES if file path is not accessible
  • Traject::Indexer::ConfigLoadError if exception is raised evaluating the config. A ConfigLoadError has information in it about original exception, and exactly what config file and line number triggered it.
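The loading pattern and its two failure modes can be tried out without traject. The sketch below uses a hypothetical `MiniIndexer` with a toy one-argument `to_field` and a simplified `ConfigLoadError`; these only mimic the shape of the real classes and are assumptions made for illustration.

```ruby
require "tempfile"

# Hypothetical stand-in for an indexer, just enough to show the loading pattern.
class MiniIndexer
  # Simplified stand-in for Traject::Indexer::ConfigLoadError.
  class ConfigLoadError < StandardError
    attr_reader :config_file, :original

    def initialize(config_file, original)
      @config_file = config_file
      @original    = original
      super("Error loading #{config_file}: #{original.class}: #{original.message}")
    end
  end

  attr_reader :fields

  def initialize
    @fields = []
  end

  # Toy DSL method; callable from a config file because self is the indexer.
  def to_field(name)
    @fields << name
  end

  # Evaluate the file in the indexer's context, passing the path so that
  # backtraces point at the config file itself.
  def load_config_file(file_path)
    File.open(file_path) do |file|
      begin
        instance_eval(file.read, file_path)
      rescue ScriptError, StandardError => e
        raise ConfigLoadError.new(file_path, e)
      end
    end
  end
end

good = Tempfile.new(["good", ".rb"])
good.write(%(to_field "title"\nto_field "author"\n))
good.close

bad = Tempfile.new(["bad", ".rb"])
bad.write(%(to_field "title"\nno_such_dsl_method\n))
bad.close

indexer = MiniIndexer.new
indexer.load_config_file(good.path)

# A config that fails to evaluate surfaces as a ConfigLoadError wrapping the cause.
load_error = begin
  MiniIndexer.new.load_config_file(bad.path)
  nil
rescue MiniIndexer::ConfigLoadError => e
  e
end

# A missing file raises from File.open, before any config is evaluated.
missing_error = begin
  MiniIndexer.new.load_config_file(bad.path + ".does-not-exist")
  nil
rescue Errno::ENOENT => e
  e
end
```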


# File 'lib/traject/indexer.rb', line 190

def load_config_file(file_path)
  File.open(file_path) do |file|
    begin
      self.instance_eval(file.read, file_path)
    rescue ScriptError, StandardError => e
      raise ConfigLoadError.new(file_path, e)
    end
  end
end

#log_mapping_errors(context, index_step) ⇒ Object

Just a wrapper that captures and logs any unexpected errors raised in mapping, along with contextual information on the record and the location in the source file of the mapping rule.

Re-raises the error at the moment.

 log_mapping_errors(context, index_step) do
   all_sorts_of_stuff # that will have errors logged
 end



# File 'lib/traject/indexer.rb', line 358

def log_mapping_errors(context, index_step)
  begin
    yield
  rescue Exception => e
    msg =  "Unexpected error on record id `#{context.source_record_id}` at file position #{context.position}\n"
    msg += "    while executing #{index_step.inspect}\n"
    msg += Traject::Util.exception_to_log_message(e)

    logger.error msg
    begin
      logger.debug "Record: " + context.source_record.to_s
    rescue Exception => marc_to_s_exception
      logger.debug "(Could not log record, #{marc_to_s_exception})"
    end

    raise e
  end
end

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



# File 'lib/traject/indexer.rb', line 478

def log_skip(context)
  logger.debug "Skipped record #{context.position}: #{context.skipmessage}"
end

#logger_format ⇒ Object



# File 'lib/traject/indexer.rb', line 261

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
    when "false" then false
    when "" then nil
    else format
  end
end

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

This is a convenience shortcut for #map_to_context! -- use that one if you want to provide additional context like position, and/or get back the full context.



# File 'lib/traject/indexer.rb', line 309

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings)
  map_to_context!(context)
  return context.output_hash
end

#map_to_context!(context) ⇒ Object

Maps a single record INTO the argument, a Traject::Indexer::Context.

Context must be passed with a #source_record and #settings, and optionally a #position.

Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.

Pass in a context with a set #position if you want that to be available to mapping routines.

Returns the context passed in, as a convenience for chaining etc.
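To make the accumulation into #output_hash concrete, here is a simplified, traject-free sketch. `SketchContext`, `map_into`, and the step lambdas are hypothetical stand-ins for the real Context and index steps. The loop mirrors the shape of the method (each step fills an accumulator, which is then appended under the step's field name) but leaves out skipping and error logging.

```ruby
# Hypothetical stand-in for Traject::Indexer::Context, for illustration only.
SketchContext = Struct.new(:source_record, :output_hash, :position)

# Each step pairs an output field name with logic taking (record, accumulator).
steps = [
  ["title",   ->(record, acc) { acc << record[:title] }],
  ["subject", ->(record, acc) { acc << "Whales" }],
  ["subject", ->(record, acc) { acc << "Sea stories" }] # same field: values are appended
]

# Mutates the context's output_hash and returns the same context.
def map_into(context, steps)
  steps.each do |field_name, logic|
    accumulator = []
    logic.call(context.source_record, accumulator)

    if accumulator.size > 0
      accumulator.compact!
      (context.output_hash[field_name] ||= []).concat(accumulator)
    end
  end
  context
end

context = SketchContext.new({ title: "Moby Dick" }, {}, 1)
result  = map_into(context, steps)
```

Because the same context object comes back, a caller can chain directly, for example `map_into(context, steps).output_hash`.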



# File 'lib/traject/indexer.rb', line 328

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    context.index_step = index_step
    accumulator = log_mapping_errors(context, index_step) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    if accumulator.size > 0
      accumulator.compact!
      (context.output_hash[index_step.field_name] ||= []).concat accumulator
    end

    context.index_step = nil
  end

  return context
end

#process(io_stream) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

Returns false as a signal to the command line to exit with a non-zero exit code for some reason (reason found in logs, presumably); otherwise returns true. This mechanism is deliberately simple for now and may become more elaborate. We do need SOME way to return non-zero to the command line.



# File 'lib/traject/indexer.rb', line 387

def process(io_stream)
  settings.fill_in_defaults!

  count      =       0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Indexer#process with settings: #{settings.inspect}"

  reader = self.reader!(io_stream)

  processing_threads = settings["processing_thread_pool"].to_i
  thread_pool = Traject::ThreadPool.new(processing_threads)

  logger.info "   Indexer with #{processing_threads} processing threads, reader: #{reader.class.name} and writer: #{writer.class.name}"

  log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

  reader.each do |record; position|
    count += 1

    # have to use a block local var, so the changing `count` one
    # doesn't get caught in the closure. Weird, yeah.
    position = count

    thread_pool.raise_collected_exception!

    if settings["debug_ascii_progress"].to_s == "true"
      $stderr.write "." if count % settings["solr_writer.batch_size"].to_i == 0
    end

    context = Context.new(
      :source_record => record,
      :settings => settings,
      :position => position,
      :logger => logger
    )

    if log_batch_size && (count % log_batch_size == 0)
      batch_rps = log_batch_size / (Time.now - batch_start_time)
      overall_rps = count / (Time.now - start_time)
      logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at id:#{context.source_record_id}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
      batch_start_time = Time.now
    end

    # We pass context in a block arg to properly 'capture' it, so
    # we don't accidentally share the local var under closure between
    # threads.
    thread_pool.maybe_in_thread_pool(context) do |context|
      map_to_context!(context)
      if context.skip?
        log_skip(context)
      else
        writer.put context
      end

    end

  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!


  writer.close if writer.respond_to?(:close)

  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue Exception => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end

  elapsed        = Time.now - start_time
  avg_rps        = (count / elapsed)
  logger.info "finished Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using class set in #reader_class, initialized with io_stream passed in



# File 'lib/traject/indexer.rb', line 495

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#settings(new_settings = nil, &block) ⇒ Object

Part of the config file DSL, for writing settings values.

The Indexer's settings consist of a hash-like Traject::Indexer::Settings object. The settings hash is not nested hashes, just one level of configuration settings. Keys are always strings, and by convention use "." for namespacing, e.g. log.file.

The settings method with no arguments returns that Settings object.

With a hash and/or block argument, it can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_evald in the context of the Traject::Indexer::Settings object.

 indexer.settings("a" => "a", "b" => "b")

 indexer.settings do
   store "b", "new b"
 end

 indexer.settings #=> {"a" => "a", "b" => "new b"}

Note the #provide method is defined on Traject::Indexer::Settings to write to a setting only if it was not previously set. You can also use #store to force overwriting even an existing setting.

Even with arguments, Indexer#settings returns the Settings object, so method calls can be chained.
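The difference between #provide and #store is easiest to see side by side. The sketch below is a simplified stand-in, not the real implementation: `SketchSettings` and `SketchIndexer` are hypothetical classes that only imitate the behavior described here.

```ruby
# Hypothetical, simplified stand-in for the settings object.
class SketchSettings < Hash
  # Write only if the key has not been set yet.
  def provide(key, value)
    self[key] = value unless key?(key)
  end
  # #store is inherited from Hash and always overwrites.
end

# Hypothetical holder that imitates the shape of Indexer#settings.
class SketchIndexer
  def initialize
    @settings = SketchSettings.new
  end

  def settings(new_settings = nil, &block)
    @settings.merge!(new_settings) if new_settings
    @settings.instance_eval(&block) if block
    @settings
  end
end

indexer = SketchIndexer.new
indexer.settings({ "solr.url" => "http://example.org/solr" })

indexer.settings do
  provide "solr.url",  "http://other.example.org/solr" # ignored: already set
  provide "log.level", "debug"                         # applied: not set before
  store   "log.level", "warn"                          # overwrites unconditionally
end

final = indexer.settings
```

Here `final` keeps the original "solr.url", while "log.level" ends up as "warn" because #store ran last.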



# File 'lib/traject/indexer.rb', line 229

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval &block if block

  return @settings
end

#to_field(field_name, aLambda = nil, &block) ⇒ Object

Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field.



# File 'lib/traject/indexer.rb', line 240

def to_field(field_name, aLambda = nil, &block)
  @index_steps << ToFieldStep.new(field_name, aLambda, block, Traject::Util.extract_caller_location(caller.first) )
end

#writer! ⇒ Object

Instantiate a Traject Writer, using class set in #writer_class



# File 'lib/traject/indexer.rb', line 500

def writer!
  writer_class = @writer_class || qualified_const_get(settings["writer_class_name"])
  writer_class.new(settings.merge("logger" => logger))
end