Class: Traject::SolrJsonWriter
- Inherits: Object
- Includes: QualifiedConstGet
- Defined in: lib/traject/solr_json_writer.rb
Defined Under Namespace
Classes: MaxSkippedRecordsExceeded
Constant Summary
- URI_REGEXP = URI::Parser.new.make_regexp.freeze
- DEFAULT_MAX_SKIPPED = 0
- DEFAULT_BATCH_SIZE = 100
Instance Attribute Summary
- #batched_queue ⇒ Object (readonly): A queue to hold documents before sending to Solr.
- #settings ⇒ Object (readonly): The passed-in settings.
- #thread_pool_size ⇒ Object (readonly): The size of the writer's thread pool, from the passed-in settings.
Instance Method Summary
- #check_solr_update_url(url) ⇒ Object: If we've got a solr.update_url, make sure it's ok.
- #close ⇒ Object: On close, we need to (a) raise any exceptions we might have, (b) send off the last (possibly empty) batch, and (c) commit if instructed to do so via the solr_writer.commit_on_close setting.
- #commit ⇒ Object: Send a commit.
- #derive_solr_update_url_from_solr_url(url) ⇒ Object
- #determine_solr_update_url ⇒ Object: Relatively complex logic to determine if we have a valid URL and what it is.
- #flush ⇒ Object: Not part of standard writer API.
- #initialize(argSettings) ⇒ SolrJsonWriter (constructor): A new instance of SolrJsonWriter.
- #logger ⇒ Object: Get the logger from the settings, or default to an effectively null logger.
- #put(context) ⇒ Object: Add a single context to the queue, ready to be sent to Solr.
- #send_batch(batch) ⇒ Object: Send the given batch of contexts.
- #send_single(c) ⇒ Object: Send a single context to Solr, logging an error if need be.
- #skipped_record_count ⇒ Object: Return count of encountered skipped records.
Methods included from QualifiedConstGet
Constructor Details
#initialize(argSettings) ⇒ SolrJsonWriter
Returns a new instance of SolrJsonWriter.
# File 'lib/traject/solr_json_writer.rb', line 65

def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)

  # Set max errors
  @max_skipped = (@settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
  if @max_skipped < 0
    @max_skipped = nil
  end

  @http_client = @settings["solr_json_writer.http_client"] || HTTPClient.new

  @batch_size = (settings["solr_writer.batch_size"] || DEFAULT_BATCH_SIZE).to_i
  @batch_size = 1 if @batch_size < 1

  # Store error count in an AtomicInteger, so multi threads can increment
  # it safely, if we're threaded.
  @skipped_record_incrementer = Concurrent::AtomicFixnum.new(0)

  # How many threads to use for the writer?
  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context.
  @thread_pool_size = (@settings["solr_writer.thread_pool"] || 1).to_i

  @batched_queue = Queue.new
  @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

  # old setting solrj_writer supported for backwards compat, as we make
  # this the new default writer.
  @commit_on_close = (settings["solr_writer.commit_on_close"] || settings["solrj_writer.commit_on_close"]).to_s == "true"

  # Figure out where to send updates
  @solr_update_url = self.determine_solr_update_url

  logger.info("   #{self.class.name} writing to '#{@solr_update_url}' in batches of #{@batch_size} with #{@thread_pool_size} bg threads")
end
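As a usage sketch (the Solr URL is a placeholder), the writer can be constructed directly with a settings hash; in a traject configuration file the same keys are usually supplied with provide inside a settings block, along with writer_class_name set to "Traject::SolrJsonWriter":

require 'traject'

writer = Traject::SolrJsonWriter.new(
  "solr.url"                    => "http://localhost:8983/solr/my_core",  # or "solr.update_url"
  "solr_writer.batch_size"      => 100,    # DEFAULT_BATCH_SIZE
  "solr_writer.thread_pool"     => 1,      # 0 sends in the calling thread
  "solr_writer.commit_on_close" => "true"
)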
Instance Attribute Details
#batched_queue ⇒ Object (readonly)
A queue to hold documents before sending to solr
# File 'lib/traject/solr_json_writer.rb', line 63

def batched_queue
  @batched_queue
end
#settings ⇒ Object (readonly)
The passed-in settings
# File 'lib/traject/solr_json_writer.rb', line 60

def settings
  @settings
end
#thread_pool_size ⇒ Object (readonly)
The size of the writer's thread pool, from the passed-in settings
# File 'lib/traject/solr_json_writer.rb', line 60

def thread_pool_size
  @thread_pool_size
end
Instance Method Details
#check_solr_update_url(url) ⇒ Object
If we've got a solr.update_url, make sure it's ok
# File 'lib/traject/solr_json_writer.rb', line 258

def check_solr_update_url(url)
  unless /^#{URI_REGEXP}$/.match(url)
    raise ArgumentError.new("#{self.class.name} setting `solr.update_url` doesn't look like a URL: `#{url}`")
  end
  url
end
#close ⇒ Object
On close, we need to (a) raise any exceptions we might have, (b) send off the last (possibly empty) batch, and (c) commit if instructed to do so via the solr_writer.commit_on_close setting.
# File 'lib/traject/solr_json_writer.rb', line 191

def close
  @thread_pool.raise_collected_exception!

  # Finish off whatever's left. Do it in the thread pool for
  # consistency, and to ensure expected order of operations, so
  # it goes to the end of the queue behind any other work.
  batch = Traject::Util.drain_queue(@batched_queue)
  if batch.length > 0
    @thread_pool.maybe_in_thread_pool { send_batch(batch) }
  end

  # Wait for shutdown, and time it.
  logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase solr_writer.thread_pool (currently #{@settings["solr_writer.thread_pool"]})"
  end
  logger.debug "#{self.class.name}: Thread pool shutdown complete"
  logger.warn "#{self.class.name}: #{skipped_record_count} skipped records" if skipped_record_count > 0

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  # Commit if we're supposed to
  if @commit_on_close
    commit
  end
end
#commit ⇒ Object
Send a commit
# File 'lib/traject/solr_json_writer.rb', line 223

def commit
  logger.info "#{self.class.name} sending commit to solr at url #{@solr_update_url}..."

  original_timeout = @http_client.receive_timeout

  @http_client.receive_timeout = (settings["commit_timeout"] || (10 * 60)).to_i

  resp = @http_client.get(@solr_update_url, {"commit" => 'true'})
  unless resp.status == 200
    raise RuntimeError.new("Could not commit to Solr: #{resp.status} #{resp.body}")
  end

  @http_client.receive_timeout = original_timeout
end
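As a small usage sketch of issuing a commit by hand rather than relying on solr_writer.commit_on_close (the Solr URL and timeout value are placeholders):

# commit_timeout raises the HTTP read timeout just for the commit request.
writer = Traject::SolrJsonWriter.new(
  "solr.url"       => "http://localhost:8983/solr/my_core",
  "commit_timeout" => 300
)
writer.flush    # post anything still queued first
writer.commit   # GET against the update URL with commit=true; raises RuntimeError on a non-200 response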
#derive_solr_update_url_from_solr_url(url) ⇒ Object
# File 'lib/traject/solr_json_writer.rb', line 265

def derive_solr_update_url_from_solr_url(url)
  # Nil? Then we bail
  if url.nil?
    raise ArgumentError.new("#{self.class.name}: Neither solr.update_url nor solr.url set; need at least one")
  end

  # Not a URL? Bail
  unless /^#{URI_REGEXP}$/.match(url)
    raise ArgumentError.new("#{self.class.name} setting `solr.url` doesn't look like a URL: `#{url}`")
  end

  # Assume the /update/json handler
  return [url.chomp('/'), 'update', 'json'].join('/')
end
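For illustration, with a placeholder core URL and no explicit solr.update_url, the update URL is derived by appending /update/json:

writer = Traject::SolrJsonWriter.new("solr.url" => "http://localhost:8983/solr/my_core/")
# A trailing slash is stripped before joining, so updates are posted to:
#   http://localhost:8983/solr/my_core/update/json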
#determine_solr_update_url ⇒ Object
Relatively complex logic to determine if we have a valid URL and what it is
# File 'lib/traject/solr_json_writer.rb', line 248

def determine_solr_update_url
  if settings['solr.update_url']
    check_solr_update_url(settings['solr.update_url'])
  else
    derive_solr_update_url_from_solr_url(settings['solr.url'])
  end
end
#flush ⇒ Object
Not part of standard writer API.
If we are batching adds and have some not-yet-written ones queued up, flush them all to Solr.
This should be thread-safe to call, but the write takes place in the caller's thread; no threading is done for you here, regardless of the solr_writer.thread_pool setting.
# File 'lib/traject/solr_json_writer.rb', line 122

def flush
  send_batch( Traject::Util.drain_queue(@batched_queue) )
end
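A minimal sketch of forcing queued documents out before a full batch has accumulated; here `writer` and `contexts` are assumed to already exist:

# contexts: an Enumerable of Traject::Indexer::Context objects (assumed)
contexts.each { |context| writer.put(context) }
writer.flush   # posts whatever is still queued, in the calling thread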
#logger ⇒ Object
Get the logger from the settings, or default to an effectively null logger
# File 'lib/traject/solr_json_writer.rb', line 184

def logger
  settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
end
#put(context) ⇒ Object
Add a single context to the queue, ready to be sent to solr
# File 'lib/traject/solr_json_writer.rb', line 104

def put(context)
  @thread_pool.raise_collected_exception!

  @batched_queue << context
  if @batched_queue.size >= @batch_size
    batch = Traject::Util.drain_queue(@batched_queue)
    @thread_pool.maybe_in_thread_pool(batch) { |batch_arg| send_batch(batch_arg) }
  end
end
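A usage sketch of sending a single document end to end; the field names and Solr URL are placeholders, and in normal traject use the indexer calls #put for you:

require 'traject'

writer = Traject::SolrJsonWriter.new("solr.url" => "http://localhost:8983/solr/my_core")

context = Traject::Indexer::Context.new
context.output_hash["id"]      = ["doc-1"]
context.output_hash["title_t"] = ["An example title"]

writer.put(context)   # queued; sent once a full batch accumulates, or on flush/close
writer.close          # drains the queue, waits for background threads, commits if configured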
#send_batch(batch) ⇒ Object
Send the given batch of contexts. If something goes wrong, send them one at a time.
# File 'lib/traject/solr_json_writer.rb', line 129

def send_batch(batch)
  return if batch.empty?
  json_package = JSON.generate(batch.map { |c| c.output_hash })
  begin
    resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
  rescue StandardError => exception
  end

  if exception || resp.status != 200
    error_message = exception ?
      Traject::Util.exception_to_log_message(exception) :
      "Solr response: #{resp.status}: #{resp.body}"

    logger.error "Error in Solr batch add. Will retry documents individually at performance penalty: #{error_message}"

    batch.each do |c|
      send_single(c)
    end
  end
end
#send_single(c) ⇒ Object
Send a single context to Solr, logging an error if need be
# File 'lib/traject/solr_json_writer.rb', line 153

def send_single(c)
  json_package = JSON.generate([c.output_hash])
  begin
    resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
    # Catch Timeouts and network errors as skipped records, but otherwise
    # allow unexpected errors to propagate up.
  rescue *skippable_exceptions => exception
    # no body, local variable exception set above will be used below
  end

  if exception || resp.status != 200
    if exception
      msg = Traject::Util.exception_to_log_message(exception)
    else
      msg = "Solr error response: #{resp.status}: #{resp.body}"
    end
    logger.error "Could not add record #{c.record_inspect}: #{msg}"
    logger.debug("\t" + exception.backtrace.join("\n\t")) if exception
    logger.debug(c.source_record.to_s) if c.source_record

    @skipped_record_incrementer.increment
    if @max_skipped and skipped_record_count > @max_skipped
      raise MaxSkippedRecordsExceeded.new("#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting")
    end
  end
end
#skipped_record_count ⇒ Object
Returns the count of skipped records encountered. It is most accurate to call this after #close, in which case it should reflect the full count, even with an async thread pool.
# File 'lib/traject/solr_json_writer.rb', line 242

def skipped_record_count
  @skipped_record_incrementer.value
end
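As an illustrative sketch of the skip limit (URL and threshold below are placeholders): with the default DEFAULT_MAX_SKIPPED of 0 the first skipped record aborts the run, a negative solr_writer.max_skipped disables the limit, and exceeding the limit should surface as MaxSkippedRecordsExceeded from #put or #close:

writer = Traject::SolrJsonWriter.new(
  "solr.url"                => "http://localhost:8983/solr/my_core",
  "solr_writer.max_skipped" => 10   # tolerate up to 10 failed documents
)

begin
  # ... writer.put(context) for each record ...
  writer.close
  puts "Skipped #{writer.skipped_record_count} records"
rescue Traject::SolrJsonWriter::MaxSkippedRecordsExceeded => e
  warn e.message
end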