Class: Traject::SolrJsonWriter

Inherits: Object
Includes: QualifiedConstGet
Defined in: lib/traject/solr_json_writer.rb

Constant Summary

DEFAULT_MAX_SKIPPED = 0
DEFAULT_BATCH_SIZE = 100

Instance Attribute Summary

Instance Method Summary

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(argSettings) ⇒ SolrJsonWriter

Returns a new instance of SolrJsonWriter.



# File 'lib/traject/solr_json_writer.rb', line 60

def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)

  # Set max errors
  @max_skipped = (@settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
  if @max_skipped < 0
    @max_skipped = nil
  end

  @http_client = @settings["solr_json_writer.http_client"] || HTTPClient.new

  @batch_size = (settings["solr_writer.batch_size"] || DEFAULT_BATCH_SIZE).to_i
  @batch_size = 1 if @batch_size < 1

  # Store the skipped-record count in a Concurrent::AtomicFixnum so that
  # multiple threads can increment it safely if we're threaded.
  @skipped_record_incrementer = Concurrent::AtomicFixnum.new(0)


  # How many threads to use for the writer? If the thread pool size is 0,
  # Traject::ThreadPool creates a null pool that executes work in the
  # calling thread.
  @thread_pool_size = (@settings["solr_writer.thread_pool"] || 1).to_i

  @batched_queue         = Queue.new
  @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

  # The old solrj_writer.commit_on_close setting is supported for backwards
  # compatibility, since this writer is now the default.
  @commit_on_close = (settings["solr_writer.commit_on_close"] || settings["solrj_writer.commit_on_close"]).to_s == "true"

  # Figure out where to send updates
  @solr_update_url = self.determine_solr_update_url

  logger.info("   #{self.class.name} writing to '#{@solr_update_url}' in batches of #{@batch_size} with #{@thread_pool_size} bg threads")
end
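
A minimal sketch of constructing the writer directly, assuming a hypothetical local Solr core; in a real traject run these settings normally come from the indexer configuration:

require 'traject'

writer = Traject::SolrJsonWriter.new(
  "solr.update_url"             => "http://localhost:8983/solr/mycore/update/json", # hypothetical
  "solr_writer.batch_size"      => 100,    # default
  "solr_writer.thread_pool"     => 1,      # default
  "solr_writer.max_skipped"     => 0,      # default: abort on the first skipped record
  "solr_writer.commit_on_close" => "true"
)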

Instance Attribute Details

#batched_queue ⇒ Object (readonly)

A queue to hold documents before sending to Solr



# File 'lib/traject/solr_json_writer.rb', line 58

def batched_queue
  @batched_queue
end

#settings ⇒ Object (readonly)

The passed-in settings



# File 'lib/traject/solr_json_writer.rb', line 55

def settings
  @settings
end

#thread_pool_size ⇒ Object (readonly)

The configured size of the writer's thread pool



# File 'lib/traject/solr_json_writer.rb', line 55

def thread_pool_size
  @thread_pool_size
end

Instance Method Details

#check_solr_update_url(url) ⇒ Object

If we've been given a solr.update_url, make sure it looks like a valid URL



# File 'lib/traject/solr_json_writer.rb', line 239

def check_solr_update_url(url)
  unless url =~ /^#{URI::regexp}$/
    raise ArgumentError.new("#{self.class.name} setting `solr.update_url` doesn't look like a URL: `#{url}`")
  end
  url
end
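
A sketch of both outcomes, with hypothetical input values; the check anchors Ruby's URI::regexp against the entire string:

writer.check_solr_update_url("http://localhost:8983/solr/mycore/update/json")
# => "http://localhost:8983/solr/mycore/update/json"

writer.check_solr_update_url("not a url at all")
# => raises ArgumentError (spaces can't appear in a URL)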

#close ⇒ Object

On close, we need to (a) raise any exceptions we might have, (b) send off the last (possibly empty) batch, and (c) commit if instructed to do so via the solr_writer.commit_on_close setting.



# File 'lib/traject/solr_json_writer.rb', line 172

def close
  @thread_pool.raise_collected_exception!

  # Finish off whatever's left. Do it in the thread pool for
  # consistency, and to ensure expected order of operations, so
  # it goes to the end of the queue behind any other work.
  batch = Traject::Util.drain_queue(@batched_queue)
  if batch.length > 0
    @thread_pool.maybe_in_thread_pool { send_batch(batch) }
  end

  # Wait for shutdown, and time it.
  logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase solr_writer.thread_pool (currently #{@settings["solr_writer.thread_pool"]})"
  end
  logger.debug "#{self.class.name}: Thread pool shutdown complete"
  logger.warn "#{self.class.name}: #{skipped_record_count} skipped records" if skipped_record_count > 0

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  # Commit if we're supposed to
  if @commit_on_close
    commit
  end
end

#commit ⇒ Object

Send a commit



# File 'lib/traject/solr_json_writer.rb', line 204

def commit
  logger.info "#{self.class.name} sending commit to solr at url #{@solr_update_url}..."

  original_timeout = @http_client.receive_timeout

  @http_client.receive_timeout = (settings["commit_timeout"] || (10 * 60)).to_i

  resp = @http_client.get(@solr_update_url, {"commit" => 'true'})
  unless resp.status == 200
    raise RuntimeError.new("Could not commit to Solr: #{resp.status} #{resp.body}")
  end

  @http_client.receive_timeout = original_timeout
end
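
The request it issues is equivalent to this standalone sketch using HTTPClient directly (the URL is hypothetical; "commit_timeout" is read from settings in seconds and defaults to ten minutes):

require 'httpclient'

client = HTTPClient.new
client.receive_timeout = 10 * 60   # the writer's default commit timeout
resp = client.get("http://localhost:8983/solr/mycore/update/json", "commit" => "true")
raise "Could not commit to Solr: #{resp.status}" unless resp.status == 200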

#derive_solr_update_url_from_solr_url(url) ⇒ Object



# File 'lib/traject/solr_json_writer.rb', line 246

def derive_solr_update_url_from_solr_url(url)
  # Nil? Then we bail
  if url.nil?
    raise ArgumentError.new("#{self.class.name}: Neither solr.update_url nor solr.url set; need at least one")
  end

  # Not a URL? Bail
  unless url =~ /^#{URI::regexp}$/
    raise ArgumentError.new("#{self.class.name} setting `solr.url` doesn't look like a URL: `#{url}`")
  end

  # First, try the /update/json handler
  candidate = [url.chomp('/'), 'update', 'json'].join('/')
  resp      = @http_client.get(candidate)
  if resp.status == 404
    candidate = [url.chomp('/'), 'update'].join('/')
  end
  candidate
end
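
Illustrating the derivation with a hypothetical solr.url:

writer.derive_solr_update_url_from_solr_url("http://localhost:8983/solr/mycore")
# => "http://localhost:8983/solr/mycore/update/json"  if that handler responds
# => "http://localhost:8983/solr/mycore/update"       if Solr returned a 404 for it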

#determine_solr_update_url ⇒ Object

Determine the Solr update URL: validate solr.update_url if given, otherwise derive one from solr.url



# File 'lib/traject/solr_json_writer.rb', line 229

def determine_solr_update_url
  if settings['solr.update_url']
    check_solr_update_url(settings['solr.update_url'])
  else
    derive_solr_update_url_from_solr_url(settings['solr.url'])
  end
end

#logger ⇒ Object

Get the logger from the settings, or default to an effectively null logger



# File 'lib/traject/solr_json_writer.rb', line 165

def logger
  settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
end
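
To use your own logger instead of the near-silent default, pass one in settings; a sketch with a hypothetical update URL and an info-level Yell logger:

writer = Traject::SolrJsonWriter.new(
  "solr.update_url" => "http://localhost:8983/solr/mycore/update/json", # hypothetical
  "logger"          => Yell.new(STDERR, :level => "gte.info")
)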

#put(context) ⇒ Object

Add a single context to the queue, ready to be sent to Solr



# File 'lib/traject/solr_json_writer.rb', line 99

def put(context)
  @thread_pool.raise_collected_exception!

  @batched_queue << context
  if @batched_queue.size >= @batch_size
    batch = Traject::Util.drain_queue(@batched_queue)
    @thread_pool.maybe_in_thread_pool(batch) {|batch_arg| send_batch(batch_arg) }
  end
end
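
In normal use traject's Indexer calls #put for you; a sketch of the lifecycle, where each context is a Traject::Indexer::Context whose output_hash is the Solr document:

contexts.each { |context| writer.put(context) }  # a batch is flushed whenever batch_size is reached
writer.close  # flushes the final partial batch, and commits if configured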

#send_batch(batch) ⇒ Object

Send the given batch of contexts. If something goes wrong, send them one at a time.

Parameters:

batch (Array<Traject::Indexer::Context>): the contexts to send to Solr

# File 'lib/traject/solr_json_writer.rb', line 112

def send_batch(batch)
  return if batch.empty?
  json_package = JSON.generate(batch.map { |c| c.output_hash })
  begin
    resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
  rescue StandardError => exception
  end

  if exception || resp.status != 200
    error_message = exception ? 
      Traject::Util.exception_to_log_message(exception) : 
      "Solr response: #{resp.status}: #{resp.body}"

    logger.error "Error in Solr batch add. Will retry documents individually at performance penalty: #{error_message}"
    
    batch.each do |c|
      send_single(c)
    end
  end
end
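
The posted JSON body is simply the array of output hashes; a sketch with one hypothetical record:

batch.map { |c| c.output_hash }
# => [{"id" => ["rec_1"], "title" => ["An example title"]}]
JSON.generate(batch.map { |c| c.output_hash })
# => '[{"id":["rec_1"],"title":["An example title"]}]'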

#send_single(c) ⇒ Object

Send a single context to Solr, logging an error if need be

Parameters:

c (Traject::Indexer::Context): the context to send

# File 'lib/traject/solr_json_writer.rb', line 136

def send_single(c)
  json_package = JSON.generate([c.output_hash])
  begin
    resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
    # Catch Timeouts and network errors as skipped records, but otherwise
    # allow unexpected errors to propagate up. 
  rescue HTTPClient::TimeoutError, SocketError, Errno::ECONNREFUSED => exception
  end

  if exception || resp.status != 200
    if exception
      msg = Traject::Util.exception_to_log_message(exception)
    else
      msg = "Solr error response: #{resp.status}: #{resp.body}"
    end
    logger.error "Could not add record #{c.source_record_id} at source file position #{c.position}: #{msg}"
    logger.debug(c.source_record.to_s)

    @skipped_record_incrementer.increment
    if @max_skipped and skipped_record_count > @max_skipped
      raise RuntimeError.new("#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting")
    end

  end

end

#skipped_record_count ⇒ Object

Returns the count of skipped records encountered so far. It is most accurate after #close, when it reflects the full count even under an async thread_pool.



# File 'lib/traject/solr_json_writer.rb', line 223

def skipped_record_count
  @skipped_record_incrementer.value
end
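
A sketch of pairing it with solr_writer.max_skipped (values are illustrative): a limit of 100 tolerates up to 100 skipped records before #send_single raises, while a negative value disables the limit entirely.

writer = Traject::SolrJsonWriter.new(
  "solr.update_url"         => "http://localhost:8983/solr/mycore/update/json", # hypothetical
  "solr_writer.max_skipped" => 100
)
# ... index records ...
writer.close
writer.skipped_record_count  # => total number of records skipped during the run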