Class: Traject::SolrJWriter
- Inherits: Object
- Includes: QualifiedConstGet
- Defined in: lib/traject/solrj_writer.rb
Overview
Writes to Solr using SolrJ, and the SolrJ HttpSolrServer.
(Sub-class later for the ConcurrentUpdate server?)
After you call #close, you can check #skipped_record_count for an integer count of skipped records.
For fatal errors that raise: async processing with thread_pool means that you may not get a raise immediately after calling #put; you may get it on a FUTURE #put or #close. You should get it eventually, though.
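For illustration, a minimal sketch of that error behavior, assuming a hypothetical `contexts` enumerable of Traject::Indexer::Context objects (normally a Traject::Indexer drives these calls for you):

  writer = Traject::SolrJWriter.new("solr.url" => "http://localhost:8983/solr")
  begin
    contexts.each { |context| writer.put(context) } # a raise here may stem from an EARLIER #put
  ensure
    writer.close # any remaining async errors surface here
  end
  puts "#{writer.skipped_record_count} records skipped"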
Settings (a config sketch follows this list):
[solr.url] Your Solr URL (required).
[solrj_writer.server_class_name] Defaults to "HttpSolrServer". You can specify another SolrServer sub-class, but it has to take a one-arg URL constructor. Otherwise, subclass this writer class and override instantiate_solr_server!.
[solrj.jar_dir] Custom directory containing all of the SolrJ jars. All jars in this dir will be loaded. Otherwise, we load our own packaged SolrJ jars. This setting can't really be used differently in the same app instance, since jars are loaded globally.
[solrj_writer.parser_class_name] A String name of a class in package org.apache.solr.client.solrj.impl; we'll instantiate one with a zero-arg constructor and pass it as an arg to setParser on the SolrServer instance, if present. NOTE: for contacting a Solr 1.x server with the recent version of SolrJ used by default, set to "XMLResponseParser".
[solrj_writer.commit_on_close] If true (or string 'true'), send a commit to Solr at the end of #process.
[solrj_writer.batch_size] If non-nil and greater than 1, send documents to Solr in batches of solrj_writer.batch_size. If nil or 1, an HTTP transaction with Solr is done per document. Defaults to 100, which seems to be a sweet spot.
[solrj_writer.thread_pool] Defaults to 4. A thread pool is used for submitting docs to Solr. Set to 0 or nil to disable threading; set to 1, there will still be a single background thread doing the adds. It may make sense to set this higher than the number of cores on your indexing machine, since these threads will mostly be waiting on Solr; the speed/capacity of your Solr server is more relevant.
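As a sketch, these settings might be provided in a traject configuration file like the following (the Solr URL is hypothetical; the other keys and values are the settings and defaults documented above):

  # traject config file sketch, run with e.g. `traject -c this_config.rb records.mrc`
  settings do
    provide "writer_class_name", "Traject::SolrJWriter"
    provide "solr.url", "http://localhost:8983/solr" # required
    provide "solrj_writer.commit_on_close", "true"
    provide "solrj_writer.batch_size", 100 # the default
    provide "solrj_writer.thread_pool", 4  # the default
  end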
Defined Under Namespace
Classes: UpdatePackage
Instance Attribute Summary
- #batched_queue ⇒ Object (readonly): Returns the value of attribute batched_queue.
- #settings ⇒ Object (readonly): Returns the value of attribute settings.
- #solr_server ⇒ Object
Instance Method Summary
- #add_one_document_package(package) ⇒ Object: Adds a single SolrInputDocument passed in as an UpdatePackage combo of SolrInputDocument and context.
- #batch_add_document_packages(current_batch) ⇒ Object: Takes an array and batch-adds it to Solr; the array holds UpdatePackage tuples of SolrInputDocument and context.
- #close ⇒ Object
- #ensure_solrj_loaded! ⇒ Object: Loads SolrJ if not already loaded.
- #fatal_exception?(e) ⇒ Boolean: If an exception is encountered talking to Solr, is it one we should entirely give up on? SolrJ doesn't use a useful exception class hierarchy; we have to look into its details and guess.
- #hash_to_solr_document(hash) ⇒ Object
- #initialize(argSettings) ⇒ SolrJWriter (constructor): A new instance of SolrJWriter.
- #instantiate_solr_server! ⇒ Object
- #logger ⇒ Object
- #put(context) ⇒ Object: Method IS thread-safe; it can be called concurrently by multiple threads.
- #settings_check!(settings) ⇒ Object
- #skipped_record_count ⇒ Object: Returns count of encountered skipped records.
Methods included from QualifiedConstGet: #qualified_const_get
Constructor Details
#initialize(argSettings) ⇒ SolrJWriter
Returns a new instance of SolrJWriter.
# File 'lib/traject/solrj_writer.rb', line 87

def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)
  settings_check!(settings)

  ensure_solrj_loaded!

  solr_server # init

  @batched_queue = java.util.concurrent.LinkedBlockingQueue.new

  # when multi-threaded, exceptions raised in threads are held here;
  # we need a HIGH performance queue here to try and avoid slowing things down,
  # since we need to check it frequently.
  @async_exception_queue = java.util.concurrent.ConcurrentLinkedQueue.new

  # Store error count in an AtomicInteger, so multiple threads can increment
  # it safely, if we're threaded.
  @skipped_record_incrementer = java.util.concurrent.atomic.AtomicInteger.new(0)

  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context.
  @thread_pool = Traject::ThreadPool.new(@settings["solrj_writer.thread_pool"].to_i)

  @debug_ascii_progress = (@settings["debug_ascii_progress"].to_s == "true")
end
Instance Attribute Details
#batched_queue ⇒ Object (readonly)
Returns the value of attribute batched_queue.
# File 'lib/traject/solrj_writer.rb', line 85

def batched_queue
  @batched_queue
end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
# File 'lib/traject/solrj_writer.rb', line 83

def settings
  @settings
end
#solr_server ⇒ Object
# File 'lib/traject/solrj_writer.rb', line 310

def solr_server
  @solr_server ||= instantiate_solr_server!
end
Instance Method Details
#add_one_document_package(package) ⇒ Object
Adds a single SolrInputDocument passed in as an UpdatePackage combo of SolrInputDocument and context.
Rescues exceptions thrown by SolrServer.add, logs them, and then re-raises them if they're deemed fatal enough that indexing should stop. Only intended to be used on a SINGLE document add; if we get an exception on a multi-doc batch add, we need to recover differently.
# File 'lib/traject/solrj_writer.rb', line 216

def add_one_document_package(package)
  begin
    solr_server.add(package.solr_document)
  # Honestly not sure what the difference is between those types, but SolrJ raises both
  rescue org.apache.solr.common.SolrException, org.apache.solr.client.solrj.SolrServerException => e
    id           = package.context.source_record && package.context.source_record['001'] && package.context.source_record['001'].value
    id_str       = id ? "001:#{id}" : ""

    position     = package.context.position
    position_str = position ? "at file position #{position} (starting at 1)" : ""

    logger.error("Could not index record #{id_str} #{position_str}\n" + Traject::Util.exception_to_log_message(e))
    logger.debug(package.context.source_record.to_s)

    @skipped_record_incrementer.getAndIncrement() # AtomicInteger, thread-safe increment.

    if fatal_exception? e
      logger.fatal("SolrJ exception judged fatal, raising...")
      raise e
    end
  end
end
#batch_add_document_packages(current_batch) ⇒ Object
Takes an array and batch-adds it to Solr; the array holds UpdatePackage tuples of SolrInputDocument and context.
Catches errors in the batch add, logs them, and re-tries the docs individually.
Is thread-safe, because SolrServer is thread-safe and we aren't referencing any other shared state. It is important that the CALLER pass in a doc array that is not shared state, extracting it from the shared-state batched_queue inside a mutex.
# File 'lib/traject/solrj_writer.rb', line 191

def batch_add_document_packages(current_batch)
  begin
    a = current_batch.collect { |package| package.solr_document }
    solr_server.add(a)

    $stderr.write "%" if @debug_ascii_progress
  rescue Exception => e
    # Error in batch: none of the docs got added, so let's try to re-add
    # them all individually, so those that CAN get added get added, and those
    # that can't get individually logged.
    logger.warn "Error encountered in batch solr add, will re-try documents individually, at a performance penalty...\n" + Traject::Util.exception_to_log_message(e)
    current_batch.each do |package|
      add_one_document_package(package)
    end
  end
end
#close ⇒ Object
# File 'lib/traject/solrj_writer.rb', line 266

def close
  @thread_pool.raise_collected_exception!

  # Any leftovers in batch buffer? Send em to the threadpool too.
  if batched_queue.length > 0
    packages = []
    batched_queue.drain_to(packages)

    # we do it in the thread pool for consistency, and so
    # it goes to the end of the queue behind any outstanding
    # work in the pool.
    @thread_pool.maybe_in_thread_pool { batch_add_document_packages( packages ) }
  end

  # Wait for shutdown, and time it.
  logger.debug "SolrJWriter: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    logger.warn "Waited #{elapsed} seconds for all SolrJWriter threads, you may want to increase solrj_writer.thread_pool (currently #{@settings["solrj_writer.thread_pool"]})"
  end
  logger.debug "SolrJWriter: Thread pool shutdown complete"
  logger.warn "SolrJWriter: #{skipped_record_count} skipped records" if skipped_record_count > 0

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  if settings["solrj_writer.commit_on_close"].to_s == "true"
    logger.info "SolrJWriter: Sending commit to solr..."
    solr_server.commit
  end

  solr_server.shutdown
  @solr_server = nil
end
#ensure_solrj_loaded! ⇒ Object
Loads SolrJ if not already loaded, by loading all jars found in the settings-specified directory (solrj.jar_dir), or our own packaged jars otherwise.
# File 'lib/traject/solrj_writer.rb', line 115

def ensure_solrj_loaded!
  unless defined?(HttpSolrServer) && defined?(SolrInputDocument)
    Traject::Util.require_solrj_jars(settings)
  end

  # And for now, SILENCE SolrJ logging
  org.apache.log4j.Logger.getRootLogger().addAppender(org.apache.log4j.varia.NullAppender.new)
end
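If you want the jar-dir override mentioned in the settings list, a sketch (the directory path is hypothetical):

  # In a traject config file: load SolrJ jars from a custom directory instead
  # of the gem's packaged jars. Remember jars are loaded globally, once per app instance.
  settings do
    provide "solrj.jar_dir", "/opt/my-app/solrj-jars" # hypothetical path
  end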
#fatal_exception?(e) ⇒ Boolean
If an exception is encountered talking to Solr, is it one we should entirely give up on? SolrJ doesn't use a useful exception class hierarchy; we have to look into its details and guess.
# File 'lib/traject/solrj_writer.rb', line 246

def fatal_exception?(e)
  root_cause = e.respond_to?(:getRootCause) && e.getRootCause

  # Various kinds of inability to actually talk to the
  # server look like this:
  if root_cause.kind_of? java.io.IOException
    return true
  end

  # Consider Solr server returning HTTP 500 Internal Server Error to be fatal.
  # This can mean, for instance, that disk space is exhausted on solr server.
  if e.kind_of?(Java::OrgApacheSolrCommon::SolrException) && e.code == 500
    return true
  end

  return false
end
#hash_to_solr_document(hash) ⇒ Object
# File 'lib/traject/solrj_writer.rb', line 172

def hash_to_solr_document(hash)
  doc = SolrInputDocument.new
  hash.each_pair do |key, value_array|
    value_array.each do |value|
      doc.addField( key, value )
    end
  end
  return doc
end
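For example, assuming writer is a SolrJWriter instance (the output hash here is hypothetical; traject output hashes always map field names to arrays of values):

  doc = writer.hash_to_solr_document("id" => ["b1234"], "title_t" => ["Hello", "World"])
  # doc is a SolrInputDocument with one "id" field and two "title_t" fields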
#instantiate_solr_server! ⇒ Object
# File 'lib/traject/solrj_writer.rb', line 317

def instantiate_solr_server!
  server_class = qualified_const_get(settings["solrj_writer.server_class_name"] || "HttpSolrServer")
  server       = server_class.new(settings["solr.url"].to_s)

  if parser_name = settings["solrj_writer.parser_class_name"]
    #parser = org.apache.solr.client.solrj.impl.const_get(parser_name).new
    parser = Java::JavaClass.for_name("org.apache.solr.client.solrj.impl.#{parser_name}").ruby_class.new
    server.setParser( parser )
  end

  server
end
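The overview floats the idea of sub-classing for the ConcurrentUpdate server; as a hypothetical sketch of the override suggested under solrj_writer.server_class_name, for a server class whose constructor takes more than a one-arg URL:

  class MyConcurrentSolrJWriter < Traject::SolrJWriter
    # SolrJ's ConcurrentUpdateSolrServer takes (url, queue_size, thread_count);
    # the queue size of 100 and 2 runner threads here are illustrative only.
    def instantiate_solr_server!
      org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer.new(
        settings["solr.url"].to_s, 100, 2
      )
    end
  end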
#logger ⇒ Object
# File 'lib/traject/solrj_writer.rb', line 239

def logger
  settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
end
#put(context) ⇒ Object
Method IS thread-safe; it can be called concurrently by multiple threads.
Why? If we're not using batched adds, we just use the SolrServer, which is already thread-safe itself.
If we are using batched adds, we surround all access to our shared-state batch queue in a mutex; just a naive implementation. We may be able to improve performance with a more sophisticated java.util.concurrent data structure (blocking queue, etc.). I did try a Java ArrayBlockingQueue and a LinkedBlockingQueue instead of our own mutex, and did not see consistently different performance. We may want to change this so it doesn't use a mutex at all when multiple mapping threads aren't being used.
This class does not at present use any threads itself: all work is done in the calling thread, including actual HTTP transactions to Solr via the SolrJ SolrServer. If using batches, not every #put is an HTTP transaction, but when one is, it happens in the calling thread, synchronously.
# File 'lib/traject/solrj_writer.rb', line 141

def put(context)
  @thread_pool.raise_collected_exception!

  # package the SolrInputDocument along with the context, so we have
  # the context for error reporting when we actually add.
  package = UpdatePackage.new(hash_to_solr_document(context.output_hash), context)

  if settings["solrj_writer.batch_size"].to_i > 1
    ready_batch = []

    batched_queue.add(package)
    if batched_queue.size >= settings["solrj_writer.batch_size"].to_i
      batched_queue.drain_to(ready_batch)
    end

    if ready_batch.length > 0
      if @debug_ascii_progress
        $stderr.write("^")
        if @thread_pool.queue && (@thread_pool.queue.size >= @thread_pool.queue_capacity)
          $stderr.write "!"
        end
      end

      @thread_pool.maybe_in_thread_pool { batch_add_document_packages(ready_batch) }
    end
  else # non-batched add, add one at a time.
    @thread_pool.maybe_in_thread_pool { add_one_document_package(package) }
  end
end
#settings_check!(settings) ⇒ Object
# File 'lib/traject/solrj_writer.rb', line 330

def settings_check!(settings)
  unless settings.has_key?("solr.url") && ! settings["solr.url"].nil?
    raise ArgumentError.new("SolrJWriter requires a 'solr.url' solr url in settings")
  end

  unless settings["solr.url"] =~ /^#{URI::regexp}$/
    raise ArgumentError.new("SolrJWriter requires a 'solr.url' setting that looks like a URL, not: `#{settings['solr.url']}`")
  end
end
#skipped_record_count ⇒ Object
Returns the count of skipped records encountered. It is most accurate to call this after #close, in which case it should include the full count, even under an async thread_pool.
# File 'lib/traject/solrj_writer.rb', line 305

def skipped_record_count
  @skipped_record_incrementer.get
end