Class: Redwood::Index
Defined Under Namespace
Classes: LockError
Constant Summary collapse
- EACH_BY_DATE_NUM =
Number of results fetched per index query by #each_id_by_date. You should probably not call that method with a block that doesn’t break rather quickly, because the result set can be very large.
100
- SAME_SUBJECT_DATE_LIMIT =
(This comment describes #each_message_in_thread_for.) Yields all messages in the thread containing ‘m’ by repeatedly querying the index. Yields pairs of message ids and message-building lambdas, so that building an unwanted message can be skipped in the block if desired.
Only two options are supported, :limit and :skip_killed. If :skip_killed is true, stops loading the thread as soon as a message with a :killed flag is found.
7
Instance Attribute Summary collapse
-
#index ⇒ Object
(also: #ferret)
readonly
Returns the value of attribute index.
Instance Method Summary collapse
- #add_source(source) ⇒ Object
-
#build_message(docid) ⇒ Object
builds a message object from a ferret result.
- #contains?(m) ⇒ Boolean
- #contains_id?(id) ⇒ Boolean
- #drop_entry(docno) ⇒ Object
- #each_id_by_date(opts = {}) ⇒ Object
- #each_message_in_thread_for(m, opts = {}) ⇒ Object
- #fancy_lock_error_message_for(e) ⇒ Object
- #fresh_thread_id ⇒ Object
- #has_any_from_source_with_label?(source, label) ⇒ Boolean
-
#initialize(dir = BASE_DIR) ⇒ Index
constructor
A new instance of Index.
- #load ⇒ Object
- #load_contacts(emails, h = {}) ⇒ Object
- #load_entry_for_id(mid) ⇒ Object
- #load_index(dir = File.join(@dir, "ferret")) ⇒ Object
- #load_sources(fn = Redwood::SOURCE_FN) ⇒ Object
- #lock ⇒ Object
- #lock_or_die ⇒ Object
- #lockfile ⇒ Object
- #num_results_for(opts = {}) ⇒ Object
- #save ⇒ Object
- #save_index(fn = File.join(@dir, "ferret")) ⇒ Object
- #size ⇒ Object
- #source_for(uri) ⇒ Object
- #sources ⇒ Object
- #start_lock_update_thread ⇒ Object
- #stop_lock_update_thread ⇒ Object
-
#sync_message(m, docid = nil, entry = nil) ⇒ Object
Syncs the message to the index: deleting if it’s already there, and adding either way.
- #unlock ⇒ Object
- #unwrap_subj(subj) ⇒ Object
- #usual_sources ⇒ Object
- #wrap_subj(subj) ⇒ Object
Constructor Details
#initialize(dir = BASE_DIR) ⇒ Index
Returns a new instance of Index.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/sup/index.rb', line 28

# Creates an index rooted at +dir+.
#
# Starts with an empty source table, builds a per-field analyzer (a
# whitespace analyzer by default, a standard analyzer for :body and
# :subject), a query parser whose default field is :body, and a Lockfile at
# #lockfile that is configured not to retry. Finally registers the new object
# through the class-level +i_am_the_instance+ hook.
#
# dir:: directory holding the index data and the lock file.
def initialize(dir = BASE_DIR)
  @dir = dir
  @sources = {}
  @sources_dirty = false

  whitespace = Ferret::Analysis::WhiteSpaceAnalyzer.new false
  standard = Ferret::Analysis::StandardAnalyzer.new [], true
  @analyzer = Ferret::Analysis::PerFieldAnalyzer.new whitespace
  @analyzer[:body] = standard
  @analyzer[:subject] = standard

  # Plain assignment: nothing can have set @qparser before the constructor runs.
  @qparser = Ferret::QueryParser.new :default_field => :body, :analyzer => @analyzer, :or_default => false
  @lock = Lockfile.new lockfile, :retries => 0, :max_age => nil

  self.class.i_am_the_instance self
end
Instance Attribute Details
#index ⇒ Object (readonly) Also known as: ferret
Returns the value of attribute index.
26 27 28 |
# File 'lib/sup/index.rb', line 26

# Returns the value of attribute index (nil until #load_index has assigned it).
def index
  @index
end
Instance Method Details
#add_source(source) ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/sup/index.rb', line 119

# Registers +source+ in the source table and marks the table dirty.
#
# A source without an id gets one greater than the largest id currently in
# use; DraftLoader and SentLoader entries count as 0 for that computation.
#
# Raises RuntimeError ("duplicate source!") if the source is already
# registered.
def add_source(source)
  # @sources is keyed by id, so Hash#include? alone only compares +source+
  # against the ids and never spots an already-registered source object.
  # Check the values as well.
  raise "duplicate source!" if @sources.include?(source) || @sources.value?(source)

  @sources_dirty = true
  max = @sources.max_of { |id, s| s.is_a?(DraftLoader) || s.is_a?(SentLoader) ? 0 : id }
  source.id ||= (max || 0) + 1
  ##source.id += 1 while @sources.member? source.id
  @sources[source.id] = source
end
#build_message(docid) ⇒ Object
builds a message object from a ferret result
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 |
# File 'lib/sup/index.rb', line 306

# Builds a message object from a ferret result.
#
# docid:: document number in the index.
#
# Looks up the stored document, resolves its source through the source table
# and hands Message.new a header reconstructed from the stored fields.
#
# Raises RuntimeError if the document's source id is not registered.
def build_message(docid)
  doc = @index[docid]
  source = @sources[doc[:source_id].to_i]
  #puts "building message #{doc[:message_id]} (#{source}##{doc[:source_info]})"
  raise "invalid source #{doc[:source_id]}" unless source

  # Refs are stored space-separated without angle brackets; put them back.
  references = doc[:refs].split(/\s+/).map { |x| "<#{x}>" }.join(" ")

  fake_header = {
    "date" => Time.at(doc[:date].to_i),
    "subject" => unwrap_subj(doc[:subject]),
    "from" => doc[:from],
    "to" => doc[:to],
    "message-id" => doc[:message_id],
    "references" => references,
  }

  Message.new :source => source,
              :source_info => doc[:source_info].to_i,
              :labels => doc[:label].split(" ").map { |s| s.intern },
              :snippet => doc[:snippet],
              :header => fake_header
end
#contains?(m) ⇒ Boolean
204 |
# File 'lib/sup/index.rb', line 204

# True if a document with the message id of +m+ is in the index.
# Delegates to #contains_id? with +m.id+.
def contains?(m)
  contains_id? m.id
end
#contains_id?(id) ⇒ Boolean
201 202 203 |
# File 'lib/sup/index.rb', line 201

# True if a term query on the :message_id field for +id+ reports at least
# one hit.
def contains_id?(id)
  query = Ferret::Search::TermQuery.new(:message_id, id)
  @index.search(query).total_hits > 0
end
#drop_entry(docno) ⇒ Object
330 |
# File 'lib/sup/index.rb', line 330

# Deletes document number +docno+ from the index and returns whatever the
# index's +delete+ returns.
def drop_entry(docno)
  @index.delete docno
end
#each_id_by_date(opts = {}) ⇒ Object
210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/sup/index.rb', line 210

# Yields every message matching +opts+ (passed to build_query), sorted by
# "date DESC", fetching EACH_BY_DATE_NUM results per index query.
#
# The block receives the message id and a lambda that builds the message on
# demand, so callers can skip building messages they do not want. The caller
# should break out early; the result set can be very large.
#
# Returns nil. Does nothing when the index is empty.
def each_id_by_date(opts = {})
  return if @index.size == 0 # otherwise ferret barfs ###TODO: remove this once my ferret patch is accepted

  query = build_query opts
  offset = 0
  loop do
    results = @index.search(query, :sort => "date DESC", :limit => EACH_BY_DATE_NUM, :offset => offset)
    Redwood::log "got #{results.total_hits} results for query (offset #{offset}) #{query.inspect}"
    results.hits.each do |hit|
      yield @index[hit.doc][:message_id], lambda { build_message hit.doc }
    end
    # Stop once the page just fetched reached the end of the result set.
    break if offset >= results.total_hits - EACH_BY_DATE_NUM
    offset += EACH_BY_DATE_NUM
  end
end
#each_message_in_thread_for(m, opts = {}) ⇒ Object
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/sup/index.rb', line 239

# Yields all messages in the thread containing +m+ by repeatedly querying
# the index. The block receives pairs of message id and message-building
# lambda, so building an unwanted message can be skipped.
#
# opts:: only :limit (maximum number of messages to collect) and
#        :skip_killed are honoured. With :skip_killed, loading stops as soon
#        as a message labelled "killed" is found.
#
# Returns false (yielding nothing) when the thread was found to be killed,
# true otherwise.
def each_message_in_thread_for(m, opts = {})
  #Redwood::log "Building thread for #{m.id}: #{m.subj}"
  messages = {}
  searched = {}
  num_queries = 0
  # Initialised once, outside the query loop: a killed hit must survive to
  # the final check instead of being reset by a later query round.
  killed = false

  if $config[:thread_by_subject] # do subject queries
    date_min = m.date - (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)
    date_max = m.date + (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)

    q = Ferret::Search::BooleanQuery.new true
    sq = Ferret::Search::PhraseQuery.new(:subject)
    wrap_subj(Message.normalize_subj(m.subj)).split(/\s+/).each do |t|
      sq.add_term t
    end
    q.add_query sq, :must
    q.add_query Ferret::Search::RangeQuery.new(:date, :>= => date_min.to_indexable_s, :<= => date_max.to_indexable_s), :must

    q = build_query :qobj => q

    pending = @index.search(q).hits.map { |hit| @index[hit.doc][:message_id] }
    Redwood::log "found #{pending.size} results for subject query #{q}"
  else
    pending = [m.id]
  end

  until killed || pending.empty? || (opts[:limit] && messages.size >= opts[:limit])
    # One query per round: anything that is, or refers to, a pending id.
    q = Ferret::Search::BooleanQuery.new true
    pending.each do |id|
      searched[id] = true
      q.add_query Ferret::Search::TermQuery.new(:message_id, id), :should
      q.add_query Ferret::Search::TermQuery.new(:refs, id), :should
    end
    pending = []

    q = build_query :qobj => q

    num_queries += 1
    @index.search_each(q, :limit => :all) do |docid, _score|
      break if opts[:limit] && messages.size >= opts[:limit]

      doc = @index[docid]
      if doc[:label].split(/\s+/).include?("killed") && opts[:skip_killed]
        killed = true
        break
      end

      mid = doc[:message_id]
      unless messages.member?(mid)
        #Redwood::log "got #{mid} as a child of #{id}"
        messages[mid] ||= lambda { build_message docid }
        # Follow references that have not been queried for yet.
        pending += doc[:refs].split(" ").reject { |ref| searched[ref] }
      end
    end
  end

  if killed
    Redwood::log "thread for #{m.id} is killed, ignoring"
    false
  else
    Redwood::log "ran #{num_queries} queries to build thread of #{messages.size + 1} messages for #{m.id}: #{m.subj}" if num_queries > 0
    messages.each { |mid, builder| yield mid, builder }
    true
  end
end
#fancy_lock_error_message_for(e) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/sup/index.rb', line 69

# Builds the user-facing explanation for a lock held by another process.
#
# e:: lock information responding to +mtime+, +user+, +host+, +pname+ and
#     +pid+.
#
# The age of the lock is reported in whole minutes, or in seconds when it is
# less than a minute old. Returns the message as a String.
def fancy_lock_error_message_for(e)
  secs = Time.now - e.mtime
  mins = secs.to_i / 60
  time =
    if mins == 0
      "#{secs.to_i} seconds"
    else
      "#{mins} minutes"
    end

  <<EOS
Error: the sup index is locked by another process! User '#{e.user}' on
host '#{e.host}' is running #{e.pname} with pid #{e.pid}. The process was alive
as of #{time} ago.
EOS
end
#fresh_thread_id ⇒ Object
326 |
# File 'lib/sup/index.rb', line 326

# Increments the thread-id counter and returns its new value.
#
# The counter is not set up by the constructor, so a missing value is
# treated as 0 here rather than failing with NoMethodError on nil; the first
# id handed out in that case is 1.
def fresh_thread_id
  @next_thread_id = (@next_thread_id || 0) + 1
end
#has_any_from_source_with_label?(source, label) ⇒ Boolean
374 375 376 377 378 379 |
# File 'lib/sup/index.rb', line 374

# True if at least one indexed document has both the id of +source+ in its
# "source_id" field and +label+ in its "label" field.
#
# Both values are converted with +to_s+ before querying, and the search is
# limited to a single result.
def has_any_from_source_with_label?(source, label)
  q = Ferret::Search::BooleanQuery.new
  q.add_query Ferret::Search::TermQuery.new("source_id", source.id.to_s), :must
  q.add_query Ferret::Search::TermQuery.new("label", label.to_s), :must
  index.search(q, :limit => 1).total_hits > 0
end
#load ⇒ Object
107 108 109 110 |
# File 'lib/sup/index.rb', line 107

# Loads the source table, then the index, each from its default location.
# Returns the result of #load_index.
def load
  load_sources
  load_index
end
#load_contacts(emails, h = {}) ⇒ Object
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
# File 'lib/sup/index.rb', line 339

# Collects people seen in non-spam messages sent from or to any of +emails+,
# walking the matches in "date DESC" order.
#
# For a message sent from one of the user's own accounts the recipients are
# collected; otherwise the sender is.
#
# emails:: addresses to search the :from and :to fields for.
# h::      options; :num is the maximum number of contacts (default 20).
#
# Returns an Array of at most :num distinct person objects, in the order
# they were first encountered.
def load_contacts(emails, h = {})
  q = Ferret::Search::BooleanQuery.new true
  emails.each do |e|
    qq = Ferret::Search::BooleanQuery.new true
    qq.add_query Ferret::Search::TermQuery.new(:from, e), :should
    qq.add_query Ferret::Search::TermQuery.new(:to, e), :should
    q.add_query qq
  end
  q.add_query Ferret::Search::TermQuery.new(:label, "spam"), :must_not

  Redwood::log "contact search: #{q}"
  contacts = {}
  num = h[:num] || 20
  @index.search_each(q, :sort => "date DESC", :limit => :all) do |docid, _score|
    break if contacts.size >= num
    #Redwood::log "got message #{docid} to: #{@index[docid][:to].inspect} and from: #{@index[docid][:from].inspect}"
    doc = @index[docid]
    from = doc[:from]
    addresses = AccountManager.is_account_email?(from) ? doc[:to].split(" ") : [from]

    addresses.each do |addr|
      person = PersonManager.person_for(addr)
      # Unresolvable addresses must not use up one of the :num slots.
      contacts[person] = true if person
    end
  end

  # One message can contribute several recipients, so enforce the cap here.
  contacts.keys.first(num)
end
#load_entry_for_id(mid) ⇒ Object
332 333 334 335 336 337 |
# File 'lib/sup/index.rb', line 332

# Looks up the index entry whose :message_id field equals +mid+.
#
# Returns a two-element Array [docid, entry] for the first hit, or nil when
# nothing matches.
def load_entry_for_id(mid)
  results = @index.search(Ferret::Search::TermQuery.new(:message_id, mid))
  return if results.total_hits == 0

  docid = results.hits[0].doc
  [docid, @index[docid]]
end
#load_index(dir = File.join(@dir, "ferret")) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/sup/index.rb', line 132

# Opens the ferret index stored under +dir+, creating it first if that path
# does not exist yet, and stores the handle in @index.
#
# A new index stores every field by default, except :body which is not
# stored; :date is indexed untokenized and :snippet is stored without being
# indexed.
def load_index(dir = File.join(@dir, "ferret"))
  # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
  existing = File.exist? dir

  if existing
    Redwood::log "loading index..."
  else
    Redwood::log "creating index..."
    field_infos = Ferret::Index::FieldInfos.new :store => :yes
    field_infos.add_field :message_id
    field_infos.add_field :source_id
    field_infos.add_field :source_info
    field_infos.add_field :date, :index => :untokenized
    field_infos.add_field :body, :store => :no
    field_infos.add_field :label
    field_infos.add_field :subject
    field_infos.add_field :from
    field_infos.add_field :to
    field_infos.add_field :refs
    field_infos.add_field :snippet, :index => :no, :term_vector => :no
    field_infos.create_index dir
  end

  @index = Ferret::Index::Index.new(:path => dir, :analyzer => @analyzer)
  Redwood::log "loaded index of #{@index.size} messages" if existing
  @index
end
#load_sources(fn = Redwood::SOURCE_FN) ⇒ Object
368 369 370 371 372 |
# File 'lib/sup/index.rb', line 368

# Replaces the source table with the sources read from the YAML file +fn+
# and clears the dirty flag.
#
# Every loaded object is wrapped in Recoverable and stored under its id. A
# missing/empty file (load_yaml_obj returning nil) yields an empty table.
def load_sources(fn = Redwood::SOURCE_FN)
  loaded = Redwood::load_yaml_obj(fn) || []

  # Build the table pair by pair instead of Hash[*pairs.flatten]: no giant
  # argument splat, and no accidental flattening of array-like values.
  @sources = {}
  loaded.each do |o|
    s = Recoverable.new o
    @sources[s.id] = s
  end

  @sources_dirty = false
end
#lock ⇒ Object
46 47 48 49 50 51 52 53 |
# File 'lib/sup/index.rb', line 46

# Takes the index lock file.
#
# Raises LockError, built from the lock information found on disk, when the
# lock file library gives up with Lockfile::MaxTriesLockError.
def lock
  Redwood::log "locking #{lockfile}..."
  @lock.lock
rescue Lockfile::MaxTriesLockError
  raise LockError, @lock.lockinfo_on_disk
end
#lock_or_die ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/sup/index.rb', line 86

# Takes the index lock, or explains on $stderr who holds it and how to
# remove a stale lock file, and then exits the process.
#
# Returns the result of #lock on success.
def lock_or_die
  lock
rescue LockError => e
  $stderr.puts fancy_lock_error_message_for(e)
  $stderr.puts <<EOS

You can wait for the process to finish, or, if it crashed and left a
stale lock file behind, you can manually delete #{@lock.path}.
EOS
  exit
end
#lockfile ⇒ Object
44 |
# File 'lib/sup/index.rb', line 44

# Path of the lock file: "lock" inside the index directory.
def lockfile
  File.join @dir, "lock"
end
#num_results_for(opts = {}) ⇒ Object
223 224 225 226 227 228 |
# File 'lib/sup/index.rb', line 223

# Number of documents matching the query that build_query produces from
# +opts+. Returns 0 straight away for an empty index.
def num_results_for(opts = {})
  return 0 if @index.size == 0 # otherwise ferret barfs ###TODO: remove this once my ferret patch is accepted

  q = build_query opts
  # Only the hit count is needed, so fetch a single result.
  index.search(q, :limit => 1).total_hits
end
#save ⇒ Object
112 113 114 115 116 117 |
# File 'lib/sup/index.rb', line 112

# Persists the source table and the index, creating the index directory
# first when needed. Returns the result of #save_index.
def save
  Redwood::log "saving index and sources..."
  # mkdir_p is a no-op for an existing directory, so no existence check is
  # needed (the former File.exists? guard no longer exists in Ruby 3.2+).
  FileUtils.mkdir_p @dir
  save_sources
  save_index
end
#save_index(fn = File.join(@dir, "ferret")) ⇒ Object
197 198 199 |
# File 'lib/sup/index.rb', line 197

# Intentionally empty counterpart of #load_index; +fn+ is accepted but
# unused. Returns nil.
def save_index(fn = File.join(@dir, "ferret"))
  # don't have to do anything, apparently
end
#size ⇒ Object
205 |
# File 'lib/sup/index.rb', line 205

# Size of the index, as reported by the underlying index object.
def size
  @index.size
end
#source_for(uri) ⇒ Object
128 |
# File 'lib/sup/index.rb', line 128

# First registered source whose +is_source_for?+ accepts +uri+, or nil if
# there is none.
def source_for(uri)
  @sources.values.find { |s| s.is_source_for? uri }
end
#sources ⇒ Object
130 |
# File 'lib/sup/index.rb', line 130

# All registered sources, as an Array (without their ids).
def sources
  @sources.values
end
#start_lock_update_thread ⇒ Object
55 56 57 58 59 60 61 62 |
# File 'lib/sup/index.rb', line 55

# Starts a background thread, created through Redwood::reporting_thread,
# that touches the lock file every 30 seconds, and remembers it in
# @lock_update_thread so that #stop_lock_update_thread can end it.
def start_lock_update_thread
  @lock_update_thread = Redwood::reporting_thread("lock update") do
    loop do
      sleep 30
      @lock.touch_yourself
    end
  end
end
#stop_lock_update_thread ⇒ Object
64 65 66 67 |
# File 'lib/sup/index.rb', line 64

# Kills the lock-refreshing thread, if one is running, and forgets it.
# Safe to call when no thread was started. Returns nil.
def stop_lock_update_thread
  @lock_update_thread.kill if @lock_update_thread
  @lock_update_thread = nil
end
#sync_message(m, docid = nil, entry = nil) ⇒ Object
Syncs the message to the index: deleting if it’s already there, and adding either way. Index state will be determined by m.labels.
docid and entry can be specified if they’re already known.
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/sup/index.rb', line 160

# Syncs the message to the index: deleting if it's already there, and adding
# either way. Index state will be determined by m.labels.
#
# docid and entry can be specified if they're already known; otherwise they
# are looked up by message id.
#
# Raises RuntimeError when the message has no source information, when the
# given docid belongs to a different message id, when the message's source
# has no id, or when the freshly added document cannot be found again.
# Returns true.
def sync_message(m, docid = nil, entry = nil)
  docid, entry = load_entry_for_id m.id unless docid && entry

  raise "no source info for message #{m.id}" unless m.source && m.source_info
  raise "trying to delete non-corresponding entry #{docid} with index message-id #{@index[docid][:message_id].inspect} and parameter message id #{m.id.inspect}" if docid && @index[docid][:message_id] != m.id

  # m.source may be either a bare source id or a source object.
  source_id =
    if m.source.is_a? Integer
      m.source
    else
      m.source.id or raise "unregistered source #{m.source} (id #{m.source.id.inspect})"
    end

  # All recipients, whatever header they came from, go into the :to field.
  to = (m.to + m.cc + m.bcc).map { |x| x.email }.join(" ")
  d = {
    :message_id => m.id,
    :source_id => source_id,
    :source_info => m.source_info,
    :date => m.date.to_indexable_s,
    :body => m.content,
    :snippet => m.snippet,
    :label => m.labels.uniq.join(" "),
    :from => m.from ? m.from.email : "",
    :to => to,
    :subject => wrap_subj(Message.normalize_subj(m.subj)),
    :refs => (m.refs + m.replytos).uniq.join(" "),
  }

  @index.delete docid if docid
  @index.add_document d

  docid, entry = load_entry_for_id m.id
  ## this hasn't been triggered in a long time. TODO: decide whether it's still a problem.
  raise "just added message #{m.id.inspect} but couldn't find it in a search" unless docid
  true
end
#unlock ⇒ Object
100 101 102 103 104 105 |
# File 'lib/sup/index.rb', line 100

# Releases the index lock if this object currently holds it; otherwise does
# nothing and returns nil.
def unlock
  return unless @lock && @lock.locked?

  Redwood::log "unlocking #{lockfile}..."
  @lock.unlock
end
#unwrap_subj(subj) ⇒ Object
328 |
# File 'lib/sup/index.rb', line 328

# Inverse of #wrap_subj: returns the text between the __START_SUBJECT__ and
# __END_SUBJECT__ markers, or nil when +subj+ is nil or carries no markers.
def unwrap_subj(subj)
  match = /__START_SUBJECT__ (.*?) __END_SUBJECT__/.match(subj)
  match && match[1]
end
#usual_sources ⇒ Object
129 |
# File 'lib/sup/index.rb', line 129

# The registered sources for which +usual?+ is true, as an Array.
def usual_sources
  @sources.values.select { |s| s.usual? }
end
#wrap_subj(subj) ⇒ Object
327 |
# File 'lib/sup/index.rb', line 327

# Surrounds +subj+ with the __START_SUBJECT__ / __END_SUBJECT__ marker words
# that #unwrap_subj looks for. Returns a new String.
def wrap_subj(subj)
  "__START_SUBJECT__ #{subj} __END_SUBJECT__"
end