Class: Redwood::Index

Inherits:
Object show all
Includes:
InteractiveLock, Singleton
Defined in:
lib/sup/index.rb

Overview

This index implementation uses Xapian for searching and storage. It tends to be slightly faster than Ferret for indexing and significantly faster for searching due to precomputing thread membership.

Defined Under Namespace

Classes: LockError, ParseError

Constant Summary collapse

## On-disk index format version. load_index stores it under the 'version'
## metadata key of a newly created index, and refuses to open an existing
## index that reports a different version.
INDEX_VERSION =
'4'
MIN_DATE =

Dates are converted to integers for Xapian and are used for document ids, so we must ensure they’re reasonably valid. This typically only affects spam.

Time.at 0
## Latest date accepted for the index: 2**31-1 seconds after the epoch,
## i.e. the largest signed 32-bit Unix timestamp (January 2038).
## NOTE(review): presumably paired with MIN_DATE to clamp bogus message
## dates; the clamping code is not visible here.
MAX_DATE =
Time.at(2**31-1)
EACH_ID_PAGE =

Page size: how many message-ids #each_id fetches per query round while yielding each message-id matching a query.

100
NORMAL_PREFIX =

Stemmed

## Free-text (stemmed) query fields: field name => Xapian term prefix info.
## :prefix is either a single String or an Array of Strings. 'name' and
## '' (terms typed without a field name) fan out to several prefixes.
## parse_query registers each prefix with QueryParser#add_prefix and reads
## only :prefix from these entries.
{
  'subject' => {:prefix => 'S', :exclusive => false},
  'body' => {:prefix => 'B', :exclusive => false},
  'from_name' => {:prefix => 'FN', :exclusive => false},
  'to_name' => {:prefix => 'TN', :exclusive => false},
  'name' => {:prefix => %w(FN TN), :exclusive => false},
  'attachment' => {:prefix => 'A', :exclusive => false},
  'email_text' => {:prefix => 'E', :exclusive => false},
  '' => {:prefix => %w(S B FN TN A E), :exclusive => false},
}
BOOLEAN_PREFIX =

Unstemmed

## Boolean (unstemmed) filter fields: field name => Xapian term prefix info.
## parse_query registers each prefix with QueryParser#add_boolean_prefix and
## passes :exclusive through. :prefix is a String, or an Array for 'email'.
## Note that 'msgid' and 'id' are aliases sharing the 'Q' prefix.
{
  'type' => {:prefix => 'K', :exclusive => true},
  'from_email' => {:prefix => 'FE', :exclusive => false},
  'to_email' => {:prefix => 'TE', :exclusive => false},
  'email' => {:prefix => %w(FE TE), :exclusive => false},
  'date' => {:prefix => 'D', :exclusive => true},
  'label' => {:prefix => 'L', :exclusive => false},
  'source_id' => {:prefix => 'I', :exclusive => true},
  'attachment_extension' => {:prefix => 'O', :exclusive => false},
  'msgid' => {:prefix => 'Q', :exclusive => true},
  'id' => {:prefix => 'Q', :exclusive => true},
  'thread' => {:prefix => 'H', :exclusive => false},
  'ref' => {:prefix => 'R', :exclusive => false},
  'location' => {:prefix => 'J', :exclusive => false},
}
## Every known query field => its prefix info. On a key collision the
## BOOLEAN_PREFIX entry wins, because Hash#merge keeps the argument's value.
PREFIX = NORMAL_PREFIX.merge(BOOLEAN_PREFIX)
## Boolean operators offered as completion candidates.
COMPL_OPERATORS =
%w[AND OR NOT]
## Completion candidates (presumably for the search prompt). This covers the
## gmail-style shorthands that parse_query rewrites ("filetype", not the
## former misspelling "filetypem"), plus every raw Xapian field. Each gets a
## trailing ':', and the operators are appended.
## NOTE(review): the '' key of NORMAL_PREFIX contributes a bare ":" candidate.
COMPL_PREFIXES =
(
  %w[
    from to
    is has label
    filename filetype
    before on in during after
    limit
  ] + NORMAL_PREFIX.keys + BOOLEAN_PREFIX.keys
).map{|p|"#{p}:"} + COMPL_OPERATORS

Constants included from InteractiveLock

Redwood::InteractiveLock::DELAY

Instance Method Summary collapse

Methods included from Singleton

included

Methods included from InteractiveLock

#lock_interactively, #pluralize, #time_ago_in_words

Constructor Details

#initialize(dir = BASE_DIR) ⇒ Index

Returns a new instance of Index.



60
61
62
63
64
65
66
67
# File 'lib/sup/index.rb', line 60

## Prepare an index rooted at +dir+, creating the directory if needed.
## The lockfile is configured with :retries => 0 (presumably so #lock gives
## up at once; a held lock surfaces as LockError in #lock). The Xapian
## database is not opened here; #load_index does that.
def initialize dir=BASE_DIR
  @dir = dir
  FileUtils.mkdir_p(@dir)
  @lock = Lockfile.new(lockfile, :retries => 0, :max_age => nil)
  @index_mutex = Monitor.new
  @sync_queue = Queue.new
  @sync_worker = nil
end

Instance Method Details

#add_message(m) ⇒ Object



135
# File 'lib/sup/index.rb', line 135

## Add +m+ to the index via sync_message, passing true as the second
## argument (as #update_message does; #update_message_state passes false).
def add_message m
  sync_message(m, true)
end

#begin_transactionObject

wrap all future changes inside a transaction so they’re done atomically



307
308
309
# File 'lib/sup/index.rb', line 307

## Open a Xapian transaction, inside synchronize, so that later changes
## are applied together by #commit_transaction or discarded by
## #cancel_transaction.
def begin_transaction
  synchronize do
    @xapian.begin_transaction
  end
end

#build_message(id) ⇒ Object

Load message with the given message-id from the index



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/sup/index.rb', line 231

## Rebuild a Message from the index entry stored for message-id +id+.
## Returns nil when there is no entry for +id+. Raises RuntimeError if the
## entry names a source id that SourceManager does not know.
##
## The entry's :from/:to/:cc/:bcc values are rewritten in place: each stored
## pair is reversed and turned into a Person before the entry is passed to
## Message#load_from_index!.
def build_message id
  entry = synchronize { get_entry id }
  return unless entry

  locs = entry[:locations].map do |src_id, src_info|
    src = SourceManager[src_id]
    raise "invalid source #{src_id}" unless src
    Location.new src, src_info
  end

  message = Message.new :locations => locs,
                        :labels => entry[:labels],
                        :snippet => entry[:snippet]

  # Try to find person from contacts before falling back to
  # generating it from the address.
  to_person = lambda { |pair| Person.from_name_and_email(*pair.reverse!) }
  entry[:from] = to_person.call(entry[:from])
  [:to, :cc, :bcc].each { |key| entry[key].map!(&to_person) }

  message.load_from_index! entry
  message
end

#cancel_transactionObject

abort the transaction and revert all changes made since begin_transaction



317
318
319
# File 'lib/sup/index.rb', line 317

## Abort the current Xapian transaction, discarding every change made since
## #begin_transaction. Runs inside synchronize.
def cancel_transaction
  synchronize do
    @xapian.cancel_transaction
  end
end

#commit_transactionObject

complete the transaction and write all previous changes to disk



312
313
314
# File 'lib/sup/index.rb', line 312

## Complete the current Xapian transaction so that every change made since
## #begin_transaction is written. Runs inside synchronize.
def commit_transaction
  synchronize do
    @xapian.commit_transaction
  end
end

#contains?(m) ⇒ Boolean

Returns:

  • (Boolean)


148
# File 'lib/sup/index.rb', line 148

## Whether the index holds a message with the same message-id as +m+.
def contains? m
  contains_id?(m.id)
end

#contains_id?(id) ⇒ Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/sup/index.rb', line 144

## Whether a document with message-id +id+ exists in the index.
## The lookup result is normalized to a strict true/false, so the predicate
## never leaks nil to callers (the former `find_docid(id) && true` returned
## nil on a miss).
def contains_id? id
  synchronize { find_docid(id) ? true : false }
end

#delete(id) ⇒ Object

Delete message with the given message-id from the index



258
259
260
# File 'lib/sup/index.rb', line 258

## Remove the document whose msgid term matches +id+ from the index.
def delete id
  synchronize do
    msgid_term = mkterm(:msgid, id)
    @xapian.delete_document(msgid_term)
  end
end

#each_id(query = {}, ignore_neg_terms = true) ⇒ Object



277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/sup/index.rb', line 277

## Yield every message-id matching +query+. Results are fetched
## EACH_ID_PAGE at a time, and paging stops after the first short page.
## +ignore_neg_terms+ is forwarded to build_xapian_query.
def each_id query={}, ignore_neg_terms = true
  page_size = EACH_ID_PAGE
  start = 0

  xapian_query = build_xapian_query query, ignore_neg_terms
  more = true
  while more
    batch = run_query_ids xapian_query, start, (start + page_size)
    batch.each { |msg_id| yield msg_id }
    # A page shorter than page_size means the result set is exhausted.
    more = batch.size >= page_size
    start += page_size
  end
end

#each_id_by_date(query = {}) ⇒ Object

Yields a message-id and message-building lambda for each message that matches the given query, in descending date order. You should probably not call this on a block that doesn’t break rather quickly because the results can be very large.



160
161
162
# File 'lib/sup/index.rb', line 160

## For each message-id matching +query+, yield the id together with a
## lambda that builds the full Message on demand. Building is deferred so
## callers can skip messages cheaply.
def each_id_by_date query={}
  each_id(query) do |msg_id|
    builder = lambda { build_message msg_id }
    yield msg_id, builder
  end
end

#each_message(query = {}, ignore_neg_terms = true, &b) ⇒ Object

Yield each message matching query. The ignore_neg_terms parameter is used to display results even if they contain “forbidden” labels such as :deleted; it is used in Poll#poll_from when we need to get the location of a message that may carry these labels.



295
296
297
298
299
# File 'lib/sup/index.rb', line 295

## Yield a fully built Message for every id matching +query+.
## +ignore_neg_terms+ is forwarded to #each_id; see the method docs for why
## Poll#poll_from disables it.
def each_message query={}, ignore_neg_terms = true, &b
  each_id(query, ignore_neg_terms) do |msg_id|
    message = build_message(msg_id)
    yield message
  end
end

#each_message_in_thread_for(m, opts = {}) ⇒ Object

yield all messages in the thread containing ‘m’ by repeatedly querying the index. yields pairs of message ids and message-building lambdas, so that building an unwanted message can be skipped in the block if desired.

Only one option is consulted, :skip_killed: if it is true, loading stops as soon as a message with a :killed flag is found, and false is returned without yielding anything. (A :limit option is not read by this implementation.)



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/sup/index.rb', line 205

## Collect every message in the thread containing +m+ by walking thread ids
## stored in the index. Then yield each message-id (starting with m.id) with
## a lambda that builds that message on demand.
##
## Returns nil (yielding nothing) if +m+ is not indexed. If opts[:skip_killed]
## is set and a killed thread is reached, returns false without yielding
## anything. Otherwise returns true.
## NOTE(review): unlike most readers here, this does not run inside synchronize.
def each_message_in_thread_for m, opts={}
  # TODO thread by subject
  root = find_doc(m.id)
  return unless root

  pending_threads = root.value(THREAD_VALUENO).split(',')
  found_ids = [m.id]
  visited_threads = Set.new
  visited_ids = Set.new([m.id])

  until pending_threads.empty?
    tid = pending_threads.pop
    next if visited_threads.include?(tid)
    return false if opts[:skip_killed] && thread_killed?(tid)
    visited_threads << tid

    members = term_docids(mkterm(:thread, tid)).map { |docid| @xapian.document docid }
    members.each do |member|
      mid = member.value MSGID_VALUENO
      next if visited_ids.include?(mid)
      found_ids << mid
      visited_ids << mid
      # Each member may belong to further threads; queue them for the walk.
      pending_threads.concat member.value(THREAD_VALUENO).split(',')
    end
  end

  found_ids.each { |msg_id| yield msg_id, lambda { build_message msg_id } }
  true
end

#each_prefixed_term(prefix) ⇒ Object

Yields each term in the index that starts with prefix.



333
334
335
336
337
338
339
340
341
# File 'lib/sup/index.rb', line 333

## Yield each term in the index that begins with +prefix+, walking Xapian's
## all-terms iterator from its begin to its end position. Always returns nil.
def each_prefixed_term prefix
  cursor = @xapian._dangerous_allterms_begin prefix
  stop = @xapian._dangerous_allterms_end prefix
  until cursor.equals(stop)
    yield cursor.term
    cursor.next
  end
  nil
end

#each_source_info(source_id, prefix = '', &b) ⇒ Object

Yields (in lexicographical order) the source infos of all locations from the given source with the given source_info prefix



345
346
347
348
349
350
# File 'lib/sup/index.rb', line 345

## Yield, in index (lexicographic) order, the source info of every location
## from +source_id+ whose source info starts with +prefix+. Each yielded
## value is +prefix+ followed by the rest of the term after the location
## prefix.
def each_source_info source_id, prefix='', &b
  term_prefix = mkterm :location, source_id, prefix
  skip = term_prefix.length
  each_prefixed_term(term_prefix) { |term| yield prefix + term[skip..-1] }
end

#empty?Boolean

Returns:

  • (Boolean)


154
# File 'lib/sup/index.rb', line 154

## True when the index holds no documents.
def empty?
  size.zero?
end

#find_messages(query_expr) ⇒ Object

Search messages. Returns an Enumerator.



302
303
304
# File 'lib/sup/index.rb', line 302

## Search with a user query string. The string is parsed immediately, so a
## ParseError is raised here. The returned Enumerator runs #each_message
## only when it is iterated.
def find_messages query_expr
  parsed = parse_query(query_expr)
  to_enum(:each_message, parsed)
end

#loadObject



101
102
103
104
# File 'lib/sup/index.rb', line 101

## Load everything needed to use the index. Sources are loaded first, then
## the Xapian database is opened or created by #load_index, whose result is
## returned.
def load
  SourceManager.load_sources
  load_index
end

#load_contacts(email_addresses, opts = {}) ⇒ Object

Given an array of email addresses, return an array of Person objects that have sent mail to or received mail from any of the given addresses.



264
265
266
267
268
269
270
271
272
273
# File 'lib/sup/index.rb', line 264

## Return up to opts[:num] (default 20) Person objects for everyone who
## appears in the from/to/cc/bcc of messages matching
## :participants => email_addresses. Messages are scanned in the order
## #each_id_by_date yields them. Scanning stops once enough distinct
## [name, email] pairs have been collected, then the list is truncated.
def load_contacts email_addresses, opts={}
  limit = opts[:num] || 20
  seen = Set.new
  each_id_by_date :participants => email_addresses do |_msg_id, builder|
    break unless seen.size < limit
    msg = builder.call
    people = ([msg.from] + msg.to + msg.cc + msg.bcc).compact
    people.each { |person| seen << [person.name, person.email] }
  end
  seen.to_a.compact[0...limit].map { |name, email| Person.from_name_and_email name, email }
end

#load_indexObject



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/sup/index.rb', line 113

## Open the Xapian database under @dir/xapian, creating it if absent, and
## prepare the Enquire object used for searching.
##
## A new database is stamped with 'version' = INDEX_VERSION and
## 'rescue-version' = '0' metadata. An existing database must carry the
## same 'version' (missing metadata counts as '0'); otherwise this fails
## with RuntimeError telling the user how to dump and restore.
##
## Searches use boolean weighting with ascending docid order.
def load_index
  path = File.join(@dir, 'xapian')
  # File.exists? was removed in Ruby 3.2; File.exist? works on all versions.
  if File.exist? path
    @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_OPEN)
    db_version = @xapian.get_metadata 'version'
    db_version = '0' if db_version.empty?
    # There is no in-place upgrade path: any mismatch needs a dump/restore.
    if db_version != INDEX_VERSION
      fail "This Sup version expects a v#{INDEX_VERSION} index, but you have an existing v#{db_version} index. Please run sup-dump to save your labels, move #{path} out of the way, and run sup-sync --restore."
    end
  else
    @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_CREATE)
    @xapian.set_metadata 'version', INDEX_VERSION
    @xapian.set_metadata 'rescue-version', '0'
  end
  @enquire = Xapian::Enquire.new @xapian
  @enquire.weighting_scheme = Xapian::BoolWeight.new
  @enquire.docid_order = Xapian::Enquire::ASCENDING
end

#lockObject



71
72
73
74
75
76
77
78
# File 'lib/sup/index.rb', line 71

## Acquire the index lockfile. If the lock library gives up
## (Lockfile::MaxTriesLockError), raise LockError carrying the on-disk lock
## info instead. Returns the result of @lock.lock on success.
def lock
  debug "locking #{lockfile}..."
  @lock.lock
rescue Lockfile::MaxTriesLockError
  raise LockError, @lock.lockinfo_on_disk
end

#lockfileObject



69
# File 'lib/sup/index.rb', line 69

## Path of the lock file inside the index directory.
def lockfile
  File.join(@dir, "lock")
end

#message_joining_killed?(m) ⇒ Boolean

Check whether a message is part of a killed thread (warning: duplicates the traversal code of #each_message_in_thread_for). NOTE: we could be more efficient if we assumed that every killed message that hasn’t yet been added to the index is this way.

Returns:

  • (Boolean)


176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/sup/index.rb', line 176

## Whether +m+ belongs to a killed thread. This walks the same thread graph
## as #each_message_in_thread_for, but only reports whether any reachable
## thread is killed. Returns false if +m+ is not in the index.
def message_joining_killed? m
  root = find_doc(m.id)
  return false unless root

  frontier = root.value(THREAD_VALUENO).split(',')
  threads_done = Set.new
  msgs_done = Set.new([m.id])

  until frontier.empty?
    tid = frontier.pop
    next if threads_done.include?(tid)
    return true if thread_killed?(tid)
    threads_done << tid

    members = term_docids(mkterm(:thread, tid)).map { |docid| @xapian.document docid }
    members.each do |member|
      mid = member.value MSGID_VALUENO
      next if msgs_done.include?(mid)
      msgs_done << mid
      frontier.concat member.value(THREAD_VALUENO).split(',')
    end
  end
  false
end

#num_results_for(query = {}) ⇒ Object

Return the number of matches for query in the index



165
166
167
168
169
# File 'lib/sup/index.rb', line 165

## Number of index matches for +query+, as reported by the match set's
## matches_estimated (an estimate, not necessarily exact).
## The 0, 0, 100 arguments are passed to run_query unchanged (presumably
## offset, limit and a check-at-least hint; confirm in run_query).
def num_results_for query={}
  run_query(build_xapian_query(query), 0, 0, 100).matches_estimated
end

#optimizeObject

xapian-compact takes too long, so this is a no-op until we think of something better



323
324
# File 'lib/sup/index.rb', line 323

## Deliberately a no-op: xapian-compact takes too long to run here.
## Always returns nil.
def optimize
  nil
end

#parse_query(s) ⇒ Object

parse a query string from the user. returns a query object that can be passed to any index method with a ‘query’ argument.

raises a ParseError if something went wrong.

Raises:



401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
# File 'lib/sup/index.rb', line 401

## Parse a user-entered search string into a query hash that can be passed
## to any index method taking a 'query' argument.
##
## Processing happens in stages:
## 1. The "custom-search" hook and saved-search expansion run.
## 2. Gmail-style shorthands are rewritten textually into Xapian field
##    syntax: to:/from:, is:/has:, label:, filename:/filetype:,
##    before:/on:/in:/during:/after:, and limit:.
## 3. The result is parsed by Xapian::QueryParser.
##
## Returns a Hash with :qobj (the Xapian query) and :text (the original
## string). It may also include :load_spam, :load_deleted, :load_killed and
## :limit.
##
## Raises ParseError if saved-search expansion fails, a date or limit can't
## be understood, Xapian rejects the query, or the query ends up empty.
def parse_query s
  query = {}

  subs = HookManager.run("custom-search", :subs => s) || s
  begin
    subs = SearchManager.expand subs
  rescue SearchManager::ExpansionError => e
    raise ParseError, e.message
  end
  subs = subs.gsub(/\b(to|from):(\S+)\b/) do
    field, value = $1, $2
    email_field, name_field = %w(email name).map { |x| "#{field}_#{x}" }
    if (contact = ContactManager.contact_for(value))
      "#{email_field}:#{contact.email}"
    elsif value == "me"
      '(' + AccountManager.user_emails.map { |e| "#{email_field}:#{e}" }.join(' OR ') + ')'
    else
      "(#{email_field}:#{value} OR #{name_field}:#{value})"
    end
  end

  ## gmail style "is" operator
  subs = subs.gsub(/\b(is|has):(\S+)\b/) do
    label = $2
    case label
    when "read"
      "-label:unread"
    when "spam"
      query[:load_spam] = true
      "label:spam"
    when "deleted"
      query[:load_deleted] = true
      "label:deleted"
    else
      "label:#{label}"
    end
  end

  ## labels are stored lower-case in the index
  subs = subs.gsub(/\blabel:(\S+)\b/) do
    label = $1
    "label:#{label.downcase}"
  end

  ## if we see a label:deleted or a label:spam term anywhere in the query
  ## string, we set the extra load_spam or load_deleted options to true.
  ## bizarre? well, because the query allows arbitrary parenthesized boolean
  ## expressions, without fully parsing the query, we can't tell whether
  ## the user is explicitly directing us to search spam messages or not.
  ## e.g. if the string is -(-(-(-(-label:spam)))), does the user want to
  ## search spam messages or not?
  ##
  ## so, we rely on the fact that turning these extra options ON turns OFF
  ## the adding of "-label:deleted" or "-label:spam" terms at the very
  ## final stage of query processing. if the user wants to search spam
  ## messages, not adding that is the right thing; if he doesn't want to
  ## search spam messages, then not adding it won't have any effect.
  query[:load_spam] = true if subs =~ /\blabel:spam\b/
  query[:load_deleted] = true if subs =~ /\blabel:deleted\b/
  query[:load_killed] = true if subs =~ /\blabel:killed\b/

  ## gmail style attachments "filename" and "filetype" searches
  subs = subs.gsub(/\b(filename|filetype):(\((.+?)\)\B|(\S+)\b)/) do
    field, name = $1, ($3 || $4)
    case field
    when "filename"
      debug "filename: translated #{field}:#{name} to attachment:\"#{name.downcase}\""
      "attachment:\"#{name.downcase}\""
    when "filetype"
      debug "filetype: translated #{field}:#{name} to attachment_extension:#{name.downcase}"
      "attachment_extension:#{name.downcase}"
    end
  end

  ## Open-ended bounds for date ranges. 2**32 lies beyond MAX_DATE (2**31-1).
  ## (The earlier spelling "2<<32 - 1" parses as 2 << 31, the same value.)
  lastdate = 2**32
  firstdate = 0
  subs = subs.gsub(/\b(before|on|in|during|after):(\((.+?)\)\B|(\S+)\b)/) do
    field, datestr = $1, ($3 || $4)
    realdate = Chronic.parse datestr, :guess => false, :context => :past
    if realdate
      case field
      when "after"
        debug "chronic: translated #{field}:#{datestr} to #{realdate.end}"
        "date:#{realdate.end.to_i}..#{lastdate}"
      when "before"
        ## "before X" must stop where the span X begins. Using the span's
        ## end would wrongly include X itself (e.g. the whole named day).
        debug "chronic: translated #{field}:#{datestr} to #{realdate.begin}"
        "date:#{firstdate}..#{realdate.begin.to_i}"
      else
        debug "chronic: translated #{field}:#{datestr} to #{realdate}"
        "date:#{realdate.begin.to_i}..#{realdate.end.to_i}"
      end
    else
      raise ParseError, "can't understand date #{datestr.inspect}"
    end
  end

  ## limit:42 restrict the search to 42 results
  subs = subs.gsub(/\blimit:(\S+)\b/) do
    lim = $1
    if lim =~ /^\d+$/
      query[:limit] = lim.to_i
      ''
    else
      raise ParseError, "non-numeric limit #{lim.inspect}"
    end
  end

  debug "translated query: #{subs.inspect}"

  qp = Xapian::QueryParser.new
  qp.database = @xapian
  qp.stemmer = Xapian::Stem.new($config[:stem_language])
  qp.stemming_strategy = Xapian::QueryParser::STEM_SOME
  qp.default_op = Xapian::Query::OP_AND
  qp.add_valuerangeprocessor(Xapian::NumberValueRangeProcessor.new(DATE_VALUENO, 'date:', true))
  ## :prefix is either a single String or an Array of Strings. Array()
  ## handles both without relying on String#each, which Ruby >= 1.9 lacks.
  NORMAL_PREFIX.each { |k, info| Array(info[:prefix]).each { |v| qp.add_prefix k, v } }
  BOOLEAN_PREFIX.each { |k, info| Array(info[:prefix]).each { |v| qp.add_boolean_prefix k, v, info[:exclusive] } }

  begin
    xapian_query = qp.parse_query(subs, Xapian::QueryParser::FLAG_PHRASE|Xapian::QueryParser::FLAG_BOOLEAN|Xapian::QueryParser::FLAG_LOVEHATE|Xapian::QueryParser::FLAG_WILDCARD)
  rescue RuntimeError => e
    raise ParseError, "xapian query parser error: #{e}"
  end

  debug "parsed xapian query: #{Util::Query.describe(xapian_query, subs)}"

  raise ParseError, "empty query" if xapian_query.nil? || xapian_query.empty?
  query[:qobj] = xapian_query
  query[:text] = s
  query
end

#run_sync_workerObject



559
560
561
562
563
564
565
566
# File 'lib/sup/index.rb', line 559

## Body of the background sync thread. Applies queued message-state updates
## one at a time until the :die sentinel (or a falsy value) is dequeued.
## Returns nil.
def run_sync_worker
  while (msg = @sync_queue.deq)
    break if msg == :die
    update_message_state msg
    # Necessary to keep Xapian calls from lagging the UI too much.
    sleep 0.03
  end
end

#saveObject



106
107
108
109
110
111
# File 'lib/sup/index.rb', line 106

## Persist everything. Ensures the index directory exists, writes the source
## definitions, then flushes pending Xapian changes via #save_index.
def save
  debug "saving index and sources..."
  # File.exists? was removed in Ruby 3.2; File.exist? works on all versions.
  FileUtils.mkdir_p @dir unless File.exist? @dir
  SourceManager.save_sources
  save_index
end

#save_indexObject



139
140
141
142
# File 'lib/sup/index.rb', line 139

## Write pending Xapian changes to disk. This uses WritableDatabase#commit,
## the replacement for the deprecated #flush alias since Xapian 1.1.0.
def save_index
  info "Flushing Xapian updates to disk. This may take a while..."
  @xapian.commit
end

#save_message(m) ⇒ Object



533
534
535
536
537
538
539
540
# File 'lib/sup/index.rb', line 533

## Persist +m+'s state. When the background sync worker is running, the
## message is queued for it; otherwise it is written synchronously. Either
## way +m+ is marked clean right away, and clear_dirty's result is returned.
def save_message m
  synchronous = !@sync_worker
  if synchronous
    update_message_state m
  else
    @sync_queue << m
  end
  m.clear_dirty
end

#save_thread(t) ⇒ Object



542
543
544
545
546
# File 'lib/sup/index.rb', line 542

## Save every dirty message of thread +t+ via #save_message.
def save_thread t
  t.each_dirty_message { |dirty_msg| save_message(dirty_msg) }
end

#sizeObject



150
151
152
# File 'lib/sup/index.rb', line 150

## Number of documents currently in the Xapian index (read inside synchronize).
def size
  synchronize do
    @xapian.doccount
  end
end

#source_for_id(id) ⇒ Object

Return the id of the source that the message with the given message-id was synced from.



328
329
330
# File 'lib/sup/index.rb', line 328

## Return the :source_id recorded in the index entry for message-id +id+.
## Returns nil when the index has no entry for +id+, instead of raising
## NoMethodError on a nil entry. build_message guards the same case.
def source_for_id id
  synchronize do
    entry = get_entry(id)
    entry && entry[:source_id]
  end
end

#start_lock_update_threadObject



80
81
82
83
84
85
86
87
# File 'lib/sup/index.rb', line 80

## Start a reporting thread named "lock update" that touches the lockfile
## every 30 seconds, forever (presumably so the lock never looks stale).
## The thread is kept in @lock_update_thread so #stop_lock_update_thread
## can kill it.
def start_lock_update_thread
  @lock_update_thread = Redwood::reporting_thread("lock update") do
    loop do
      sleep 30
      @lock.touch_yourself
    end
  end
end

#start_sync_workerObject



548
549
550
# File 'lib/sup/index.rb', line 548

## Launch the background "index sync" thread running #run_sync_worker and
## remember it in @sync_worker. While it is set, #save_message queues work
## for it.
def start_sync_worker
  worker = Redwood::reporting_thread('index sync') do
    run_sync_worker
  end
  @sync_worker = worker
end

#stop_lock_update_threadObject



89
90
91
92
# File 'lib/sup/index.rb', line 89

## Kill the lockfile-refresh thread, if one is running, and forget it.
## Returns nil.
def stop_lock_update_thread
  thread = @lock_update_thread
  thread.kill if thread
  @lock_update_thread = nil
end

#stop_sync_workerObject



552
553
554
555
556
557
# File 'lib/sup/index.rb', line 552

## Shut down the background sync worker, if any. The worker is detached
## first so that #save_message falls back to synchronous writes. The :die
## sentinel is then enqueued and the thread is joined (returning join's
## result).
def stop_sync_worker
  worker = @sync_worker
  return unless worker
  @sync_worker = nil
  @sync_queue << :die
  worker.join
end

#unlockObject



94
95
96
97
98
99
# File 'lib/sup/index.rb', line 94

## Release the index lockfile if it is currently held. Otherwise this does
## nothing and returns nil.
def unlock
  return unless @lock && @lock.locked?
  debug "unlocking #{lockfile}..."
  @lock.unlock
end

#update_message(m) ⇒ Object



136
# File 'lib/sup/index.rb', line 136

## Re-sync +m+ into the index via sync_message with a second argument of
## true, exactly as #add_message does.
def update_message m
  sync_message(m, true)
end

#update_message_state(m) ⇒ Object



137
# File 'lib/sup/index.rb', line 137

## Sync +m+ via sync_message with a second argument of false. This is the
## lighter variant used by #save_message and the sync worker.
def update_message_state m
  sync_message(m, false)
end