Class: Traject::SolrJWriter
- Inherits: Object
- Includes: QualifiedConstGet
- Defined in: lib/traject/solrj_writer.rb
Overview
Writes to a Solr server using SolrJ and the SolrJ HttpSolrServer.
After you call #close, you can check #skipped_record_count for an integer count of skipped records.
Fatal errors are raised, but because adds are processed asynchronously in a thread pool, the exception may not be raised by the #put call that caused it; it may surface on a FUTURE #put or on #close. You should get it eventually though.
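The deferred raising described above can be sketched in plain Ruby. This is only an illustration under stated assumptions: `DeferredErrorWriter` and its internals are hypothetical names, not part of traject, and a plain `Thread` plus `Queue` stand in for the real thread pool.

```ruby
# Hypothetical stand-in for a writer whose adds run on a background thread.
# None of these names come from traject; they only illustrate deferred raising.
class DeferredErrorWriter
  def initialize
    @errors  = Queue.new # exceptions collected from background work
    @threads = []
  end

  def put(doc)
    raise_collected_exception! # surface failures from EARLIER puts
    @threads << Thread.new do
      begin
        raise IOError, "cannot reach server" if doc == :bad
      rescue => e
        @errors << e # hold it for the calling thread
      end
    end
    @threads.last.join # joined here only to make the sketch deterministic
  end

  def close
    @threads.each(&:join)
    raise_collected_exception!
  end

  private

  def raise_collected_exception!
    raise @errors.pop unless @errors.empty?
  end
end

writer = DeferredErrorWriter.new
writer.put(:bad) # does not raise: the failure happened in the background

raised_later =
  begin
    writer.put(:good) # the earlier failure surfaces on this later call
    false
  rescue IOError
    true
  end
```

In practice this means error handling around a single #put is not enough; wrap the whole indexing run, including #close.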
Settings
solr.url: Your Solr URL (required).
solrj_writer.server_class_name: Defaults to "HttpSolrServer". You can specify another SolrServer subclass, but it must have a constructor that takes a single URL argument. Otherwise, consider subclassing this writer class and overriding instantiate_solr_server!.
solrj.jar_dir: Custom directory containing all of the SolrJ jars. All jars in this dir will be loaded. Otherwise, we load our own packaged SolrJ jars. Because jars are loaded globally, this setting can't really vary within the same app instance.
solrj_writer.parser_class_name: A String name of a class in package org.apache.solr.client.solrj.impl. If present, we'll instantiate one with a zero-arg constructor and pass it as an arg to setParser on the SolrServer instance. NOTE: To contact a Solr 1.x server with the recent version of SolrJ used by default, set this to "XMLResponseParser".
solrj_writer.commit_on_close: If true (or the string 'true'), send a commit to Solr in #close, at the end of processing.
solrj_writer.batch_size: If non-nil and greater than 1, send documents to Solr in batches of solrj_writer.batch_size. If nil or 1, one HTTP transaction with Solr is made per doc. Defaults to 100, which seems to be a sweet spot.
solrj_writer.thread_pool: Defaults to 1. A thread pool is used for submitting docs to Solr. Set to 0 or nil to disable threading. When set to 1, a single background thread still does the adds. For very fast Solr servers and very fast indexing processes, it may make sense to increase this value, to throw documents at Solr as fast as it can catch them.
Example
    settings do
      provide "writer_class_name", "Traject::SolrJWriter"

      # This is just regular ruby, so don't be afraid to have conditionals!
      # Switch on hostname, for test and production server differences
      if Socket.gethostname =~ /devhost/
        provide "solr.url", "http://my.dev.machine:9033/catalog"
      else
        provide "solr.url", "http://my.production.machine:9033/catalog"
      end

      provide "solrj_writer.parser_class_name", "BinaryResponseParser" # for Solr 4.x
      # provide "solrj_writer.parser_class_name", "XMLResponseParser" # For solr 1.x or 3.x

      provide "solrj_writer.commit_on_close", "true"
    end
Defined Under Namespace
Classes: UpdatePackage
Instance Attribute Summary
- #batched_queue ⇒ Object (readonly)
  Returns the value of attribute batched_queue.
- #settings ⇒ Object (readonly)
  Returns the value of attribute settings.
- #solr_server ⇒ Object
Instance Method Summary
- #add_one_document_package(package) ⇒ Object
  Adds a single SolrInputDocument passed in as an UpdatePackage combo of SolrInputDocument and context.
- #batch_add_document_packages(current_batch) ⇒ Object
  Takes array and batch adds it to solr -- array of UpdatePackage tuples of SolrInputDocument and context.
- #close ⇒ Object
- #ensure_solrj_loaded! ⇒ Object
  Loads solrj if not already loaded.
- #fatal_exception?(e) ⇒ Boolean
  If an exception is encountered talking to Solr, is it one we should entirely give up on? SolrJ doesn't use a useful exception class hierarchy, so we have to look into its details and guess.
- #hash_to_solr_document(hash) ⇒ Object
- #initialize(argSettings) ⇒ SolrJWriter (constructor)
  A new instance of SolrJWriter.
- #instantiate_solr_server! ⇒ Object
  Instantiates a solr server of class settings["solrj_writer.server_class_name"] or "HttpSolrServer" and initializes it with settings["solr.url"].
- #logger ⇒ Object
- #put(context) ⇒ Object
  Method IS thread-safe, can be called concurrently by multiple threads.
- #settings_check!(settings) ⇒ Object
- #skipped_record_count ⇒ Object
  Return count of encountered skipped records.
Methods included from QualifiedConstGet
Constructor Details
#initialize(argSettings) ⇒ SolrJWriter
Returns a new instance of SolrJWriter.
    # File 'lib/traject/solrj_writer.rb', line 96
    def initialize(argSettings)
      @settings = Traject::Indexer::Settings.new(argSettings)
      settings_check!(settings)

      ensure_solrj_loaded!

      solr_server # init

      @batched_queue = java.util.concurrent.LinkedBlockingQueue.new

      # when multi-threaded exceptions raised in threads are held here
      # we need a HIGH performance queue here to try and avoid slowing things down,
      # since we need to check it frequently.
      @async_exception_queue = java.util.concurrent.ConcurrentLinkedQueue.new

      # Store error count in an AtomicInteger, so multi threads can increment
      # it safely, if we're threaded.
      @skipped_record_incrementer = java.util.concurrent.atomic.AtomicInteger.new(0)

      # if our thread pool settings are 0, it'll just create a null threadpool that
      # executes in calling context.
      @thread_pool = Traject::ThreadPool.new( @settings["solrj_writer.thread_pool"].to_i )

      @debug_ascii_progress = (@settings["debug_ascii_progress"].to_s == "true")

      logger.info(" #{self.class.name} writing to '#{settings['solr.url']}'")
    end
Instance Attribute Details
#batched_queue ⇒ Object (readonly)
Returns the value of attribute batched_queue.
    # File 'lib/traject/solrj_writer.rb', line 94
    def batched_queue
      @batched_queue
    end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
    # File 'lib/traject/solrj_writer.rb', line 92
    def settings
      @settings
    end
#solr_server ⇒ Object
    # File 'lib/traject/solrj_writer.rb', line 321
    def solr_server
      @solr_server ||= instantiate_solr_server!
    end
Instance Method Details
#add_one_document_package(package) ⇒ Object
Adds a single SolrInputDocument passed in as an UpdatePackage combo of SolrInputDocument and context.
Rescues exceptions thrown by SolrServer.add, logs them, and then re-raises them if they are deemed fatal, meaning indexing should stop. Only intended to be used on a SINGLE document add. If we get an exception on a multi-doc batch add, we need to recover differently.
    # File 'lib/traject/solrj_writer.rb', line 227
    def add_one_document_package(package)
      begin
        solr_server.add(package.solr_document)
      # Honestly not sure what the difference is between those types, but SolrJ raises both
      rescue org.apache.solr.common.SolrException, org.apache.solr.client.solrj.SolrServerException => e
        id = package.context.source_record && package.context.source_record['001'] && package.context.source_record['001'].value
        id_str = id ? "001:#{id}" : ""

        position = package.context.position
        position_str = position ? "at file position #{position} (starting at 1)" : ""

        logger.error("Could not index record #{id_str} #{position_str}\n" + Traject::Util.exception_to_log_message(e) )
        logger.debug(package.context.source_record.to_s)

        @skipped_record_incrementer.getAndIncrement() # AtomicInteger, thread-safe increment.

        if fatal_exception? e
          logger.fatal("SolrJ exception judged fatal, raising...")
          raise e
        end
      end
    end
#batch_add_document_packages(current_batch) ⇒ Object
Takes array and batch adds it to solr -- array of UpdatePackage tuples of SolrInputDocument and context.
Catches an error in the batch add, logs it, and re-tries the docs individually.
Is thread-safe, because SolrServer is thread-safe, and we aren't referencing any other shared state. It is important that the CALLER passes in a doc array that is not shared state, having extracted it from the shared batched_queue in a thread-safe way.
    # File 'lib/traject/solrj_writer.rb', line 202
    def batch_add_document_packages(current_batch)
      begin
        a = current_batch.collect {|package| package.solr_document }
        solr_server.add( a )

        $stderr.write "%" if @debug_ascii_progress
      rescue Exception => e
        # Error in batch, none of the docs got added, let's try to re-add
        # em all individually, so those that CAN get added get added, and those
        # that can't get individually logged.
        logger.warn "Error encountered in batch solr add, will re-try documents individually, at a performance penalty...\n" + Traject::Util.exception_to_log_message(e)
        current_batch.each do |package|
          add_one_document_package(package)
        end
      end
    end
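The batch-then-retry-individually recovery can be sketched without Solr. This is a minimal illustration, not the library's code: `FakeServer` and `add_batch_with_fallback` are hypothetical names, and the sketch rescues StandardError where the listed source rescues Exception.

```ruby
# Hypothetical stand-in server: rejects any request containing a bad doc,
# the way one invalid document can fail a whole batch add.
class FakeServer
  attr_reader :added

  def initialize
    @added = []
  end

  def add(docs)
    docs = Array(docs)
    raise ArgumentError, "bad document in request" if docs.include?(:bad)
    @added.concat(docs)
  end
end

def add_batch_with_fallback(server, batch, skipped)
  server.add(batch)
rescue StandardError
  # Nothing from the batch was added; retry one by one so good docs get in
  # and each bad doc is recorded individually.
  batch.each do |doc|
    begin
      server.add(doc)
    rescue StandardError
      skipped << doc
    end
  end
end

server  = FakeServer.new
skipped = []
add_batch_with_fallback(server, [:a, :bad, :b], skipped)
```

The cost of the fallback is one request per document for the failed batch, which is the performance penalty the warning message refers to.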
#close ⇒ Object
    # File 'lib/traject/solrj_writer.rb', line 277
    def close
      @thread_pool.raise_collected_exception!

      # Any leftovers in batch buffer? Send em to the threadpool too.
      if batched_queue.length > 0
        packages = []
        batched_queue.drain_to(packages)

        # we do it in the thread pool for consistency, and so
        # it goes to the end of the queue behind any outstanding
        # work in the pool.
        @thread_pool.maybe_in_thread_pool { batch_add_document_packages( packages ) }
      end

      # Wait for shutdown, and time it.
      logger.debug "SolrJWriter: Shutting down thread pool, waiting if needed..."
      elapsed = @thread_pool.shutdown_and_wait
      if elapsed > 60
        logger.warn "Waited #{elapsed} seconds for all SolrJWriter threads, you may want to increase solrj_writer.thread_pool (currently #{@settings["solrj_writer.thread_pool"]})"
      end
      logger.debug "SolrJWriter: Thread pool shutdown complete"
      logger.warn "SolrJWriter: #{skipped_record_count} skipped records" if skipped_record_count > 0

      # check again now that we've waited, there could still be some
      # that didn't show up before.
      @thread_pool.raise_collected_exception!

      if settings["solrj_writer.commit_on_close"].to_s == "true"
        logger.info "SolrJWriter: Sending commit to solr..."
        solr_server.commit
      end

      solr_server.shutdown
      @solr_server = nil
    end
#ensure_solrj_loaded! ⇒ Object
Loads solrj if not already loaded, by loading all jars found in settings["solrj.jar_dir"].
    # File 'lib/traject/solrj_writer.rb', line 126
    def ensure_solrj_loaded!
      unless defined?(HttpSolrServer) && defined?(SolrInputDocument)
        Traject::Util.require_solrj_jars(settings)
      end

      # And for now, SILENCE SolrJ logging
      org.apache.log4j.Logger.getRootLogger().addAppender(org.apache.log4j.varia.NullAppender.new)
    end
#fatal_exception?(e) ⇒ Boolean
If an exception is encountered talking to Solr, is it one we should entirely give up on? SolrJ doesn't use a useful exception class hierarchy, so we have to look into its details and guess.
    # File 'lib/traject/solrj_writer.rb', line 257
    def fatal_exception?(e)
      root_cause = e.respond_to?(:getRootCause) && e.getRootCause

      # Various kinds of inability to actually talk to the
      # server look like this:
      if root_cause.kind_of? java.io.IOException
        return true
      end

      # Consider Solr server returning HTTP 500 Internal Server Error to be fatal.
      # This can mean, for instance, that disk space is exhausted on solr server.
      if e.kind_of?(Java::OrgApacheSolrCommon::SolrException) && e.code == 500
        return true
      end

      return false
    end
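The two-rule classification (an I/O root cause, or an HTTP 500 from the server) can be tried outside JRuby with stand-ins. This is a sketch under assumptions: `FakeSolrError` and `fatal_error?` are hypothetical, and Ruby's IOError substitutes for java.io.IOException.

```ruby
# Hypothetical stand-in for a SolrJ exception: carries an HTTP-style code
# and an optional root cause. Not a real SolrJ class.
class FakeSolrError < StandardError
  attr_reader :code, :root_cause

  def initialize(message, code: nil, root_cause: nil)
    super(message)
    @code = code
    @root_cause = root_cause
  end
end

def fatal_error?(error)
  root = error.respond_to?(:root_cause) && error.root_cause
  return true if root.is_a?(IOError) # could not talk to the server at all
  return true if error.is_a?(FakeSolrError) && error.code == 500
  false
end

unreachable = FakeSolrError.new("connect failed", root_cause: IOError.new("refused"))
server_down = FakeSolrError.new("internal error", code: 500)
bad_record  = FakeSolrError.new("unknown field", code: 400)
```

Under these rules `unreachable` and `server_down` would stop indexing, while `bad_record` would only be logged and counted as skipped.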
#hash_to_solr_document(hash) ⇒ Object
    # File 'lib/traject/solrj_writer.rb', line 183
    def hash_to_solr_document(hash)
      doc = SolrInputDocument.new
      hash.each_pair do |key, value_array|
        value_array.each do |value|
          doc.addField( key, value )
        end
      end
      return doc
    end
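The conversion expects a hash whose values are arrays, adding one field entry per value so that multi-valued fields are preserved. A minimal sketch of that shape, using a hypothetical `FakeDocument` in place of SolrInputDocument (which needs the SolrJ jars):

```ruby
# Hypothetical stand-in for SolrInputDocument: records each added value
# under its field name.
class FakeDocument
  attr_reader :fields

  def initialize
    @fields = Hash.new { |hash, key| hash[key] = [] }
  end

  def add_field(name, value)
    @fields[name] << value
  end
end

def hash_to_document(hash)
  doc = FakeDocument.new
  hash.each_pair do |key, values|
    values.each { |value| doc.add_field(key, value) }
  end
  doc
end

doc = hash_to_document("id" => ["123"], "author" => ["Smith, J.", "Doe, A."])
```

Note that a bare string value (rather than a one-element array) would not work with this shape, since the method calls `each` on every value.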
#instantiate_solr_server! ⇒ Object
Instantiates a solr server of class settings["solrj_writer.server_class_name"] or "HttpSolrServer" and initializes it with settings["solr.url"]
    # File 'lib/traject/solrj_writer.rb', line 328
    def instantiate_solr_server!
      server_class = qualified_const_get( settings["solrj_writer.server_class_name"] || "HttpSolrServer" )
      server = server_class.new( settings["solr.url"].to_s );

      if parser_name = settings["solrj_writer.parser_class_name"]
        #parser = org.apache.solr.client.solrj.impl.const_get(parser_name).new
        parser = Java::JavaClass.for_name("org.apache.solr.client.solrj.impl.#{parser_name}").ruby_class.new
        server.setParser( parser )
      end

      server
    end
#logger ⇒ Object
    # File 'lib/traject/solrj_writer.rb', line 250
    def logger
      settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
    end
#put(context) ⇒ Object
Method IS thread-safe, can be called concurrently by multiple threads.
Why? If not using batched add, we just use the SolrServer, which is already thread-safe itself.
If we are using batched add, all access to the shared batch queue has to be synchronized. The listed source keeps the queue in a java.util.concurrent.LinkedBlockingQueue rather than guarding it with an explicit mutex; in testing, a java ArrayBlockingQueue or LinkedBlockingQueue did not show consistently different performance from a naive mutex. May want to change so it doesn't synchronize at all if multiple mapping threads aren't being used.
When solrj_writer.thread_pool is 0 or nil, this class does not use any threads itself: all work is done in the calling thread, including the actual HTTP transactions to Solr via the SolrJ SolrServer. If using batches, not every #put is an HTTP transaction, but when it is, it happens in the calling thread, synchronously. With a thread pool enabled, the adds are handed to the pool instead.
    # File 'lib/traject/solrj_writer.rb', line 152
    def put(context)
      @thread_pool.raise_collected_exception!

      # package the SolrInputDocument along with the context, so we have
      # the context for error reporting when we actually add.
      package = UpdatePackage.new(hash_to_solr_document(context.output_hash), context)

      if settings["solrj_writer.batch_size"].to_i > 1
        ready_batch = []

        batched_queue.add(package)
        if batched_queue.size >= settings["solrj_writer.batch_size"].to_i
          batched_queue.drain_to(ready_batch)
        end

        if ready_batch.length > 0
          if @debug_ascii_progress
            $stderr.write("^")
            if @thread_pool.queue && (@thread_pool.queue.size >= @thread_pool.queue_capacity)
              $stderr.write "!"
            end
          end

          @thread_pool.maybe_in_thread_pool { batch_add_document_packages(ready_batch) }
        end
      else # non-batched add, add one at a time.
        @thread_pool.maybe_in_thread_pool { add_one_document_package(package) }
      end
    end
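The accumulate-then-drain batching in #put, together with the leftover flush in #close, can be sketched in plain Ruby. This is an illustration only: `Batcher` is a hypothetical class, and it uses an Array guarded by a Mutex where the listed source uses a LinkedBlockingQueue with drain_to.

```ruby
# Hypothetical batcher: buffers items and hands off a private array
# once batch_size is reached. Names are illustrative, not traject's.
class Batcher
  attr_reader :sent_batches

  def initialize(batch_size)
    @batch_size   = batch_size
    @buffer       = []
    @lock         = Mutex.new
    @sent_batches = []
  end

  def put(item)
    ready = @lock.synchronize do
      @buffer << item
      # Drain the whole buffer only when it has reached the batch size.
      @buffer.size >= @batch_size ? @buffer.slice!(0, @buffer.size) : []
    end
    # The drained array is private to this call, so it is not shared state.
    send_batch(ready) unless ready.empty?
  end

  def close
    leftovers = @lock.synchronize { @buffer.slice!(0, @buffer.size) }
    send_batch(leftovers) unless leftovers.empty?
  end

  private

  # Stands in for the actual batch add; here it only records the batch.
  def send_batch(batch)
    @lock.synchronize { @sent_batches << batch }
  end
end

batcher = Batcher.new(3)
(1..7).each { |n| batcher.put(n) }
batcher.close
```

With a batch size of 3 and seven items, two full batches go out during the puts and the final partial batch only goes out on close, which is why forgetting to call #close would lose the tail of an indexing run.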
#settings_check!(settings) ⇒ Object
    # File 'lib/traject/solrj_writer.rb', line 341
    def settings_check!(settings)
      unless settings.has_key?("solr.url") && ! settings["solr.url"].nil?
        raise ArgumentError.new("SolrJWriter requires a 'solr.url' solr url in settings")
      end

      unless settings["solr.url"] =~ /^#{URI::regexp}$/
        raise ArgumentError.new("SolrJWriter requires a 'solr.url' setting that looks like a URL, not: `#{settings['solr.url']}`")
      end
    end
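The two checks (presence, then URL shape) can be exercised on a plain Hash. This is a sketch with assumptions called out: `check_solr_url!` is a hypothetical helper, it builds the pattern from `URI::RFC2396_Parser` instead of calling `URI::regexp`, and it anchors with `\A` and `\z` rather than `^` and `$`, which in Ruby match at line boundaries.

```ruby
require "uri"

# Hypothetical standalone version of the check, using a plain Hash for settings.
def check_solr_url!(settings)
  url = settings["solr.url"]
  raise ArgumentError, "a 'solr.url' setting is required" if url.nil?

  # Assumption: the RFC 2396 absolute-URI pattern is an acceptable test of
  # "looks like a URL" for this illustration.
  unless url =~ /\A#{URI::RFC2396_Parser.new.make_regexp}\z/
    raise ArgumentError, "'solr.url' does not look like a URL: #{url.inspect}"
  end
  true
end

def rejected?(settings)
  check_solr_url!(settings)
  false
rescue ArgumentError
  true
end

valid_ok      = check_solr_url!("solr.url" => "http://localhost:8983/solr")
missing_fails = rejected?({})
garbage_fails = rejected?("solr.url" => "not a url")
```

A check like this only validates the shape of the string; it does not confirm that a Solr server is reachable at that address.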
#skipped_record_count ⇒ Object
Return count of encountered skipped records. Most accurate to call it after #close, in which case it should include full count, even under async thread_pool.
    # File 'lib/traject/solrj_writer.rb', line 316
    def skipped_record_count
      @skipped_record_incrementer.get
    end