Class: Traject::Indexer

Inherits:
Object
Includes:
Macros::Basic, Macros::Marc21, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/settings.rb

Overview

A Hash of settings for a Traject::Indexer, which also ends up passed along to other objects Traject::Indexer interacts with.

Enhanced with a few features from Hashie, for instance to make it string/symbol indifferent.

#provide(key, value) is added; it works like settings[key] ||= value, setting a value only if the key is not already set (but unlike ||=, an existing nil or false counts as already set).

Also has an interesting ‘defaults’ system, meant to play along with configuration-file ‘provide’ statements. There is a built-in hash of defaults, which is lazily filled in when a key is accessed and not yet set (nil can count as set, though!). If a key's default hasn't been lazily filled in yet, #provide will still set it. You can also call fill_in_defaults! to fill in all defaults at once, if you know all configuration files have been loaded and want the complete settings for inspection.
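
A minimal sketch of how #provide differs from a plain store (the keys are arbitrary examples, not keys traject itself defines):

settings = Traject::Indexer::Settings.new("some.key" => "original")

settings.provide("some.key", "ignored")   # no-op: "some.key" already has a value
settings.provide("other.key", "value")    # sets "other.key", since it was unset
settings.store("some.key", "replaced")    # a plain store always overwrites

settings.fill_in_defaults!                # force all built-in defaults to be filled in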

Defined Under Namespace

Classes: ArityError, Context, LastNamedStep, NamingError, Settings

Instance Attribute Summary

Instance Method Summary

Methods included from Macros::Basic

#literal

Methods included from Macros::Marc21

#extract_all_marc_values, #extract_marc, first!, #serialized_marc, trim_punctuation

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(arg_settings = {}) ⇒ Indexer

Takes an optional hash or Traject::Indexer::Settings object of settings.



# File 'lib/traject/indexer.rb', line 73

def initialize(arg_settings = {})
  @settings = Settings.new(arg_settings)
  @index_steps = []
end
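
A hypothetical construction sketch (the setting key shown is one traject documents; any settings hash or a Traject::Indexer::Settings object can be passed):

indexer = Traject::Indexer.new("log.level" => "debug")

# or configure after construction:
indexer = Traject::Indexer.new
indexer.settings do
  provide "log.level", "debug"
end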

Instance Attribute Details

#logger ⇒ Object



# File 'lib/traject/indexer.rb', line 106

def logger
  @logger ||= create_logger
end

#reader_class ⇒ Object



# File 'lib/traject/indexer.rb', line 370

def reader_class
  unless defined? @reader_class
    @reader_class = qualified_const_get(settings["reader_class_name"])
  end
  return @reader_class
end

#writer_class ⇒ Object



# File 'lib/traject/indexer.rb', line 377

def writer_class
  unless defined? @writer_class
    @writer_class = qualified_const_get(settings["writer_class_name"])
  end
  return @writer_class
end
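
For illustration, the settings these readers consult might be provided as below (the class names are examples of classes traject has shipped; availability depends on your traject version and platform):

indexer.settings do
  provide "reader_class_name", "Traject::MarcReader"
  provide "writer_class_name", "Traject::DebugWriter"
end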

Instance Method Details

#create_logger ⇒ Object

Create logger according to settings



# File 'lib/traject/indexer.rb', line 141

def create_logger
  # log everything to STDERR or specified logfile
  logger = Yell.new( logger_argument, logger_options )
  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end
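
As a sketch, the settings this method reads might look like the following (file paths are made up):

indexer.settings do
  provide "log.file",       "traject.log"          # main log destination; "STDERR" or "STDOUT" also work
  provide "log.error_file", "traject_errors.log"   # errors (and above) are additionally logged here
end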

#each_record(aLambda = nil, &block) ⇒ Object



# File 'lib/traject/indexer.rb', line 170

def each_record(aLambda = nil, &block)
  verify_each_record_arguments(aLambda, block)
  @index_steps << {
    :lambda => aLambda,
    :block  => block,
    :type   => :each_record,
    :source_location => Traject::Util.extract_caller_location(caller.first)
  }
end
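
A hedged usage sketch, assuming MARC source records and traject's Context#skip!; each_record blocks take one argument (record) or two (record, context):

indexer.each_record do |record|
  # one-argument form: just the record
end

indexer.each_record do |record, context|
  # two-argument form: e.g. ask for the record to be skipped
  context.skip!("no 245 field") if record['245'].nil?
end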

#id_string(record) ⇒ Object

Get a printable id from the record, for error logging. Maybe override this for a future XML version.



# File 'lib/traject/indexer.rb', line 397

def id_string(record)
  record && record['001'] && record['001'].value.to_s
end

#last_named_stepObject



# File 'lib/traject/indexer.rb', line 452

def last_named_step
  return LastNamedStep.new(@index_steps)
end

#log_mapping_errors(context, index_step, aProc) ⇒ Object

Just a wrapper that captures and logs any unexpected errors raised during mapping, along with contextual information on the record and the source-file location of the mapping rule.

At the moment, it re-raises the error after logging.

log_mapping_errors(context, index_step, some_proc) do
  all_sorts_of_stuff # errors raised in here will be logged
end



# File 'lib/traject/indexer.rb', line 264

def log_mapping_errors(context, index_step, aProc)
  begin
    yield
  rescue Exception => e        
    msg =  "Unexpected error on record id `#{id_string(context.source_record)}` at file position #{context.position}\n"

    conf = context.field_name ? "to_field '#{context.field_name}'" : "each_record"

    msg += "    while executing #{conf} defined at #{index_step[:source_location]}\n"
    msg += Traject::Util.exception_to_log_message(e)

    logger.error msg      
    logger.debug "Record: " + context.source_record.to_s

    raise e        
  end
end

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



# File 'lib/traject/indexer.rb', line 366

def log_skip(context)
  logger.debug "Skipped record #{context.position}: #{context.skipmessage}"
end

#logger_argument ⇒ Object

Just calculates the argument that will be given to Yell.new (or SomeLogger.new).



# File 'lib/traject/indexer.rb', line 114

def logger_argument
  specified = settings["log.file"] || "STDERR"

  case specified
  when "STDOUT" then STDOUT
  when "STDERR" then STDERR
  else specified
  end
end

#logger_options ⇒ Object

Second argument to Yell.new: an options hash calculated from settings.



# File 'lib/traject/indexer.rb', line 126

def logger_options
  # formatter, default is fairly basic
  format = settings["log.format"] || "%d %5L %m"
  format = case format
  when "false" then false
  when "" then nil
  else format
  end

  level = settings["log.level"] || "info"

  {:format => format, :level => level}
end
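
For illustration, the settings read here, shown with the defaults used above:

indexer.settings do
  provide "log.level",  "info"        # becomes the :level option to Yell.new
  provide "log.format", "%d %5L %m"   # becomes the :format option; "false" and "" are special-cased above
end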

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

This is a convenience shortcut for #map_to_context! – use that one if you want to provide additional context such as position, and/or get back the full context.



# File 'lib/traject/indexer.rb', line 188

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings)
  map_to_context!(context)
  return context.output_hash
end
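
A usage sketch (the output keys and values shown are hypothetical; they depend entirely on your to_field rules):

output = indexer.map_record(record)
# => e.g. {"id" => ["12345"], "title" => ["Some title"]}
Array(output["title"]).each { |title| puts title }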

#map_to_context!(context) ⇒ Object

Maps a single record INTO the passed-in Traject::Indexer::Context.

Context must be passed with a #source_record and #settings, and optionally a #position.

Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.

Pass in a context with a set #position if you want that to be available to mapping routines.

Returns the context passed in, as a convenience for chaining etc.



# File 'lib/traject/indexer.rb', line 206

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?
    if index_step[:type] == :to_field

      accumulator = []
      context.field_name = index_step[:field_name]

      # Might have a lambda arg AND a block, we execute in order,
      # with same accumulator.

      [index_step[:lambda], index_step[:block]].each do |aProc|
        if aProc
          log_mapping_errors(context, index_step, aProc) do
            if aProc.arity == 2
              aProc.call(context.source_record, accumulator)
            else
              aProc.call(context.source_record, accumulator, context)
            end
          end
        end
      end
      (context.output_hash[context.field_name] ||= []).concat accumulator unless accumulator.empty?
      context.field_name = nil

    elsif index_step[:type] == :each_record

      # one or two arg
      [index_step[:lambda], index_step[:block]].each do |aProc|
        if aProc
          log_mapping_errors(context, index_step, aProc) do
            if aProc.arity == 1
              aProc.call(context.source_record)
            else
              aProc.call(context.source_record, context)
            end
          end
        end
      end

    else
      raise ArgumentError.new("An @index_step we don't know how to deal with: #{index_step}")
    end
  end

  return context
end
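
A sketch of calling it directly with an explicit position:

context = Traject::Indexer::Context.new(
  :source_record => record,
  :settings      => indexer.settings,
  :position      => 1
)
indexer.map_to_context!(context)
context.output_hash   # hash from field name to array of values
context.skip?         # true if some mapping step asked for this record to be skipped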

#process(io_stream) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

Returns false as a signal to the command line that it should exit with a non-zero code, for some reason presumably found in the logs. This particular mechanism starts simple and is open to elaboration; we do need SOME way to return non-zero to the command line.



# File 'lib/traject/indexer.rb', line 291

def process(io_stream)
  settings.fill_in_defaults!

  count      =       0
  start_time = batch_start_time = Time.now
  logger.info "beginning Indexer#process with settings: #{settings.inspect}"

  reader = self.reader!(io_stream)
  writer = self.writer!

  thread_pool = Traject::ThreadPool.new(settings["processing_thread_pool"].to_i)

  logger.info "   with reader: #{reader.class.name} and writer: #{writer.class.name}"

  reader.each do |record; position|
    count += 1

    # have to use a block local var, so the changing `count` one
    # doesn't get caught in the closure. Weird, yeah.
    position = count

    thread_pool.raise_collected_exception!

    if settings["debug_ascii_progress"].to_s == "true"
      $stderr.write "." if count % settings["solrj_writer.batch_size"] == 0
    end

    if settings["log.batch_progress"] && (count % settings["log.batch_progress"].to_i == 0)
      batch_rps = settings["log.batch_progress"].to_i / (Time.now - batch_start_time)
      overall_rps = count / (Time.now - start_time)
      logger.info "Traject::Indexer#process, read #{count} records at id:#{id_string(record)}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall"
      batch_start_time = Time.now
    end

    # we have to use this weird lambda to properly "capture" the count, instead
    # of having it be bound to the original variable in a non-threadsafe way.
    # This is confusing, I might not be understanding things properly, but that's where i am.
    #thread_pool.maybe_in_thread_pool &make_lambda(count, record, writer)
    thread_pool.maybe_in_thread_pool do
      context = Context.new(:source_record => record, :settings => settings, :position => position)
      map_to_context!(context)
      if context.skip?
        log_skip(context)
      else
        writer.put context
      end
        
    end

  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!

  
  writer.close if writer.respond_to?(:close)

  elapsed        = Time.now - start_time
  avg_rps        = (count / elapsed)
  logger.info "finished Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end
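
A hedged end-to-end sketch (the filename is made up; reader and writer classes come from your settings or their defaults):

File.open("records.mrc", "rb") do |io|
  success = indexer.process(io)
  # process returns false when the writer reports skipped records
  exit(1) unless success
end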

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using the class set in #reader_class, initialized with the io_stream passed in.



# File 'lib/traject/indexer.rb', line 386

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#settings(new_settings = nil, &block) ⇒ Object

The Indexer’s settings are a hash of configuration key/values – not nested, just one level. Keys are strings.

The settings method with no arguments returns that hash.

With a hash and/or block argument, it can be used to set new key/values. Each call merges onto the existing settings hash.

indexer.settings("a" => "a", "b" => "b")

indexer.settings do
  store "b", "new b"
end

indexer.settings #=> {"a" => "a", "b" => "new b"}

Even when called with arguments, it returns the settings hash, so calls can be chained.



# File 'lib/traject/indexer.rb', line 98

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval &block if block

  return @settings
end

#to_field(field_name, aLambda = nil, &block) ⇒ Object

Used to define an indexing mapping.



# File 'lib/traject/indexer.rb', line 157

def to_field(field_name, aLambda = nil, &block)

  verify_to_field_arguments(field_name, aLambda, block)

  @index_steps << {
    :field_name => field_name.to_s,
    :lambda => aLambda,
    :block  => block,
    :type   => :to_field,
    :source_location => Traject::Util.extract_caller_location(caller.first)
  }
end
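
A hedged usage sketch, assuming MARC records (the field names and MARC tags below are illustrative):

# with the built-in extract_marc macro (from Macros::Marc21), which returns a lambda
indexer.to_field "title", indexer.extract_marc("245a")

# with a plain block, taking (record, accumulator) or (record, accumulator, context)
indexer.to_field "id" do |record, accumulator|
  accumulator << record['001'].value
end

# a lambda and a block may both be given; they run in order with a shared accumulator
note_logic = lambda { |record, accumulator| accumulator << "value from lambda" }
indexer.to_field "note", note_logic do |record, accumulator|
  accumulator << "value from block"
end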

#verify_each_record_arguments(aLambda, block) ⇒ Object

Verify the procs sent to each_record to make sure it’s all kosher.



# File 'lib/traject/indexer.rb', line 433

def verify_each_record_arguments(aLambda, block)
  unless aLambda or block
    raise ArgumentError.new("Missing Argument: each_record must take a block/lambda as an argument (#{last_named_step.message})")
  end
  
  [aLambda, block].each do |proc|
    # allow negative arity, meaning variable/optional, trust em on that.
    # but for positive arity, we need 1 or 2 args
    if proc
      unless proc.is_a?(Proc)
        raise NamingError.new("argument to each_record must be a block/lambda, not a #{proc.class} (#{last_named_step.message})")
      end
      if (proc.arity == 0 || proc.arity > 2)
        raise ArityError.new("block/proc given to each_record needs 1 or 2 arguments: #{proc} (#{last_named_step.message})")
      end
    end
  end
end

#verify_field_name(field_name) ⇒ Object

Verify that the field name is good, and throw a useful error if not



# File 'lib/traject/indexer.rb', line 405

def verify_field_name(field_name)
  if field_name.nil? || !field_name.is_a?(String) || field_name.empty? 
    raise NamingError.new("to_field requires the field name (String) as the first argument (#{last_named_step.message})")
  end
end

#verify_to_field_arguments(field_name, aLambda, block) ⇒ Object

Verify the various, increasingly-complex things that can be sent to to_field to make sure it’s all kosher.

“Modification” takes place for zero-argument blocks that return a lambda



# File 'lib/traject/indexer.rb', line 417

def verify_to_field_arguments(field_name, aLambda, block)

  verify_field_name(field_name)
  
  [aLambda, block].each do |proc|
    # allow negative arity, meaning variable/optional, trust em on that.
    # but for positive arity, we need 2 or 3 args
    if proc && (proc.arity == 0 || proc.arity == 1 || proc.arity > 3)
      raise ArityError.new("error parsing field '#{field_name}': block/proc given to to_field needs 2 or 3 (or variable) arguments: #{proc} (#{last_named_step.message})")
    end
  end
  
end

#writer! ⇒ Object

Instantiate a Traject Writer, using the class set in #writer_class.



# File 'lib/traject/indexer.rb', line 391

def writer!
  return writer_class.new(settings.merge("logger" => logger))
end