Class: Redwood::Index

Inherits:
Object
Includes:
Singleton
Defined in:
lib/sup/index.rb

Defined Under Namespace

Classes: LockError

Constant Summary

EACH_BY_DATE_NUM =

You should probably not call this with a block that doesn't break rather quickly, because the results can be very large.

100
SAME_SUBJECT_DATE_LIMIT =

Yields all messages in the thread containing `m` by repeatedly querying the index. Yields pairs of message IDs and message-building lambdas, so that building an unwanted message can be skipped in the block if desired.

Only two options are supported: `:limit` and `:skip_killed`. If `:skip_killed` is true, loading of a thread stops as soon as a message with the `killed` label is found.

7
MAX_CLAUSES =
1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dir = BASE_DIR) ⇒ Index



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/sup/index.rb', line 33

## Sets up an Index rooted at +dir+: locks, an empty source table, the
## per-field analyzer, the query parser and the lockfile handle.
## The Ferret index itself is not opened here; @index is only assigned by
## #load_index.
def initialize dir=BASE_DIR
  @dir = dir
  @index_mutex = Monitor.new

  ## source registry, guarded by its own monitor
  @source_mutex = Monitor.new
  @sources = {}
  @sources_dirty = false

  ## whitespace tokenization for every field except body and subject,
  ## which get the standard analyzer
  whitespace = Ferret::Analysis::WhiteSpaceAnalyzer.new false
  standard = Ferret::Analysis::StandardAnalyzer.new [], true
  @analyzer = Ferret::Analysis::PerFieldAnalyzer.new whitespace
  [:body, :subject].each { |field| @analyzer[field] = standard }

  @qparser ||= Ferret::QueryParser.new :default_field => :body, :analyzer => @analyzer, :or_default => false
  @lock = Lockfile.new lockfile, :retries => 0, :max_age => nil

  self.class.i_am_the_instance self
end

Instance Attribute Details

#index ⇒ Object (readonly) Also known as: ferret

These two accessors should ONLY be used by single-threaded programs; otherwise you will have a naughty ferret on your hands.



30
31
32
# File 'lib/sup/index.rb', line 30

## Returns the raw Ferret index object. No lock is taken, so callers get
## no protection from concurrent access.
def index; @index; end

Instance Method Details

#add_source(source) ⇒ Object



132
133
134
135
136
137
138
139
140
141
# File 'lib/sup/index.rb', line 132

## Registers +source+ in the source table and marks the table dirty.
## If +source+ has no id yet, it gets one more than the largest id already
## registered. DraftLoader and SentLoader sources count as 0 for that
## computation, so their ids are ignored.
##
## Raises RuntimeError ("duplicate source!") if this source is already
## registered.
def add_source source
  @source_mutex.synchronize do
    ## @sources maps id => source, so the source must be looked up among the
    ## values; checking the keys compared it against integers and never hit.
    raise "duplicate source!" if @sources.value? source
    @sources_dirty = true
    max = @sources.map { |id, s| s.is_a?(DraftLoader) || s.is_a?(SentLoader) ? 0 : id }.max
    source.id ||= (max || 0) + 1
    ##source.id += 1 while @sources.member? source.id
    @sources[source.id] = source
  end
end

#build_message(docid) ⇒ Object

Builds a Message object from a Ferret result.



399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
# File 'lib/sup/index.rb', line 399

## Builds a Message from the Ferret document numbered +docid+, using only
## the fields stored in the index. The stored fields become a synthetic
## header hash: recipients are comma-joined, and refs are wrapped in <...>.
##
## Raises RuntimeError if the document's source id is not registered.
def build_message docid
  @index_mutex.synchronize do
    doc = @index[docid]

    source = @source_mutex.synchronize { @sources[doc[:source_id].to_i] }
    raise "invalid source #{doc[:source_id]}" unless source

    #puts "building message #{doc[:message_id]} (#{source}##{doc[:source_info]})"

    header = {}
    header["date"] = Time.at(doc[:date].to_i)
    header["subject"] = unwrap_subj(doc[:subject])
    header["from"] = doc[:from]
    header["to"] = doc[:to].split(/\s+/).join(", ") # reformat
    header["message-id"] = doc[:message_id]
    header["references"] = doc[:refs].split(/\s+/).map { |ref| "<#{ref}>" }.join(" ")

    labels = doc[:label].split(" ").map { |name| name.intern }

    Message.new :source => source, :source_info => doc[:source_info].to_i,
                :labels => labels, :snippet => doc[:snippet], :header => header
  end
end

#contains?(m) ⇒ Boolean



282
# File 'lib/sup/index.rb', line 282

## True if the index holds a document whose message id equals m.id.
def contains? m
  contains_id?(m.id)
end

#contains_id?(id) ⇒ Boolean



279
280
281
# File 'lib/sup/index.rb', line 279

## True if at least one indexed document has exactly +id+ in its
## :message_id field.
def contains_id? id
  query = Ferret::Search::TermQuery.new(:message_id, id)
  hit_count = @index_mutex.synchronize { @index.search(query).total_hits }
  hit_count > 0
end

#drop_entry(docno) ⇒ Object



427
# File 'lib/sup/index.rb', line 427

## Deletes document number +docno+ from the index while holding the index
## lock, and returns whatever the index's delete returns.
def drop_entry docno
  @index_mutex.synchronize do
    @index.delete docno
  end
end

#each_id_by_date(opts = {}) ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/sup/index.rb', line 289

## Yields (message_id, builder) for every document matching +opts+ (as
## turned into a query by #build_query), newest first ("date DESC").
## builder is a lambda that calls #build_message for that document, so the
## full message is only built if the caller asks for it.
##
## Hits are fetched in pages of at most EACH_BY_DATE_NUM. When given,
## opts[:limit] caps the total number of hits requested. The index lock
## is held only for each page search and each id lookup, never while the
## caller's block runs.
def each_id_by_date opts={}
  return if empty? # otherwise ferret barfs ###TODO: remove this once my ferret patch is accepted
  query = build_query opts
  offset = 0
  while true
    page = if opts[:limit]
      [EACH_BY_DATE_NUM, opts[:limit] - offset].min
    else
      EACH_BY_DATE_NUM
    end
    results = @index_mutex.synchronize do
      @index.search query, :sort => "date DESC", :limit => page, :offset => offset
    end
    Redwood::log "got #{results.total_hits} results for query (offset #{offset}) #{query.inspect}"
    results.hits.each do |hit|
      message_id = @index_mutex.synchronize { @index[hit.doc][:message_id] }
      builder = lambda { build_message hit.doc }
      yield message_id, builder
    end
    reached_limit = opts[:limit] && offset >= opts[:limit] - page
    break if reached_limit
    break if offset >= results.total_hits - page
    offset += page
  end
end

#each_message_in_thread_for(m, opts = {}) ⇒ Object



323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
# File 'lib/sup/index.rb', line 323

## Collects the messages of the thread containing +m+ by repeatedly
## querying the index, then yields (message_id, builder) pairs. builder is
## a lambda that calls #build_message, so unwanted messages need never be
## built.
##
## Discovery starts from m.id. When $config[:thread_by_subject] is set, it
## also starts from messages whose subject phrase-matches m's normalized
## subject and whose date lies within SAME_SUBJECT_DATE_LIMIT * 12 hours of
## m.date. Each round queries up to MAX_CLAUSES / 2 pending ids against both
## the :message_id and :refs fields, and queues the not-yet-searched ids
## found in the matching documents' refs.
##
## Options:
##   :limit       - stop once this many messages have been collected
##   :skip_killed - if true, abandon the thread as soon as any matching
##                  document carries the "killed" label
##
## Returns false, without yielding anything, if the thread was abandoned
## because of :skip_killed. Otherwise yields the pairs and returns true.
def each_message_in_thread_for m, opts={}
  #Redwood::log "Building thread for #{m.id}: #{m.subj}"
  messages = {}
  searched = {}
  num_queries = 0
  ## set once, before the loop. It used to be reset to false at the start
  ## of every round, so a kill seen in one round was forgotten whenever
  ## another round followed, and the killed thread was yielded anyway.
  killed = false

  pending = [m.id]
  if $config[:thread_by_subject] # do subject queries
    date_min = m.date - (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)
    date_max = m.date + (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)

    q = Ferret::Search::BooleanQuery.new true
    sq = Ferret::Search::PhraseQuery.new(:subject)
    wrap_subj(Message.normalize_subj(m.subj)).split(/\s+/).each do |t|
      sq.add_term t
    end
    q.add_query sq, :must
    q.add_query Ferret::Search::RangeQuery.new(:date, :>= => date_min.to_indexable_s, :<= => date_max.to_indexable_s), :must

    q = build_query :qobj => q

    p1 = @index_mutex.synchronize { @index.search(q).hits.map { |hit| @index[hit.doc][:message_id] } }
    Redwood::log "found #{p1.size} results for subject query #{q}"

    p2 = @index_mutex.synchronize { @index.search(q.to_s, :limit => :all).hits.map { |hit| @index[hit.doc][:message_id] } }
    Redwood::log "found #{p2.size} results in string form"

    pending = (pending + p1 + p2).uniq
  end

  ## stop as soon as a killed message is seen: the thread is rejected
  ## below, so further queries would be wasted work
  until killed || pending.empty? || (opts[:limit] && messages.size >= opts[:limit])
    q = Ferret::Search::BooleanQuery.new true
    # this disappeared in newer ferrets... wtf.
    # q.max_clause_count = 2048

    ## each id adds two clauses (message_id and refs), so take at most half
    ## of MAX_CLAUSES ids per round
    lim = [MAX_CLAUSES / 2, pending.length].min
    pending[0 ... lim].each do |id|
      searched[id] = true
      q.add_query Ferret::Search::TermQuery.new(:message_id, id), :should
      q.add_query Ferret::Search::TermQuery.new(:refs, id), :should
    end
    pending = pending[lim .. -1]

    q = build_query :qobj => q

    num_queries += 1
    @index_mutex.synchronize do
      @index.search_each(q, :limit => :all) do |docid, score|
        break if opts[:limit] && messages.size >= opts[:limit]
        if @index[docid][:label].split(/\s+/).include?("killed") && opts[:skip_killed]
          killed = true
          break
        end
        mid = @index[docid][:message_id]
        unless messages.member?(mid)
          #Redwood::log "got #{mid} as a child of #{id}"
          messages[mid] ||= lambda { build_message docid }
          ## queue referenced ids that have not been queried yet
          refs = @index[docid][:refs].split(" ")
          pending += refs.select { |id| !searched[id] }
        end
      end
    end
  end

  if killed
    Redwood::log "thread for #{m.id} is killed, ignoring"
    false
  else
    Redwood::log "ran #{num_queries} queries to build thread of #{messages.size + 1} messages for #{m.id}: #{m.subj}" if num_queries > 0
    messages.each { |mid, builder| yield mid, builder }
    true
  end
end

#empty? ⇒ Boolean



284
# File 'lib/sup/index.rb', line 284

## True when the index contains no documents.
def empty?
  size.zero?
end

#fancy_lock_error_message_for(e) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/sup/index.rb', line 82

## Builds the multi-line explanation shown when the index lock is held by
## another process. +e+ must respond to mtime, user, host, pname and pid
## (presumably the lock info carried by a LockError — confirm at callers).
## The lock age is given in whole minutes, or in seconds if under a minute.
def fancy_lock_error_message_for e
  age_secs = (Time.now - e.mtime).to_i
  age_mins = age_secs / 60
  time = age_mins.zero? ? possibly_pluralize(age_secs, "second") : possibly_pluralize(age_mins, "minute")

  <<EOS
Error: the sup index is locked by another process! User '#{e.user}' on
host '#{e.host}' is running #{e.pname} with pid #{e.pid}. The process was alive
as of #{time} ago.
EOS
end

#fresh_thread_id ⇒ Object



423
# File 'lib/sup/index.rb', line 423

## Returns a new thread id for this Index instance: one more than the last
## one handed out, starting at 1.
## #initialize never sets @next_thread_id, so a missing counter is treated
## as 0 instead of failing with NoMethodError on nil.
def fresh_thread_id
  @next_thread_id = (@next_thread_id || 0) + 1
end

#has_any_from_source_with_label?(source, label) ⇒ Boolean



479
480
481
482
483
484
# File 'lib/sup/index.rb', line 479

## True if some indexed document both belongs to +source+ (its source_id
## equals source.id) and carries +label+. Asks the index for one hit only.
def has_any_from_source_with_label? source, label
  query = Ferret::Search::BooleanQuery.new
  [["source_id", source.id.to_s], ["label", label.to_s]].each do |field, value|
    query.add_query Ferret::Search::TermQuery.new(field, value), :must
  end
  @index_mutex.synchronize { @index.search(query, :limit => 1).total_hits > 0 }
end

#load ⇒ Object



120
121
122
123
# File 'lib/sup/index.rb', line 120

## Loads the source list, then opens (or creates) the Ferret index, and
## returns what #load_index returns.
def load
  [:load_sources, :load_index].map { |step| send step }.last
end

#load_contacts(emails, h = {}) ⇒ Object



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
# File 'lib/sup/index.rb', line 440

## Returns Person objects (nils removed) for people who exchanged mail with
## any address in +emails+. Scans matching messages that are not labelled
## "spam", newest first.
##
## For each message: if the sender is one of the user's own account
## addresses (AccountManager.is_account_email?), all recipients are
## collected; otherwise the sender is collected. Scanning stops once at
## least h[:num] (default 20) contacts are collected. Because all recipients
## of a message are added before the check, the result can exceed h[:num].
def load_contacts emails, h={}
  q = Ferret::Search::BooleanQuery.new true
  emails.each do |e|
    qq = Ferret::Search::BooleanQuery.new true
    qq.add_query Ferret::Search::TermQuery.new(:from, e), :should
    qq.add_query Ferret::Search::TermQuery.new(:to, e), :should
    q.add_query qq
  end
  q.add_query Ferret::Search::TermQuery.new(:label, "spam"), :must_not

  Redwood::log "contact search: #{q}"
  contacts = {}
  num = h[:num] || 20
  @index_mutex.synchronize do
    @index.search_each q, :sort => "date DESC", :limit => :all do |docid, score|
      break if contacts.size >= num
      #Redwood::log "got message #{docid} to: #{@index[docid][:to].inspect} and from: #{@index[docid][:from].inspect}"
      f = @index[docid][:from]
      t = @index[docid][:to]

      ## mail we sent ourselves: the interesting contacts are the recipients
      if AccountManager.is_account_email? f
        t.split(" ").each { |e| contacts[Person.from_address(e)] = true }
      else
        contacts[Person.from_address(f)] = true
      end
    end
  end

  contacts.keys.compact
end

#load_entry_for_id(mid) ⇒ Object



429
430
431
432
433
434
435
436
437
438
# File 'lib/sup/index.rb', line 429

## Looks up the first indexed document whose :message_id equals +mid+.
## Returns [docid, fields], where fields is a plain Hash copy of the stored
## document fields, or nil when nothing matches.
def load_entry_for_id mid
  @index_mutex.synchronize do
    found = @index.search Ferret::Search::TermQuery.new(:message_id, mid)
    if found.total_hits.zero?
      nil
    else
      docid = found.hits.first.doc
      stored = @index[docid]
      copy = {}
      stored.fields.each { |name| copy[name] = stored[name] }
      [docid, copy]
    end
  end
end

#load_index(dir = File.join(@dir, "ferret")) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/sup/index.rb', line 151

## Opens the Ferret index stored in +dir+ (default: <base dir>/ferret),
## creating it first if +dir+ does not exist. A newly created index gets a
## fixed schema, with fields stored by default (:store => :yes). message_id
## and date are indexed untokenized; snippet is stored but not indexed.
##
## Uses File.exist?: the File.exists? alias was deprecated and removed in
## Ruby 3.2.
def load_index dir=File.join(@dir, "ferret")
  if File.exist? dir
    Redwood::log "loading index..."
    @index_mutex.synchronize do
      @index = Ferret::Index::Index.new(:path => dir, :analyzer => @analyzer)
      Redwood::log "loaded index of #{@index.size} messages"
    end
  else
    Redwood::log "creating index..."
    @index_mutex.synchronize do
      field_infos = Ferret::Index::FieldInfos.new :store => :yes
      field_infos.add_field :message_id, :index => :untokenized
      field_infos.add_field :source_id
      field_infos.add_field :source_info
      field_infos.add_field :date, :index => :untokenized
      field_infos.add_field :body
      field_infos.add_field :label
      field_infos.add_field :attachments
      field_infos.add_field :subject
      field_infos.add_field :from
      field_infos.add_field :to
      field_infos.add_field :refs
      field_infos.add_field :snippet, :index => :no, :term_vector => :no
      field_infos.create_index dir
      @index = Ferret::Index::Index.new(:path => dir, :analyzer => @analyzer)
    end
  end
end

#load_sources(fn = Redwood::SOURCE_FN) ⇒ Object



471
472
473
474
475
476
477
# File 'lib/sup/index.rb', line 471

## Replaces the in-memory source table with the sources read from +fn+ via
## Redwood::load_yaml_obj (a nil result means no sources). Each source is
## wrapped in Recoverable and keyed by its id, and the table is marked
## clean.
def load_sources fn=Redwood::SOURCE_FN
  source_array = (Redwood::load_yaml_obj(fn) || []).map { |o| Recoverable.new o }
  @source_mutex.synchronize do
    ## build from [id, source] pairs instead of flattening them: #flatten
    ## would also try to splat any source that responds to #to_ary
    @sources = Hash[source_array.map { |s| [s.id, s] }]
    @sources_dirty = false
  end
end

#lock ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/sup/index.rb', line 54

## Takes the on-disk index lock. The Lockfile is configured with no
## retries, so if another process holds the lock this raises LockError
## carrying the lock's on-disk info instead of waiting.
def lock
  Redwood::log "locking #{lockfile}..."
  @lock.lock
rescue Lockfile::MaxTriesLockError
  raise LockError, @lock.lockinfo_on_disk
end

#lock_or_die ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/sup/index.rb', line 99

## Acquires the index lock. If another process holds it, prints an
## explanation and recovery hint to stderr and exits the process.
## Exits with status 1 so that scripts running sup can tell the lock was
## not obtained; a bare `exit` would have reported success (status 0).
def lock_or_die
  begin
    lock
  rescue LockError => e
    $stderr.puts fancy_lock_error_message_for(e)
    $stderr.puts <<EOS

You can wait for the process to finish, or, if it crashed and left a
stale lock file behind, you can manually delete #{@lock.path}.
EOS
    exit 1
  end
end

#lockfile ⇒ Object



52
# File 'lib/sup/index.rb', line 52

## Path of the lock file: "lock" inside the index base directory.
def lockfile
  File.join(@dir, "lock")
end

#num_results_for(opts = {}) ⇒ Object



306
307
308
309
310
311
# File 'lib/sup/index.rb', line 306

## Number of indexed documents matching +opts+ (as turned into a query by
## #build_query). Returns 0 for an empty index without querying it.
def num_results_for opts={}
  if empty? # otherwise ferret barfs ###TODO: remove this once my ferret patch is accepted
    0
  else
    query = build_query opts
    @index_mutex.synchronize { @index.search(query, :limit => 1).total_hits }
  end
end

#possibly_pluralize(number_of, kind) ⇒ Object



77
78
79
80
# File 'lib/sup/index.rb', line 77

## Returns "<number_of> <kind>", appending "s" unless number_of == 1.
def possibly_pluralize number_of, kind
  suffix = number_of == 1 ? "" : "s"
  "#{number_of} #{kind}#{suffix}"
end

#save ⇒ Object



125
126
127
128
129
130
# File 'lib/sup/index.rb', line 125

## Saves the source list and then the index, creating the base directory
## first if it does not exist yet. Uses File.exist?: the File.exists?
## alias was deprecated and removed in Ruby 3.2.
def save
  Redwood::log "saving index and sources..."
  FileUtils.mkdir_p @dir unless File.exist? @dir
  save_sources
  save_index
end

#save_index(fn = File.join(@dir, "ferret")) ⇒ Object



275
276
277
# File 'lib/sup/index.rb', line 275

## Deliberately does nothing. Per the original author the Ferret index
## apparently needs no explicit save step. +fn+ is unused, and the method
## returns nil.
def save_index fn=File.join(@dir, "ferret")
  nil
end

#size ⇒ Object



283
# File 'lib/sup/index.rb', line 283

## Number of documents in the index, read while holding the index lock.
def size
  @index_mutex.synchronize do
    @index.size
  end
end

#source_for(uri) ⇒ Object



148
# File 'lib/sup/index.rb', line 148

## Returns the first source, in #sources order, that claims +uri+ via
## is_source_for?, or nil if none does.
def source_for uri
  sources.detect { |candidate| candidate.is_source_for?(uri) }
end

#sources ⇒ Object



143
144
145
146
# File 'lib/sup/index.rb', line 143

## All registered sources. Non-archived sources come first, which favours
## the inbox, and each group is ordered by id.
def sources
  ## favour the inbox by listing non-archived sources first
  active, archived = @source_mutex.synchronize { @sources.values }.sort_by { |s| s.id }.partition { |s| !s.archived? }
  ## concatenate instead of #flatten, which would also splat any source
  ## that responds to #to_ary
  active + archived
end

#start_lock_update_thread ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/sup/index.rb', line 63

## Starts a background "lock update" thread (via Redwood::reporting_thread)
## that calls @lock.touch_yourself every 30 seconds, presumably so the lock
## file keeps looking fresh while this process runs. Stop it with
## stop_lock_update_thread.
def start_lock_update_thread
  refresher = proc do
    while true
      sleep 30
      @lock.touch_yourself
    end
  end
  @lock_update_thread = Redwood::reporting_thread("lock update", &refresher)
end

#stop_lock_update_thread ⇒ Object



72
73
74
75
# File 'lib/sup/index.rb', line 72

## Kills the lock update thread, if there is one, and forgets it.
## Returns nil.
def stop_lock_update_thread
  updater = @lock_update_thread
  updater.kill if updater
  @lock_update_thread = nil
end

#sync_message(m, docid = nil, entry = nil, opts = {}) ⇒ Object

Syncs the message to the index: deletes it if it's already there, and adds it either way. The resulting index state is determined by m.labels.

docid and entry can be specified if they're already known.



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/sup/index.rb', line 184

## Writes message +m+ into the index, replacing any existing document for
## the same message id (delete, then add). The labels written are
## m.labels. When the indexed entry is an earlier version of the message,
## they are merged with the indexed labels (see the comments below).
##
## docid, entry - index document number and Hash of its stored fields, in
##                the form returned by #load_entry_for_id. Looked up here
##                unless BOTH are given.
## opts         - :force_overwrite => true ignores every stored field and
##                rebuilds the document entirely from +m+.
##
## Raises RuntimeError if m lacks source or source_info, if +docid+ names a
## document with a different message id, if m.source has no id, or if the
## newly added document cannot be found again. Returns true.
def sync_message m, docid=nil, entry=nil, opts={}
  docid, entry = load_entry_for_id m.id unless docid && entry

  raise "no source info for message #{m.id}" unless m.source && m.source_info
  ## sanity check: never delete a document that belongs to another message
  @index_mutex.synchronize do
    raise "trying to delete non-corresponding entry #{docid} with index message-id #{@index[docid][:message_id].inspect} and parameter message id #{m.id.inspect}" if docid && @index[docid][:message_id] != m.id
  end

  ## m.source may be a bare integer source id or a source object
  source_id = 
    if m.source.is_a? Integer
      m.source
    else
      m.source.id or raise "unregistered source #{m.source} (id #{m.source.id.inspect})"
    end

  ## snippets are blanked for messages with encrypted content when
  ## $config[:discard_snippets_from_encrypted_messages] is set
  snippet = 
    if m.snippet_contains_encrypted_content? && $config[:discard_snippets_from_encrypted_messages]
      ""
    else
      m.snippet
    end

  ## write the new document to the index. if the entry already exists in the
  ## index, reuse it (which avoids having to reload the entry from the source,
  ## which can be quite expensive for e.g. large threads of IMAP actions.)
  ##
  ## exception: if the index entry belongs to an earlier version of the
  ## message, use everything from the new message instead, but union the
  ## flags. this allows messages sent to mailing lists to have their header
  ## updated and to have flags set properly.
  ##
  ## minor hack: messages in sources with lower ids have priority over
  ## messages in sources with higher ids. so messages in the inbox will
  ## override everyone, and messages in the sent box will be overridden
  ## by everyone else.
  ##
  ## written in this manner to support previous versions of the index which
  ## did not keep around the entry body. upgrading is thus seamless.
  entry ||= {}
  labels = m.labels.uniq # override because this is the new state, unless...

  ## if we are a later version of a message, ignore what's in the index,
  ## but merge in the labels.
  if entry[:source_id] && entry[:source_info] && entry[:label] &&
    ((entry[:source_id].to_i > source_id) || (entry[:source_info].to_i < m.source_info))
    labels = (entry[:label].split(/\s+/).map { |l| l.intern } + m.labels).uniq
    #Redwood::log "found updated version of message #{m.id}: #{m.subj}"
    #Redwood::log "previous version was at #{entry[:source_id].inspect}:#{entry[:source_info].inspect}, this version at #{source_id.inspect}:#{m.source_info.inspect}"
    #Redwood::log "merged labels are #{labels.inspect} (index #{entry[:label].inspect}, message #{m.labels.inspect})"
    entry = {}
  end

  ## if force_overwite is true, ignore what's in the index. this is used
  ## primarily by sup-sync to force index updates.
  entry = {} if opts[:force_overwrite]

  ## fields read as `entry[...] || ...` reuse the stored value when present;
  ## the others are always recomputed from m
  d = {
    :message_id => m.id,
    :source_id => source_id,
    :source_info => m.source_info,
    :date => (entry[:date] || m.date.to_indexable_s),
    :body => (entry[:body] || m.indexable_content),
    :snippet => snippet, # always override
    :label => labels.uniq.join(" "),
    :attachments => (entry[:attachments] || m.attachments.uniq.join(" ")),

    ## always override :from and :to.
    ## older versions of Sup would often store the wrong thing in the index
    ## (because they were canonicalizing email addresses, resulting in the
    ## wrong name associated with each.) the correct address is read from
    ## the original header when these messages are opened in thread-view-mode,
    ## so this allows people to forcibly update the address in the index by
    ## marking those threads for saving.
    :from => (m.from ? m.from.indexable_content : ""),
    :to => (m.to + m.cc + m.bcc).map { |x| x.indexable_content }.join(" "),

    :subject => (entry[:subject] || wrap_subj(Message.normalize_subj(m.subj))),
    :refs => (entry[:refs] || (m.refs + m.replytos).uniq.join(" ")),
  }

  ## replace: drop the old document (if any) and add the new one under a
  ## single hold of the index lock
  @index_mutex.synchronize  do
    @index.delete docid if docid
    @index.add_document d
  end

  ## re-query by message id to confirm the new document can be found
  docid, entry = load_entry_for_id m.id
  ## this hasn't been triggered in a long time. TODO: decide whether it's still a problem.
  raise "just added message #{m.id.inspect} but couldn't find it in a search" unless docid
  true
end

#unlock ⇒ Object



113
114
115
116
117
118
# File 'lib/sup/index.rb', line 113

## Releases the index lock if a lock object exists and reports itself as
## locked. Otherwise does nothing and returns nil.
def unlock
  return unless @lock && @lock.locked?
  Redwood::log "unlocking #{lockfile}..."
  @lock.unlock
end

#unwrap_subj(subj) ⇒ Object



425
# File 'lib/sup/index.rb', line 425

## Extracts the subject text stored between the __START_SUBJECT__ and
## __END_SUBJECT__ markers. Returns nil if +subj+ is nil or has no markers.
def unwrap_subj subj
  found = /__START_SUBJECT__ (.*?) __END_SUBJECT__/.match(subj)
  found && found[1]
end

#usual_sources ⇒ Object



149
# File 'lib/sup/index.rb', line 149

## The sources whose usual? is true, in #sources order.
def usual_sources
  sources.select(&:usual?)
end

#wrap_subj(subj) ⇒ Object



424
# File 'lib/sup/index.rb', line 424

## Surrounds +subj+ (converted with to_s) with the __START_SUBJECT__ and
## __END_SUBJECT__ markers.
def wrap_subj subj
  format("__START_SUBJECT__ %s __END_SUBJECT__", subj)
end