Class: Traject::SolrJsonWriter
- Inherits: Object
- Includes: QualifiedConstGet
- Defined in: lib/traject/solr_json_writer.rb
Constant Summary
- DEFAULT_MAX_SKIPPED = 0
- DEFAULT_BATCH_SIZE = 100
Instance Attribute Summary
- #batched_queue ⇒ Object (readonly)
  A queue to hold documents before sending to Solr.
- #settings ⇒ Object (readonly)
  The passed-in settings.
- #thread_pool_size ⇒ Object (readonly)
  The number of threads used by the writer.
Instance Method Summary
- #check_solr_update_url(url) ⇒ Object
  If we've got a solr.update_url, make sure it's ok.
- #close ⇒ Object
  On close, we need to (a) raise any exceptions we might have, (b) send off the last (possibly empty) batch, and (c) commit if instructed to do so via the solr_writer.commit_on_close setting.
- #commit ⇒ Object
  Send a commit.
- #derive_solr_update_url_from_solr_url(url) ⇒ Object
- #determine_solr_update_url ⇒ Object
  Relatively complex logic to determine if we have a valid URL and what it is.
- #initialize(argSettings) ⇒ SolrJsonWriter (constructor)
  A new instance of SolrJsonWriter.
- #logger ⇒ Object
  Get the logger from the settings, or default to an effectively null logger.
- #put(context) ⇒ Object
  Add a single context to the queue, ready to be sent to Solr.
- #send_batch(batch) ⇒ Object
  Send the given batch of contexts.
- #send_single(c) ⇒ Object
  Send a single context to Solr, logging an error if need be.
- #skipped_record_count ⇒ Object
  Return count of encountered skipped records.
Methods included from QualifiedConstGet
Constructor Details
#initialize(argSettings) ⇒ SolrJsonWriter
Returns a new instance of SolrJsonWriter.
  # File 'lib/traject/solr_json_writer.rb', line 60

  def initialize(argSettings)
    @settings = Traject::Indexer::Settings.new(argSettings)

    # Set max errors
    @max_skipped = (@settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
    if @max_skipped < 0
      @max_skipped = nil
    end

    @http_client = @settings["solr_json_writer.http_client"] || HTTPClient.new

    @batch_size = (settings["solr_writer.batch_size"] || DEFAULT_BATCH_SIZE).to_i
    @batch_size = 1 if @batch_size < 1

    # Store error count in an AtomicInteger, so multi threads can increment
    # it safely, if we're threaded.
    @skipped_record_incrementer = Concurrent::AtomicFixnum.new(0)

    # How many threads to use for the writer?
    # If our thread pool setting is 0, it'll just create a null threadpool that
    # executes in calling context.
    @thread_pool_size = (@settings["solr_writer.thread_pool"] || 1).to_i

    @batched_queue = Queue.new
    @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

    # Old setting solrj_writer supported for backwards compat, as we make
    # this the new default writer.
    @commit_on_close = (settings["solr_writer.commit_on_close"] || settings["solrj_writer.commit_on_close"]).to_s == "true"

    # Figure out where to send updates
    @solr_update_url = self.determine_solr_update_url

    logger.info("   #{self.class.name} writing to '#{@solr_update_url}' in batches of #{@batch_size} with #{@thread_pool_size} bg threads")
  end
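The constructor's coercion of the two numeric settings can be shown in isolation. The sketch below is a hand-written reduction for illustration (the `coerce` helper is hypothetical, not part of traject); it assumes plain hash access in place of Traject::Indexer::Settings:

```ruby
DEFAULT_MAX_SKIPPED = 0
DEFAULT_BATCH_SIZE  = 100

# Mirror the constructor's handling of max_skipped and batch_size:
# a negative max_skipped means "unlimited" (stored as nil), and
# batch_size is clamped to at least 1.
def coerce(settings)
  max_skipped = (settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
  max_skipped = nil if max_skipped < 0
  batch_size  = (settings['solr_writer.batch_size'] || DEFAULT_BATCH_SIZE).to_i
  batch_size  = 1 if batch_size < 1
  [max_skipped, batch_size]
end

coerce({})
# => [0, 100]
coerce('solr_writer.max_skipped' => '-1', 'solr_writer.batch_size' => '0')
# => [nil, 1]
```

Note that settings arrive as strings from the command line, which is why the `.to_i` coercion matters.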
Instance Attribute Details
#batched_queue ⇒ Object (readonly)
A queue to hold documents before sending to Solr.

  # File 'lib/traject/solr_json_writer.rb', line 58

  def batched_queue
    @batched_queue
  end
#settings ⇒ Object (readonly)
The passed-in settings
  # File 'lib/traject/solr_json_writer.rb', line 55

  def settings
    @settings
  end
#thread_pool_size ⇒ Object (readonly)
The number of threads used by the writer.

  # File 'lib/traject/solr_json_writer.rb', line 55

  def thread_pool_size
    @thread_pool_size
  end
Instance Method Details
#check_solr_update_url(url) ⇒ Object
If we've got a solr.update_url, make sure it's ok
  # File 'lib/traject/solr_json_writer.rb', line 239

  def check_solr_update_url(url)
    unless url =~ /^#{URI::regexp}$/
      raise ArgumentError.new("#{self.class.name} setting `solr.update_url` doesn't look like a URL: `#{url}`")
    end
    url
  end
#close ⇒ Object
On close, we need to (a) raise any exceptions we might have, (b) send off the last (possibly empty) batch, and (c) commit if instructed to do so via the solr_writer.commit_on_close setting.
  # File 'lib/traject/solr_json_writer.rb', line 172

  def close
    @thread_pool.raise_collected_exception!

    # Finish off whatever's left. Do it in the thread pool for
    # consistency, and to ensure expected order of operations, so
    # it goes to the end of the queue behind any other work.
    batch = Traject::Util.drain_queue(@batched_queue)
    if batch.length > 0
      @thread_pool.maybe_in_thread_pool { send_batch(batch) }
    end

    # Wait for shutdown, and time it.
    logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
    elapsed = @thread_pool.shutdown_and_wait
    if elapsed > 60
      logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase solr_writer.thread_pool (currently #{@settings["solr_writer.thread_pool"]})"
    end
    logger.debug "#{self.class.name}: Thread pool shutdown complete"
    logger.warn "#{self.class.name}: #{skipped_record_count} skipped records" if skipped_record_count > 0

    # Check again now that we've waited; there could still be some
    # that didn't show up before.
    @thread_pool.raise_collected_exception!

    # Commit if we're supposed to
    if @commit_on_close
      commit
    end
  end
#commit ⇒ Object
Send a commit
  # File 'lib/traject/solr_json_writer.rb', line 204

  def commit
    logger.info "#{self.class.name} sending commit to solr at url #{@solr_update_url}..."

    original_timeout = @http_client.receive_timeout
    @http_client.receive_timeout = (settings["commit_timeout"] || (10 * 60)).to_i

    resp = @http_client.get(@solr_update_url, {"commit" => 'true'})
    unless resp.status == 200
      raise RuntimeError.new("Could not commit to Solr: #{resp.status} #{resp.body}")
    end

    @http_client.receive_timeout = original_timeout
  end
#derive_solr_update_url_from_solr_url(url) ⇒ Object
  # File 'lib/traject/solr_json_writer.rb', line 246

  def derive_solr_update_url_from_solr_url(url)
    # Nil? Then we bail
    if url.nil?
      raise ArgumentError.new("#{self.class.name}: Neither solr.update_url nor solr.url set; need at least one")
    end

    # Not a URL? Bail
    unless url =~ /^#{URI::regexp}$/
      raise ArgumentError.new("#{self.class.name} setting `solr.url` doesn't look like a URL: `#{url}`")
    end

    # First, try the /update/json handler
    candidate = [url.chomp('/'), 'update', 'json'].join('/')
    resp = @http_client.get(candidate)
    if resp.status == 404
      candidate = [url.chomp('/'), 'update'].join('/')
    end
    candidate
  end
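The string manipulation behind the derivation is simple once the HTTP probe is set aside. This sketch (a hypothetical helper, and deliberately omitting the 404 fallback to plain /update shown above) shows how the candidate URL is built from a base solr.url:

```ruby
# Build the first-choice update URL from a base Solr URL:
# strip any trailing slash, then append the /update/json path.
def candidate_update_url(solr_url)
  [solr_url.chomp('/'), 'update', 'json'].join('/')
end

candidate_update_url('http://localhost:8983/solr/core1/')
# => "http://localhost:8983/solr/core1/update/json"
candidate_update_url('http://localhost:8983/solr/core1')
# => "http://localhost:8983/solr/core1/update/json"
```

The `chomp('/')` makes the result insensitive to whether the configured solr.url has a trailing slash.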
#determine_solr_update_url ⇒ Object
Relatively complex logic to determine if we have a valid URL and what it is
  # File 'lib/traject/solr_json_writer.rb', line 229

  def determine_solr_update_url
    if settings['solr.update_url']
      check_solr_update_url(settings['solr.update_url'])
    else
      derive_solr_update_url_from_solr_url(settings['solr.url'])
    end
  end
#logger ⇒ Object
Get the logger from the settings, or default to an effectively null logger
  # File 'lib/traject/solr_json_writer.rb', line 165

  def logger
    settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
  end
#put(context) ⇒ Object
Add a single context to the queue, ready to be sent to solr
  # File 'lib/traject/solr_json_writer.rb', line 99

  def put(context)
    @thread_pool.raise_collected_exception!

    @batched_queue << context
    if @batched_queue.size >= @batch_size
      batch = Traject::Util.drain_queue(@batched_queue)
      @thread_pool.maybe_in_thread_pool(batch) { |batch_arg| send_batch(batch_arg) }
    end
  end
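The queue-then-drain pattern #put uses can be sketched without traject's thread pool. This is a simplified stand-in for illustration (no threading; `add` and `flushed` are hypothetical names), assuming documents are flushed as soon as the batch size is reached:

```ruby
require 'thread'

batch_size = 3
queue      = Queue.new   # stands in for @batched_queue
flushed    = []          # stands in for batches handed to send_batch

# Enqueue one document; when the queue reaches batch_size,
# drain it into a batch, mimicking Traject::Util.drain_queue.
add = lambda do |doc|
  queue << doc
  if queue.size >= batch_size
    batch = []
    batch << queue.pop until queue.empty?
    flushed << batch
  end
end

(1..7).each { |i| add.call("doc#{i}") }
flushed
# => [["doc1", "doc2", "doc3"], ["doc4", "doc5", "doc6"]]
# "doc7" stays queued; in the real writer, #close drains such leftovers.
```

This is why #close must send a final batch: any documents short of a full batch remain in the queue until shutdown.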
#send_batch(batch) ⇒ Object
Send the given batch of contexts. If something goes wrong, send them one at a time.
  # File 'lib/traject/solr_json_writer.rb', line 112

  def send_batch(batch)
    return if batch.empty?
    json_package = JSON.generate(batch.map { |c| c.output_hash })
    begin
      resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
    rescue StandardError => exception
    end

    if exception || resp.status != 200
      error_message = exception ?
        Traject::Util.exception_to_log_message(exception) :
        "Solr response: #{resp.status}: #{resp.body}"

      logger.error "Error in Solr batch add. Will retry documents individually at performance penalty: #{error_message}"

      batch.each do |c|
        send_single(c)
      end
    end
  end
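The payload posted to Solr is just the contexts' output hashes serialized as a JSON array. A sketch, assuming (as traject contexts provide) that each output_hash maps field names to arrays of values:

```ruby
require 'json'

# Two example output hashes standing in for batch.map { |c| c.output_hash }.
batch = [
  { 'id' => ['doc-1'], 'title' => ['First title'] },
  { 'id' => ['doc-2'], 'title' => ['Second title'] }
]

# Solr's JSON update handler accepts this array-of-documents form directly.
json_package = JSON.generate(batch)
```

Because the whole batch is one POST body, a single bad document fails the entire request, which is why the method falls back to re-sending each document individually.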
#send_single(c) ⇒ Object
Send a single context to Solr, logging an error if need be
  # File 'lib/traject/solr_json_writer.rb', line 136

  def send_single(c)
    json_package = JSON.generate([c.output_hash])
    begin
      resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
      # Catch Timeouts and network errors as skipped records, but otherwise
      # allow unexpected errors to propagate up.
    rescue HTTPClient::TimeoutError, SocketError, Errno::ECONNREFUSED => exception
    end

    if exception || resp.status != 200
      if exception
        msg = Traject::Util.exception_to_log_message(exception)
      else
        msg = "Solr error response: #{resp.status}: #{resp.body}"
      end
      logger.error "Could not add record #{c.source_record_id} at source file position #{c.position}: #{msg}"
      logger.debug(c.source_record.to_s)

      @skipped_record_incrementer.increment
      if @max_skipped and skipped_record_count > @max_skipped
        raise RuntimeError.new("#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting")
      end
    end
  end
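The skip-threshold behavior at the end of #send_single can be reduced to a counter check. A sketch, assuming a plain Integer in place of the Concurrent::AtomicFixnum used above (fine single-threaded, but not thread-safe like the real writer):

```ruby
max_skipped = 2   # stands in for @max_skipped (nil would mean "unlimited")
skipped     = 0   # stands in for @skipped_record_incrementer

# Record one skipped document; abort once the threshold is exceeded.
record_skip = lambda do
  skipped += 1
  if skipped > max_skipped
    raise "Exceeded maximum number of skipped records (#{max_skipped}): aborting"
  end
end

2.times { record_skip.call }   # within the limit: tolerated
# A third failure would raise and abort the run.
```

With the default solr_writer.max_skipped of 0, the very first failed document aborts the run; set it to -1 to skip without limit.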
#skipped_record_count ⇒ Object
Return count of encountered skipped records. Most accurate to call it after #close, in which case it should include full count, even under async thread_pool.
  # File 'lib/traject/solr_json_writer.rb', line 223

  def skipped_record_count
    @skipped_record_incrementer.value
  end