Class: Traject::Indexer

Inherits:
Object
  • Object
show all
Includes:
Macros::Basic, Macros::Marc21, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/settings.rb

Overview

This class does indexing for traject: Getting input records from a Reader class, mapping the input records to an output hash, and then sending the output hash off somewhere (usually Solr) with a Writer class.

Traject config files are instance_evald in an Indexer object, so self in a config file is an Indexer, and any Indexer methods can be called.

However, certain Indexer methods exist almost entirely for the purpose of being called in config files; these methods are part of the expected Domain-Specific Language ("DSL") for config files, and will ordinarily form the bulk or entirety of config files:

  • #settings
  • #to_field
  • #each_record
  • #after_procesing
  • #logger (rarely used in config files, but in some cases to set up custom logging config)

If accessing a Traject::Indexer programmatically (instead of via command line with config files), additional methods of note include:

# to process a stream of input records from configured Reader,
# to configured Writer:
indexer.process(io_stream)

# To map a single input record manually to an ouput_hash,
# ignoring Readers and Writers
hash = indexer.map_record(record)

## Readers and Writers

The Indexer has a modularized architecture for readers and writers, for where source records come from (reader), and where output is sent to (writer).

A Reader is any class that: 1) Has a two-argument initializer taking an IO stream and a Settings hash 2) Responds to the usual ruby #each, returning a source record from each #each. (Including Enumerable is prob a good idea too)

The default reader is the Traject::MarcReader, who's behavior is further customized by several settings in the Settings hash. Jruby users with specialized needs may want to look at the gem traject-marc4j_reader.

Alternate readers can be set directly with the #reader_class= method, or with the "reader_class_name" Setting, a String name of a class meeting the reader contract.

A Writer is any class that: 1) Has a one-argument initializer taking a Settings hash. (The logger is provided to the Writer in settings["logger"]) 2) Responds to a one argument #put method, where the argument is a Traject::Indexer::Context, containing an #output_hash hash of mapped keys/values. The writer should write them to the appropriate place. 3) Responds to a #close method, called when we're done. 4) Optionally implements a #skipped_record_count method, returning int count of records that were skipped due to errors (and presumably logged)

Traject packages one solr writer: traject/solr_json_writer, which sends in json format and works under both ruby and jruby, but only with solr version

= 3.2. To index to an older solr installation, you'll need to use jruby and install the gem traject-solrj_writer, which uses the solrj .jar underneath.

You can set alternate writers by setting a Class object directly with the #writer_class method, or by the 'writer_class_name' Setting, with a String name of class meeting the Writer contract. There are several that ship with traject itself:

  • traject/json_writer (Traject::JsonWriter) -- write newline-delimied json files.
  • traject/yaml_writer (Traject::YamlWriter) -- write pretty yaml file; very human-readable
  • traject/debug_writer (Traject::DebugWriter) -- write a tab-delimited file where each line consists of the id, field, and value(s).
  • traject/delimited_writer and traject/csv_writer -- write character-delimited files (default is tab-delimited) or comma-separated-value files.

Defined Under Namespace

Classes: AfterProcessingStep, ArityError, Context, EachRecordStep, NamingError, Settings, ToFieldStep

Constant Summary

Constants included from Macros::Marc21

Macros::Marc21::EXTRACT_ALL_MARC_VALID_OPTIONS, Macros::Marc21::EXTRACT_MARC_VALID_OPTIONS, Macros::Marc21::SERIALZED_MARC_VALID_OPTIONS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Macros::Basic

#literal

Methods included from Macros::Marc21

apply_extraction_options, #extract_all_marc_values, #extract_marc, first!, #serialized_marc, trim_punctuation

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(arg_settings = {}) ⇒ Indexer

optional hash or Traject::Indexer::Settings object of settings.



117
118
119
120
121
# File 'lib/traject/indexer.rb', line 117

def initialize(arg_settings = {})
  @settings = Settings.new(arg_settings)
  @index_steps = []
  @after_processing_steps = []
end

Instance Attribute Details

#loggerObject



178
179
180
# File 'lib/traject/indexer.rb', line 178

def logger
  @logger ||= create_logger
end

#reader_classObject



408
409
410
411
412
413
# File 'lib/traject/indexer.rb', line 408

def reader_class
  unless defined? @reader_class
    @reader_class = qualified_const_get(settings["reader_class_name"])
  end
  return @reader_class
end

#writer_classObject



415
416
417
418
419
420
# File 'lib/traject/indexer.rb', line 415

def writer_class
  unless defined? @writer_class
    @writer_class = qualified_const_get(settings["writer_class_name"])
  end
  return @writer_class
end

Instance Method Details

#after_processing(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called once at the end of processing a stream of records.



174
175
176
# File 'lib/traject/indexer.rb', line 174

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end

#create_loggerObject

Create logger according to settings



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/traject/indexer.rb', line 194

def create_logger

  logger_level  = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
  when "STDERR"
    logger.adapter :stderr, level: logger_level, format: logger_format
  when "STDOUT"
    logger.adapter :stdout, level: logger_level, format: logger_format
  else
    logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end


  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end

#each_record(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called for each record



168
169
170
# File 'lib/traject/indexer.rb', line 168

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first) )
end

#id_string(record) ⇒ Object

get a printable id from record for error logging. Maybe override this for a future XML version.



302
303
304
# File 'lib/traject/indexer.rb', line 302

def id_string(record)
  record && record['001'] && record['001'].value.to_s
end

#log_mapping_errors(context, index_step) ⇒ Object

just a wrapper that captures and records any unexpected errors raised in mapping, along with contextual information on record and location in source file of mapping rule.

Re-raises error at the moment.

log_mapping_errors(context, index_step) do all_sorts_of_stuff # that will have errors logged end



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/traject/indexer.rb', line 281

def log_mapping_errors(context, index_step)
  begin
    yield
  rescue Exception => e
    msg =  "Unexpected error on record id `#{id_string(context.source_record)}` at file position #{context.position}\n"
    msg += "    while executing #{index_step.inspect}\n"
    msg += Traject::Util.exception_to_log_message(e)

    logger.error msg
    begin
      logger.debug "Record: " + context.source_record.to_s
    rescue Exception => marc_to_s_exception
      logger.debug "(Could not log record, #{marc_to_s_exception})"
    end

    raise e
  end
end

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



404
405
406
# File 'lib/traject/indexer.rb', line 404

def log_skip(context)
  logger.debug "Skipped record #{context.position}: #{context.skipmessage}"
end

#logger_formatObject



184
185
186
187
188
189
190
191
# File 'lib/traject/indexer.rb', line 184

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
    when "false" then false
    when "" then nil
    else format
  end
end

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

This is a convenience shortcut for #map_to_context! -- use that one if you want to provide addtional context like position, and/or get back the full context.



232
233
234
235
236
# File 'lib/traject/indexer.rb', line 232

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings)
  map_to_context!(context)
  return context.output_hash
end

#map_to_context!(context) ⇒ Object

Maps a single record INTO the second argument, a Traject::Indexer::Context.

Context must be passed with a #source_record and #settings, and optionally a #position.

Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.

Pass in a context with a set #position if you want that to be available to mapping routines.

Returns the context passed in as second arg, as a convenience for chaining etc.



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/traject/indexer.rb', line 251

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    context.index_step = index_step
    accumulator = log_mapping_errors(context, index_step) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    if accumulator.size > 0
      accumulator.compact!
      (context.output_hash[index_step.field_name] ||= []).concat accumulator
    end

    context.index_step = nil
  end

  return context
end

#process(io_stream) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

returns 'false' as a signal to command line to return non-zero exit code for some reason (reason found in logs, presumably). This particular mechanism is open to complexification, starting simple. We do need SOME way to return non-zero to command line.



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/traject/indexer.rb', line 315

def process(io_stream)
  settings.fill_in_defaults!

  count      =       0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Indexer#process with settings: #{settings.inspect}"

  reader = self.reader!(io_stream)
  writer = self.writer!


  processing_threads = settings["processing_thread_pool"].to_i
  thread_pool = Traject::ThreadPool.new(processing_threads)

  logger.info "   Indexer with #{processing_threads} processing threads, reader: #{reader.class.name} and writer: #{writer.class.name}"

  log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

  reader.each do |record; position|
    count += 1

    # have to use a block local var, so the changing `count` one
    # doesn't get caught in the closure. Weird, yeah.
    position = count

    thread_pool.raise_collected_exception!

    if settings["debug_ascii_progress"].to_s == "true"
      $stderr.write "." if count % settings["solr_writer.batch_size"].to_i == 0
    end

    if log_batch_size && (count % log_batch_size == 0)
      batch_rps = log_batch_size / (Time.now - batch_start_time)
      overall_rps = count / (Time.now - start_time)
      logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at id:#{id_string(record)}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
      batch_start_time = Time.now
    end

    # we have to use this weird lambda to properly "capture" the count, instead
    # of having it be bound to the original variable in a non-threadsafe way.
    # This is confusing, I might not be understanding things properly, but that's where i am.
    #thread_pool.maybe_in_thread_pool &make_lambda(count, record, writer)
    thread_pool.maybe_in_thread_pool(record, settings, position) do |record, settings, position|
      context = Context.new(:source_record => record, :settings => settings, :position => position)
      context.logger = logger
      map_to_context!(context)
      if context.skip?
        log_skip(context)
      else
        writer.put context
      end

    end

  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!


  writer.close if writer.respond_to?(:close)

  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue Exception => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end

  elapsed        = Time.now - start_time
  avg_rps        = (count / elapsed)
  logger.info "finished Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using class set in #reader_class, initialized with io_stream passed in



424
425
426
# File 'lib/traject/indexer.rb', line 424

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#settings(new_settings = nil, &block) ⇒ Object

Part of the config file DSL, for writing settings values.

The Indexer's settings consist of a hash-like Traject::Settings object. The settings hash is not nested hashes, just one level of configuration settings. Keys are always strings, and by convention use "." for namespacing, eg log.file

The settings method with no arguments returns that Settings object.

With a hash and/or block argument, can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_evald in the context of the Traject::Settings object.

indexer.settings("a" => "a", "b" => "b")

indexer.settings do provide "b", "new b" end

indexer.settings #=> => "a", "b" => "new b"

Note the #provide method is defined on Traject::Settings to write to a setting only if previously not set. You can also use #store to force over-writing even if an existing setting.

Even with arguments, Indexer#settings returns the Settings object, hash too, so can method calls can be chained.



152
153
154
155
156
157
158
# File 'lib/traject/indexer.rb', line 152

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval &block if block

  return @settings
end

#to_field(field_name, aLambda = nil, &block) ⇒ Object

Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field.



163
164
165
# File 'lib/traject/indexer.rb', line 163

def to_field(field_name, aLambda = nil, &block)
  @index_steps << ToFieldStep.new(field_name, aLambda, block, Traject::Util.extract_caller_location(caller.first) )
end

#writer!Object

Instantiate a Traject Writer, suing class set in #writer_class



429
430
431
# File 'lib/traject/indexer.rb', line 429

def writer!
  return writer_class.new(settings.merge("logger" => logger))
end