Class: Traject::SolrJsonWriter

Inherits:
Object
  • Object
show all
Includes:
QualifiedConstGet
Defined in:
lib/traject/solr_json_writer.rb

Defined Under Namespace

Classes: MaxSkippedRecordsExceeded

Constant Summary collapse

URI_REGEXP =
URI::Parser.new.make_regexp.freeze
DEFAULT_MAX_SKIPPED =
0
DEFAULT_BATCH_SIZE =
100

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(argSettings) ⇒ SolrJsonWriter

Returns a new instance of SolrJsonWriter.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/traject/solr_json_writer.rb', line 65

def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)

  # Set max errors
  @max_skipped = (@settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
  if @max_skipped < 0
    @max_skipped = nil
  end

  @http_client = @settings["solr_json_writer.http_client"] || HTTPClient.new

  @batch_size = (settings["solr_writer.batch_size"] || DEFAULT_BATCH_SIZE).to_i
  @batch_size = 1 if @batch_size < 1

  # Store error count in an AtomicInteger, so multi threads can increment
  # it safely, if we're threaded.
  @skipped_record_incrementer = Concurrent::AtomicFixnum.new(0)


  # How many threads to use for the writer?
  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context.
  @thread_pool_size = (@settings["solr_writer.thread_pool"] || 1).to_i

  @batched_queue         = Queue.new
  @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

  # old setting solrj_writer supported for backwards compat, as we make
  # this the new default writer.
  @commit_on_close = (settings["solr_writer.commit_on_close"] || settings["solrj_writer.commit_on_close"]).to_s == "true"

  # Figure out where to send updates
  @solr_update_url = self.determine_solr_update_url

  logger.info("   #{self.class.name} writing to '#{@solr_update_url}' in batches of #{@batch_size} with #{@thread_pool_size} bg threads")
end

Instance Attribute Details

#batched_queueObject (readonly)

A queue to hold documents before sending to solr



63
64
65
# File 'lib/traject/solr_json_writer.rb', line 63

def batched_queue
  @batched_queue
end

#settingsObject (readonly)

The passed-in settings



60
61
62
# File 'lib/traject/solr_json_writer.rb', line 60

def settings
  @settings
end

#thread_pool_sizeObject (readonly)

The passed-in settings



60
61
62
# File 'lib/traject/solr_json_writer.rb', line 60

def thread_pool_size
  @thread_pool_size
end

Instance Method Details

#check_solr_update_url(url) ⇒ Object

If we've got a solr.update_url, make sure it's ok



258
259
260
261
262
263
# File 'lib/traject/solr_json_writer.rb', line 258

def check_solr_update_url(url)
  unless /^#{URI_REGEXP}$/.match(url)
    raise ArgumentError.new("#{self.class.name} setting `solr.update_url` doesn't look like a URL: `#{url}`")
  end
  url
end

#closeObject

On close, we need to (a) raise any exceptions we might have, (b) send off the last (possibly empty) batch, and (c) commit if instructed to do so via the solr_writer.commit_on_close setting.



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/traject/solr_json_writer.rb', line 191

def close
  @thread_pool.raise_collected_exception!

  # Finish off whatever's left. Do it in the thread pool for
  # consistency, and to ensure expected order of operations, so
  # it goes to the end of the queue behind any other work.
  batch = Traject::Util.drain_queue(@batched_queue)
  if batch.length > 0
    @thread_pool.maybe_in_thread_pool { send_batch(batch) }
  end

  # Wait for shutdown, and time it.
  logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase solr_writer.thread_pool (currently #{@settings["solr_writer.thread_pool"]})"
  end
  logger.debug "#{self.class.name}: Thread pool shutdown complete"
  logger.warn "#{self.class.name}: #{skipped_record_count} skipped records" if skipped_record_count > 0

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  # Commit if we're supposed to
  if @commit_on_close
    commit
  end
end

#commitObject

Send a commit



223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/traject/solr_json_writer.rb', line 223

def commit
  logger.info "#{self.class.name} sending commit to solr at url #{@solr_update_url}..."

  original_timeout = @http_client.receive_timeout

  @http_client.receive_timeout = (settings["commit_timeout"] || (10 * 60)).to_i

  resp = @http_client.get(@solr_update_url, {"commit" => 'true'})
  unless resp.status == 200
    raise RuntimeError.new("Could not commit to Solr: #{resp.status} #{resp.body}")
  end

  @http_client.receive_timeout = original_timeout
end

#derive_solr_update_url_from_solr_url(url) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/traject/solr_json_writer.rb', line 265

def derive_solr_update_url_from_solr_url(url)
  # Nil? Then we bail
  if url.nil?
    raise ArgumentError.new("#{self.class.name}: Neither solr.update_url nor solr.url set; need at least one")
  end

  # Not a URL? Bail
  unless  /^#{URI_REGEXP}$/.match(url)
    raise ArgumentError.new("#{self.class.name} setting `solr.url` doesn't look like a URL: `#{url}`")
  end

  # Assume the /update/json handler
  return [url.chomp('/'), 'update', 'json'].join('/')
end

#determine_solr_update_urlObject

Relatively complex logic to determine if we have a valid URL and what it is



248
249
250
251
252
253
254
# File 'lib/traject/solr_json_writer.rb', line 248

def determine_solr_update_url
  if settings['solr.update_url']
    check_solr_update_url(settings['solr.update_url'])
  else
    derive_solr_update_url_from_solr_url(settings['solr.url'])
  end
end

#flushObject

Not part of standard writer API.

If we are batching adds, and have some not-yet-written ones queued up -- flush em all to solr.

This should be thread-safe to call, but the write does take place in the caller's thread, no threading is done for you here, regardless of setting of solr_writer.thread_pool



122
123
124
# File 'lib/traject/solr_json_writer.rb', line 122

def flush
  send_batch( Traject::Util.drain_queue(@batched_queue) )
end

#loggerObject

Get the logger from the settings, or default to an effectively null logger



184
185
186
# File 'lib/traject/solr_json_writer.rb', line 184

def logger
  settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
end

#put(context) ⇒ Object

Add a single context to the queue, ready to be sent to solr



104
105
106
107
108
109
110
111
112
# File 'lib/traject/solr_json_writer.rb', line 104

def put(context)
  @thread_pool.raise_collected_exception!

  @batched_queue << context
  if @batched_queue.size >= @batch_size
    batch = Traject::Util.drain_queue(@batched_queue)
    @thread_pool.maybe_in_thread_pool(batch) {|batch_arg| send_batch(batch_arg) }
  end
end

#send_batch(batch) ⇒ Object

Send the given batch of contexts. If something goes wrong, send them one at a time.

Parameters:



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/traject/solr_json_writer.rb', line 129

def send_batch(batch)
  return if batch.empty?
  json_package = JSON.generate(batch.map { |c| c.output_hash })
  begin
    resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
  rescue StandardError => exception
  end

  if exception || resp.status != 200
    error_message = exception ?
      Traject::Util.exception_to_log_message(exception) :
      "Solr response: #{resp.status}: #{resp.body}"

    logger.error "Error in Solr batch add. Will retry documents individually at performance penalty: #{error_message}"

    batch.each do |c|
      send_single(c)
    end
  end
end

#send_single(c) ⇒ Object

Send a single context to Solr, logging an error if need be

Parameters:



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/traject/solr_json_writer.rb', line 153

def send_single(c)
  json_package = JSON.generate([c.output_hash])
  begin
    resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
    # Catch Timeouts and network errors as skipped records, but otherwise
    # allow unexpected errors to propagate up.
  rescue *skippable_exceptions => exception
    # no body, local variable exception set above will be used below
  end

  if exception || resp.status != 200
    if exception
      msg = Traject::Util.exception_to_log_message(exception)
    else
      msg = "Solr error response: #{resp.status}: #{resp.body}"
    end
    logger.error "Could not add record #{c.record_inspect}: #{msg}"
    logger.debug("\t" + exception.backtrace.join("\n\t")) if exception
    logger.debug(c.source_record.to_s) if c.source_record

    @skipped_record_incrementer.increment
    if @max_skipped and skipped_record_count > @max_skipped
      raise MaxSkippedRecordsExceeded.new("#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting")
    end

  end

end

#skipped_record_countObject

Return count of encountered skipped records. Most accurate to call it after #close, in which case it should include full count, even under async thread_pool.



242
243
244
# File 'lib/traject/solr_json_writer.rb', line 242

def skipped_record_count
  @skipped_record_incrementer.value
end