Class: Redwood::Index

Inherits:
Object
Includes:
InteractiveLock, Singleton
Defined in:
lib/sup/index.rb

Overview

This index implementation uses Xapian for searching and storage. It tends to be slightly faster than Ferret for indexing and significantly faster for searching, because thread membership is precomputed.
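
For orientation, the sketch below shows one way the index is typically driven. It is a minimal, hedged example: it assumes the singleton has already been set up and the index directory is locked by the surrounding application; the query string is illustrative only.

# Minimal sketch, assuming the Redwood::Index singleton has been initialized
# and the index lock is held by the caller.
index = Redwood::Index.instance
index.load                                     # loads sources.yaml and the Xapian database
query = index.parse_query "label:inbox subject:report"
index.num_results_for query                    # estimated number of matches
index.each_message(query) { |m| puts m.id }    # build and yield each matching message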

Defined Under Namespace

Classes: LockError, ParseError

Constant Summary

INDEX_VERSION =
'4'
MIN_DATE =

dates are converted to integers for xapian, and are used for document ids, so we must ensure they're reasonably valid. this typically only affects spam.

Time.at 0
MAX_DATE =
Time.at(2**31-1)
EACH_ID_PAGE =

Page size used when yielding each message-id matching a query (see #each_id)

100
NORMAL_PREFIX =

Stemmed

{
  'subject' => {:prefix => 'S', :exclusive => false},
  'body' => {:prefix => 'B', :exclusive => false},
  'from_name' => {:prefix => 'FN', :exclusive => false},
  'to_name' => {:prefix => 'TN', :exclusive => false},
  'name' => {:prefix => %w(FN TN), :exclusive => false},
  'attachment' => {:prefix => 'A', :exclusive => false},
  'email_text' => {:prefix => 'E', :exclusive => false},
  '' => {:prefix => %w(S B FN TN A E), :exclusive => false},
}
BOOLEAN_PREFIX =

Unstemmed

{
  'type' => {:prefix => 'K', :exclusive => true},
  'from_email' => {:prefix => 'FE', :exclusive => false},
  'to_email' => {:prefix => 'TE', :exclusive => false},
  'email' => {:prefix => %w(FE TE), :exclusive => false},
  'date' => {:prefix => 'D', :exclusive => true},
  'label' => {:prefix => 'L', :exclusive => false},
  'source_id' => {:prefix => 'I', :exclusive => true},
  'attachment_extension' => {:prefix => 'O', :exclusive => false},
  'msgid' => {:prefix => 'Q', :exclusive => true},
  'id' => {:prefix => 'Q', :exclusive => true},
  'thread' => {:prefix => 'H', :exclusive => false},
  'ref' => {:prefix => 'R', :exclusive => false},
  'location' => {:prefix => 'J', :exclusive => false},
}
PREFIX =
NORMAL_PREFIX.merge BOOLEAN_PREFIX
COMPL_OPERATORS =
%w[AND OR NOT]
COMPL_PREFIXES =
(
  %w[
    from to
    is has label
    filename filetype
    before on in during after
    limit
  ] + NORMAL_PREFIX.keys + BOOLEAN_PREFIX.keys
).map{|p|"#{p}:"} + COMPL_OPERATORS
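
The keys above are the field names users type before a colon in a query string: stemmed (NORMAL_PREFIX) fields match word variants, while boolean (BOOLEAN_PREFIX) fields match exact, unstemmed values. A hedged illustration, assuming index is the initialized Redwood::Index instance; the address and labels are made up:

# 'subject' is a stemmed prefix (S); 'from_email' and 'label' are boolean
# prefixes (FE and L) and must match exactly.
q = index.parse_query 'subject:report from_email:alice@example.com label:inbox'
index.num_results_for q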

Constants included from InteractiveLock

Redwood::InteractiveLock::DELAY

Instance Method Summary

Methods included from Singleton

included

Methods included from InteractiveLock

#lock_interactively, #pluralize, #time_ago_in_words

Constructor Details

#initialize(dir = BASE_DIR) ⇒ Index

Returns a new instance of Index.



# File 'lib/sup/index.rb', line 60

def initialize dir=BASE_DIR
  @dir = dir
  FileUtils.mkdir_p @dir
  @lock = Lockfile.new lockfile, :retries => 0, :max_age => nil
  @sync_worker = nil
  @sync_queue = Queue.new
  @index_mutex = Monitor.new
end

Instance Method Details

#add_message(m) ⇒ Object



# File 'lib/sup/index.rb', line 139

def add_message m; sync_message m, true end

#begin_transaction ⇒ Object

wrap all future changes inside a transaction so they’re done atomically



# File 'lib/sup/index.rb', line 311

def begin_transaction
  synchronize { @xapian.begin_transaction }
end

#build_message(id) ⇒ Object

Load message with the given message-id from the index



# File 'lib/sup/index.rb', line 235

def build_message id
  entry = synchronize { get_entry id }
  return unless entry

  locations = entry[:locations].map do |source_id,source_info|
    source = SourceManager[source_id]
    raise "invalid source #{source_id}" unless source
    Location.new source, source_info
  end

  m = Message.new :locations => locations,
                  :labels => entry[:labels],
                  :snippet => entry[:snippet]

  # Try to find person from contacts before falling back to
  # generating it from the address.
  mk_person = lambda { |x| Person.from_name_and_email(*x.reverse!) }
  entry[:from] = mk_person[entry[:from]]
  entry[:to].map!(&mk_person)
  entry[:cc].map!(&mk_person)
  entry[:bcc].map!(&mk_person)

  m.load_from_index! entry
  m
end
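
A short usage sketch, assuming index is the initialized instance; the message-id is made up and the nil guard reflects the early return above:

if (m = index.build_message "20230101120000.GA1234@example.invalid")
  m.labels    # labels, snippet, locations etc. are populated from the index entry
end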

#cancel_transaction ⇒ Object

abort the transaction and revert all changes made since begin_transaction



# File 'lib/sup/index.rb', line 321

def cancel_transaction
  synchronize { @xapian.cancel_transaction }
end

#commit_transaction ⇒ Object

complete the transaction and write all previous changes to disk



# File 'lib/sup/index.rb', line 316

def commit_transaction
  synchronize { @xapian.commit_transaction }
end
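
Taken together with #begin_transaction and #cancel_transaction, this allows a batch of index writes to be applied atomically. A hedged sketch, assuming index is the initialized instance and messages is a collection of Redwood::Message objects:

index.begin_transaction
begin
  messages.each { |m| index.update_message m }
  index.commit_transaction     # write the whole batch to disk
rescue StandardError
  index.cancel_transaction     # revert everything since begin_transaction
  raise
end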

#contains?(m) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sup/index.rb', line 152

def contains? m; contains_id? m.id end

#contains_id?(id) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sup/index.rb', line 148

def contains_id? id
  synchronize { find_docid(id) && true }
end

#delete(id) ⇒ Object

Delete message with the given message-id from the index



# File 'lib/sup/index.rb', line 262

def delete id
  synchronize { @xapian.delete_document mkterm(:msgid, id) }
end

#each_id(query = {}, ignore_neg_terms = true) ⇒ Object



# File 'lib/sup/index.rb', line 281

def each_id query={}, ignore_neg_terms = true
  offset = 0
  page = EACH_ID_PAGE

  xapian_query = build_xapian_query query, ignore_neg_terms
  while true
    ids = run_query_ids xapian_query, offset, (offset+page)
    ids.each { |id| yield id }
    break if ids.size < page
    offset += page
  end
end

#each_id_by_date(query = {}) ⇒ Object

Yields a message-id and message-building lambda for each message that matches the given query, in descending date order. You should probably not call this with a block that doesn't break fairly quickly, because the result set can be very large.



# File 'lib/sup/index.rb', line 164

def each_id_by_date query={}
  each_id(query) { |id| yield id, lambda { build_message id } }
end
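
Because messages are yielded newest first and paged internally, the intended pattern is to break out of the block early. A hedged sketch, assuming index is the initialized instance and query was produced by #parse_query:

newest = []
index.each_id_by_date query do |id, builder|
  break if newest.size >= 5
  newest << builder.call       # the lambda builds the Message only when called
end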

#each_message(query = {}, ignore_neg_terms = true, &b) ⇒ Object

Yield each message matching query. The ignore_neg_terms parameter is used to return results even if they carry "forbidden" labels such as :deleted; it is used in Poll#poll_from when we need to get the location of a message that may carry those labels.



# File 'lib/sup/index.rb', line 299

def each_message query={}, ignore_neg_terms = true, &b
  each_id query, ignore_neg_terms do |id|
    yield build_message(id)
  end
end

#each_message_in_thread_for(m, opts = {}) ⇒ Object

yield all messages in the thread containing ‘m’ by repeatedly querying the index. yields pairs of message ids and message-building lambdas, so that building an unwanted message can be skipped in the block if desired.

only two options are recognized: :limit and :skip_killed. if :skip_killed is true, loading stops as soon as a message with a :killed flag is found.



# File 'lib/sup/index.rb', line 209

def each_message_in_thread_for m, opts={}
  # TODO thread by subject
  return unless doc = find_doc(m.id)
  queue = doc.value(THREAD_VALUENO).split(',')
  msgids = [m.id]
  seen_threads = Set.new
  seen_messages = Set.new [m.id]
  while not queue.empty?
    thread_id = queue.pop
    next if seen_threads.member? thread_id
    return false if opts[:skip_killed] && thread_killed?(thread_id)
    seen_threads << thread_id
    docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x }
    docs.each do |doc|
      msgid = doc.value MSGID_VALUENO
      next if seen_messages.member? msgid
      msgids << msgid
      seen_messages << msgid
      queue.concat doc.value(THREAD_VALUENO).split(',')
    end
  end
  msgids.each { |id| yield id, lambda { build_message id } }
  true
end
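
A hedged sketch of walking a thread while skipping messages that are already in hand; m is a Redwood::Message and the cache is purely illustrative:

already_loaded = {}            # hypothetical message-id => Message cache
index.each_message_in_thread_for m, :skip_killed => true do |id, builder|
  next if already_loaded[id]   # skip building messages we don't need
  already_loaded[id] = builder.call
end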

#each_prefixed_term(prefix) ⇒ Object

Yields each term in the index that starts with prefix



# File 'lib/sup/index.rb', line 337

def each_prefixed_term prefix
  term = @xapian._dangerous_allterms_begin prefix
  lastTerm = @xapian._dangerous_allterms_end prefix
  until term.equals lastTerm
    yield term.term
    term.next
  end
  nil
end

#each_source_info(source_id, prefix = '', &b) ⇒ Object

Yields (in lexicographical order) the source infos of all locations from the given source with the given source_info prefix



# File 'lib/sup/index.rb', line 349

def each_source_info source_id, prefix='', &b
  p = mkterm :location, source_id, prefix
  each_prefixed_term p do |x|
    yield prefix + x[p.length..-1]
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sup/index.rb', line 158

def empty?; size == 0 end

#find_messages(query_expr) ⇒ Object

Search messages. Returns an Enumerator.



# File 'lib/sup/index.rb', line 306

def find_messages query_expr
  enum_for :each_message, parse_query(query_expr)
end
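
Because an Enumerator is returned, results can be consumed lazily. A hedged sketch, assuming index is the initialized instance and using the same query syntax accepted by #parse_query:

index.find_messages("label:inbox from:me").first(10).each do |m|
  puts m.id
end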

#get_xapian ⇒ Object



# File 'lib/sup/index.rb', line 113

def get_xapian
  @xapian
end

#load(failsafe = false) ⇒ Object



# File 'lib/sup/index.rb', line 101

def load failsafe=false
  SourceManager.load_sources File.join(@dir, "sources.yaml")
  load_index failsafe
end

#load_contacts(email_addresses, opts = {}) ⇒ Object

Given an array of email addresses, return an array of Person objects that have sent mail to or received mail from any of the given addresses.



# File 'lib/sup/index.rb', line 268

def load_contacts email_addresses, opts={}
  contacts = Set.new
  num = opts[:num] || 20
  each_id_by_date :participants => email_addresses do |id,b|
    break if contacts.size >= num
    m = b.call
    ([m.from]+m.to+m.cc+m.bcc).compact.each { |p| contacts << [p.name, p.email] }
  end
  contacts.to_a.compact[0...num].map { |n,e| Person.from_name_and_email n, e }
end
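
A hedged usage sketch, assuming index is the initialized instance; the address is made up:

people = index.load_contacts ["alice@example.com"], :num => 10
people.each { |p| puts "#{p.name} <#{p.email}>" }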

#load_index(failsafe = false) ⇒ Object



# File 'lib/sup/index.rb', line 117

def load_index failsafe=false
  path = File.join(@dir, 'xapian')
  if File.exist? path
    @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_OPEN)
    db_version = @xapian.get_metadata 'version'  ## format version kept in the database metadata
    db_version = '0' if db_version.empty?
    if false  ## in-place index upgrades are currently disabled
      info "Upgrading index format #{db_version} to #{INDEX_VERSION}"
      @xapian.set_metadata 'version', INDEX_VERSION
    elsif db_version != INDEX_VERSION
      fail "This Sup version expects a v#{INDEX_VERSION} index, but you have an existing v#{db_version} index. Please run sup-dump to save your labels, move #{path} out of the way, and run sup-sync --restore."
    end
  else
    @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_CREATE)
    @xapian.set_metadata 'version', INDEX_VERSION
    @xapian.set_metadata 'rescue-version', '0'
  end
  @enquire = Xapian::Enquire.new @xapian
  @enquire.weighting_scheme = Xapian::BoolWeight.new
  @enquire.docid_order = Xapian::Enquire::ASCENDING
end

#lock ⇒ Object



# File 'lib/sup/index.rb', line 71

def lock
  debug "locking #{lockfile}..."
  begin
    @lock.lock
  rescue Lockfile::MaxTriesLockError
    raise LockError, @lock.lockinfo_on_disk
  end
end

#lockfile ⇒ Object



# File 'lib/sup/index.rb', line 69

def lockfile; File.join @dir, "lock" end

#message_joining_killed?(m) ⇒ Boolean

check if a message is part of a killed thread (warning: duplicates code below). NOTE: we could be more efficient if we assume every killed message that hasn't been initially added to the index is this way.

Returns:

  • (Boolean)


# File 'lib/sup/index.rb', line 180

def message_joining_killed? m
  return false unless doc = find_doc(m.id)
  queue = doc.value(THREAD_VALUENO).split(',')
  seen_threads = Set.new
  seen_messages = Set.new [m.id]
  while not queue.empty?
    thread_id = queue.pop
    next if seen_threads.member? thread_id
    return true if thread_killed?(thread_id)
    seen_threads << thread_id
    docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x }
    docs.each do |doc|
      msgid = doc.value MSGID_VALUENO
      next if seen_messages.member? msgid
      seen_messages << msgid
      queue.concat doc.value(THREAD_VALUENO).split(',')
    end
  end
  false
end

#num_results_for(query = {}) ⇒ Object

Return the number of matches for query in the index



# File 'lib/sup/index.rb', line 169

def num_results_for query={}
  xapian_query = build_xapian_query query
  matchset = run_query xapian_query, 0, 0, 100
  matchset.matches_estimated
end

#optimize ⇒ Object

xapian-compact takes too long, so this is a no-op until we think of something better



# File 'lib/sup/index.rb', line 327

def optimize
end

#parse_query(s) ⇒ Object

parse a query string from the user. returns a query object that can be passed to any index method with a ‘query’ argument.

raises a ParseError if something went wrong.



# File 'lib/sup/index.rb', line 405

def parse_query s
  query = {}

  subs = HookManager.run("custom-search", :subs => s) || s
  begin
    subs = SearchManager.expand subs
  rescue SearchManager::ExpansionError => e
    raise ParseError, e.message
  end
  subs = subs.gsub(/\b(to|from):(\S+)\b/) do
    field, value = $1, $2
    email_field, name_field = %w(email name).map { |x| "#{field}_#{x}" }
    if(p = ContactManager.contact_for(value))
      "#{email_field}:#{p.email}"
    elsif value == "me"
      '(' + AccountManager.user_emails.map { |e| "#{email_field}:#{e}" }.join(' OR ') + ')'
    else
      "(#{email_field}:#{value} OR #{name_field}:#{value})"
    end
  end

  ## gmail style "is" operator
  subs = subs.gsub(/\b(is|has):(\S+)\b/) do
    _field, label = $1, $2
    case label
    when "read"
      "-label:unread"
    when "spam"
      query[:load_spam] = true
      "label:spam"
    when "deleted"
      query[:load_deleted] = true
      "label:deleted"
    else
      "label:#{$2}"
    end
  end

  ## labels are stored lower-case in the index
  subs = subs.gsub(/\blabel:(\S+)\b/) do
    label = $1
    "label:#{label.downcase}"
  end

  ## if we see a label:deleted or a label:spam term anywhere in the query
  ## string, we set the extra load_spam or load_deleted options to true.
  ## bizarre? well, because the query allows arbitrary parenthesized boolean
  ## expressions, without fully parsing the query, we can't tell whether
  ## the user is explicitly directing us to search spam messages or not.
  ## e.g. if the string is -(-(-(-(-label:spam)))), does the user want to
  ## search spam messages or not?
  ##
  ## so, we rely on the fact that turning these extra options ON turns OFF
  ## the adding of "-label:deleted" or "-label:spam" terms at the very
  ## final stage of query processing. if the user wants to search spam
  ## messages, not adding that is the right thing; if he doesn't want to
  ## search spam messages, then not adding it won't have any effect.
  query[:load_spam] = true if subs =~ /\blabel:spam\b/
  query[:load_deleted] = true if subs =~ /\blabel:deleted\b/
  query[:load_killed] = true if subs =~ /\blabel:killed\b/

  ## gmail style attachments "filename" and "filetype" searches
  subs = subs.gsub(/\b(filename|filetype):(\((.+?)\)\B|(\S+)\b)/) do
    field, name = $1, ($3 || $4)
    case field
    when "filename"
      debug "filename: translated #{field}:#{name} to attachment:\"#{name.downcase}\""
      "attachment:\"#{name.downcase}\""
    when "filetype"
      debug "filetype: translated #{field}:#{name} to attachment_extension:#{name.downcase}"
      "attachment_extension:#{name.downcase}"
    end
  end

  lastdate = 2<<32 - 1
  firstdate = 0
  subs = subs.gsub(/\b(before|on|in|during|after):(\((.+?)\)\B|(\S+)\b)/) do
    field, datestr = $1, ($3 || $4)
    realdate = Chronic.parse datestr, :guess => false, :context => :past
    if realdate
      case field
      when "after"
        debug "chronic: translated #{field}:#{datestr} to #{realdate.end}"
        "date:#{realdate.end.to_i}..#{lastdate}"
      when "before"
        debug "chronic: translated #{field}:#{datestr} to #{realdate.begin}"
        "date:#{firstdate}..#{realdate.end.to_i}"
      else
        debug "chronic: translated #{field}:#{datestr} to #{realdate}"
        "date:#{realdate.begin.to_i}..#{realdate.end.to_i}"
      end
    else
      raise ParseError, "can't understand date #{datestr.inspect}"
    end
  end

  ## limit:42 restrict the search to 42 results
  subs = subs.gsub(/\blimit:(\S+)\b/) do
    lim = $1
    if lim =~ /^\d+$/
      query[:limit] = lim.to_i
      ''
    else
      raise ParseError, "non-numeric limit #{lim.inspect}"
    end
  end

  debug "translated query: #{subs.inspect}"

  qp = Xapian::QueryParser.new
  qp.database = @xapian
  qp.stemmer = Xapian::Stem.new($config[:stem_language])
  qp.stemming_strategy = Xapian::QueryParser::STEM_SOME
  qp.default_op = Xapian::Query::OP_AND
  valuerangeprocessor = Xapian::NumberValueRangeProcessor.new(DATE_VALUENO,
                                                              'date:', true)
  qp.add_valuerangeprocessor(valuerangeprocessor)
  NORMAL_PREFIX.each { |k,info| info[:prefix].each {
    |v| qp.add_prefix k, v }
  }
  BOOLEAN_PREFIX.each { |k,info| info[:prefix].each {
    |v| qp.add_boolean_prefix k, v, info[:exclusive] }
  }

  begin
    xapian_query = qp.parse_query(subs, Xapian::QueryParser::FLAG_PHRASE   |
                                        Xapian::QueryParser::FLAG_BOOLEAN  |
                                        Xapian::QueryParser::FLAG_LOVEHATE |
                                        Xapian::QueryParser::FLAG_WILDCARD)
  rescue RuntimeError => e
    raise ParseError, "xapian query parser error: #{e}"
  end

  debug "parsed xapian query: #{Util::Query.describe(xapian_query, subs)}"

  if xapian_query.nil? or xapian_query.empty?
    raise ParseError, "couldn't parse \"#{s}\" as xapian query " \
                      "(special characters aren't indexed)"
  end

  query[:qobj] = xapian_query
  query[:text] = s
  query
end
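
The rewrites above mean a single query string can mix several gmail-style operators. A hedged sketch, assuming index is the initialized instance; the values are illustrative:

q = index.parse_query 'from:me is:read filetype:pdf after:(last month) limit:20'
q[:limit]        #=> 20 (stripped from the string and recorded on the query hash)
q[:qobj]         # the parsed Xapian::Query object
index.each_message(q) { |m| puts m.id }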

#run_sync_worker ⇒ Object



# File 'lib/sup/index.rb', line 576

def run_sync_worker
  while m = @sync_queue.deq
    return if m == :die
    update_message_state m
    # Necessary to keep Xapian calls from lagging the UI too much.
    sleep 0.03
  end
end

#save ⇒ Object



# File 'lib/sup/index.rb', line 106

def save
  debug "saving index and sources..."
  FileUtils.mkdir_p @dir unless File.exist? @dir
  SourceManager.save_sources File.join(@dir, "sources.yaml")
  save_index
end

#save_index ⇒ Object



# File 'lib/sup/index.rb', line 143

def save_index
  info "Flushing Xapian updates to disk. This may take a while..."
  @xapian.flush
end

#save_message(m, sync_back = true) ⇒ Object



# File 'lib/sup/index.rb', line 550

def save_message m, sync_back = true
  if @sync_worker
    @sync_queue << [m, sync_back]
  else
    update_message_state [m, sync_back]
  end
  m.clear_dirty
end

#save_thread(t, sync_back = true) ⇒ Object



# File 'lib/sup/index.rb', line 559

def save_thread t, sync_back = true
  t.each_dirty_message do |m|
    save_message m, sync_back
  end
end

#size ⇒ Object



# File 'lib/sup/index.rb', line 154

def size
  synchronize { @xapian.doccount }
end

#source_for_id(id) ⇒ Object

Return the source id of the source that the message with the given message-id was synced from



# File 'lib/sup/index.rb', line 332

def source_for_id id
  synchronize { get_entry(id)[:source_id] }
end

#start_lock_update_thread ⇒ Object



# File 'lib/sup/index.rb', line 80

def start_lock_update_thread
  @lock_update_thread = Redwood::reporting_thread("lock update") do
    while true
      sleep 30
      @lock.touch_yourself
    end
  end
end

#start_sync_worker ⇒ Object



# File 'lib/sup/index.rb', line 565

def start_sync_worker
  @sync_worker = Redwood::reporting_thread('index sync') { run_sync_worker }
end

#stop_lock_update_thread ⇒ Object



# File 'lib/sup/index.rb', line 89

def stop_lock_update_thread
  @lock_update_thread.kill if @lock_update_thread
  @lock_update_thread = nil
end

#stop_sync_worker ⇒ Object



# File 'lib/sup/index.rb', line 569

def stop_sync_worker
  return unless worker = @sync_worker
  @sync_worker = nil
  @sync_queue << :die
  worker.join
end
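
A hedged sketch of the background-sync pattern: once the worker is running, #save_message and #save_thread enqueue updates instead of writing synchronously, and #stop_sync_worker lets queued work drain before joining. Here threads is a hypothetical collection of dirty thread objects:

index.start_sync_worker
threads.each { |t| index.save_thread t }   # enqueued onto the sync queue
index.stop_sync_worker                     # pushes :die and joins the worker
index.save_index                           # flush Xapian updates to disk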

#unlock ⇒ Object



# File 'lib/sup/index.rb', line 94

def unlock
  if @lock && @lock.locked?
    debug "unlocking #{lockfile}..."
    @lock.unlock
  end
end
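
Lock management brackets the whole index lifecycle. A hedged sketch of a startup/shutdown sequence, roughly mirroring what Sup's executables do (interactive programs typically go through #lock_interactively from InteractiveLock rather than calling #lock directly):

index.lock                        # raises Redwood::Index::LockError if another process holds the lock
index.start_lock_update_thread    # keep the lockfile fresh while we run
index.load
# ... use the index ...
index.save
index.stop_lock_update_thread
index.unlock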

#update_message(m) ⇒ Object



# File 'lib/sup/index.rb', line 140

def update_message m; sync_message m, true end

#update_message_state(m) ⇒ Object



# File 'lib/sup/index.rb', line 141

def update_message_state m; sync_message m[0], false, m[1] end