Class: Redwood::Index

Inherits:
Object show all
Includes:
InteractiveLock, Singleton
Defined in:
lib/sup/index.rb

Overview

This index implementation uses Xapian for searching and storage. It tends to be slightly faster than Ferret for indexing and significantly faster for searching due to precomputing thread membership.

Defined Under Namespace

Classes: LockError, ParseError

Constant Summary collapse

STEM_LANGUAGE =
"english"
INDEX_VERSION =
'4'
MIN_DATE =

Dates are converted to integers for Xapian, and are used for document ids, so we must ensure they’re reasonably valid. This typically only affects spam.

Time.at 0
MAX_DATE =
Time.at(2**31-1)
EACH_ID_PAGE =

Number of message-ids fetched per page when iterating over query results.

100

Constants included from InteractiveLock

Redwood::InteractiveLock::DELAY

Instance Method Summary collapse

Methods included from Singleton

included

Methods included from InteractiveLock

#lock_interactively, #pluralize, #time_ago_in_words

Constructor Details

#initialize(dir = BASE_DIR) ⇒ Index

Returns a new instance of Index.



50
51
52
53
54
55
56
57
# File 'lib/sup/index.rb', line 50

# Set up the on-disk index state: ensure the index directory exists,
# prepare (but do not acquire) the lockfile, and initialize the
# queue/mutex used by the background sync worker.
def initialize dir=BASE_DIR
  @dir = dir
  FileUtils.mkdir_p @dir
  @index_mutex = Monitor.new
  @sync_queue = Queue.new
  @sync_worker = nil
  @lock = Lockfile.new lockfile, :retries => 0, :max_age => nil
end

Instance Method Details

#add_message(m) ⇒ Object



125
# File 'lib/sup/index.rb', line 125

def add_message m; sync_message m, true end

#begin_transactionObject

wrap all future changes inside a transaction so they’re done atomically



260
261
262
# File 'lib/sup/index.rb', line 260

# Start a Xapian transaction so all subsequent index changes are
# applied atomically (until commit_transaction or cancel_transaction).
def begin_transaction
  synchronize do
    @xapian.begin_transaction
  end
end

#build_message(id) ⇒ Object

Load message with the given message-id from the index



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/sup/index.rb', line 195

# Load the message with the given message-id from the index and
# reconstruct a Message object from the stored entry. Returns nil when
# the id is not present in the index.
def build_message id
  record = synchronize { get_entry id }
  return unless record

  locs = record[:locations].map do |src_id, src_info|
    src = SourceManager[src_id]
    raise "invalid source #{src_id}" unless src
    Location.new src, src_info
  end

  msg = Message.new :locations => locs,
                    :labels => record[:labels],
                    :snippet => record[:snippet]

  ## each stored person entry is reversed in place before being splatted
  ## into Person.new
  to_person = ->(x) { Person.new(*x.reverse!) }
  record[:from] = to_person[record[:from]]
  record[:to].map!(&to_person)
  record[:cc].map!(&to_person)
  record[:bcc].map!(&to_person)

  msg.load_from_index! record
  msg
end

#cancel_transactionObject

abort the transaction and revert all changes made since begin_transaction



270
271
272
# File 'lib/sup/index.rb', line 270

# Abort the current transaction, discarding every change made since
# begin_transaction.
def cancel_transaction
  synchronize do
    @xapian.cancel_transaction
  end
end

#commit_transactionObject

complete the transaction and write all previous changes to disk



265
266
267
# File 'lib/sup/index.rb', line 265

# Complete the current transaction and write all changes made since
# begin_transaction to disk.
def commit_transaction
  synchronize do
    @xapian.commit_transaction
  end
end

#contains?(m) ⇒ Boolean

Returns:

  • (Boolean)


138
# File 'lib/sup/index.rb', line 138

def contains? m; contains_id? m.id end

#contains_id?(id) ⇒ Boolean

Returns:

  • (Boolean)


134
135
136
# File 'lib/sup/index.rb', line 134

# Whether a message with the given message-id exists in the index.
# Returns true when found; a falsy value otherwise.
def contains_id? id
  docid = synchronize { find_docid id }
  docid && true
end

#delete(id) ⇒ Object

Delete message with the given message-id from the index



220
221
222
# File 'lib/sup/index.rb', line 220

# Remove the message with the given message-id from the index.
def delete id
  synchronize do
    @xapian.delete_document(mkterm(:msgid, id))
  end
end

#each_id(query = {}) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/sup/index.rb', line 239

# Yield each message-id matching query, fetching results from the
# backend in pages of EACH_ID_PAGE ids at a time.
def each_id query={}
  xq = build_xapian_query query
  page_size = EACH_ID_PAGE
  offset = 0

  loop do
    batch = run_query_ids xq, offset, (offset + page_size)
    batch.each { |id| yield id }
    break if batch.size < page_size
    offset += page_size
  end
end

#each_id_by_date(query = {}) ⇒ Object

Yields a message-id and message-building lambda for each message that matches the given query, in descending date order. You should probably not call this on a block that doesn’t break rather quickly because the results can be very large.



150
151
152
# File 'lib/sup/index.rb', line 150

# For each matching message-id (in index order), yield the id together
# with a lambda that lazily builds the corresponding Message.
def each_id_by_date query={}
  each_id query do |id|
    yield id, lambda { build_message id }
  end
end

#each_message(query = {}, &b) ⇒ Object

Yield each message matching query



253
254
255
256
257
# File 'lib/sup/index.rb', line 253

# Yield each fully-built Message matching query.
def each_message query={}, &b
  each_id(query) { |id| yield build_message(id) }
end

#each_message_in_thread_for(m, opts = {}) ⇒ Object

yield all messages in the thread containing ‘m’ by repeatedly querying the index. yields pairs of message ids and message-building lambdas, so that building an unwanted message can be skipped in the block if desired.

only two options, :limit and :skip_killed. if :skip_killed is true, stops loading any thread if a message with a :killed flag is found.



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/sup/index.rb', line 169

## Yield all messages in the thread containing 'm', found by repeatedly
## following the thread ids recorded on each document until closure.
## Yields pairs of message ids and message-building lambdas, so building
## an unwanted message can be skipped by the caller.
##
## Returns nil when m is not in the index; returns false (stopping
## early) when opts[:skip_killed] is set and a visited thread is marked
## killed; returns true otherwise.
def each_message_in_thread_for m, opts={}
  # TODO thread by subject
  return unless doc = find_doc(m.id)
  queue = doc.value(THREAD_VALUENO).split(',')
  msgids = [m.id]
  seen_threads = Set.new
  seen_messages = Set.new [m.id]
  while not queue.empty?
    thread_id = queue.pop
    next if seen_threads.member? thread_id
    return false if opts[:skip_killed] && thread_killed?(thread_id)
    seen_threads << thread_id
    ## every document tagged with this thread id contributes its message
    ## id plus any further thread ids it references
    docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x }
    docs.each do |doc|
      msgid = doc.value MSGID_VALUENO
      next if seen_messages.member? msgid
      msgids << msgid
      seen_messages << msgid
      queue.concat doc.value(THREAD_VALUENO).split(',')
    end
  end
  msgids.each { |id| yield id, lambda { build_message id } }
  true
end

#each_prefixed_term(prefix) ⇒ Object

Yields each term in the index that starts with the given prefix.



286
287
288
289
290
291
292
293
294
# File 'lib/sup/index.rb', line 286

## Yields each term in the index that starts with prefix. Returns nil.
def each_prefixed_term prefix
  # NOTE(review): the "_dangerous_" accessors appear to expose raw
  # Xapian allterms iterators; keep their use confined to this loop —
  # verify against the Xapian binding before refactoring.
  term = @xapian._dangerous_allterms_begin prefix
  lastTerm = @xapian._dangerous_allterms_end prefix
  until term.equals lastTerm
    yield term.term
    term.next
  end
  nil
end

#each_source_info(source_id, prefix = '', &b) ⇒ Object

Yields (in lexicographical order) the source infos of all locations from the given source with the given source_info prefix



298
299
300
301
302
303
# File 'lib/sup/index.rb', line 298

# Yields (in lexicographical order) the source infos of all locations
# from the given source whose source_info begins with the given prefix.
def each_source_info source_id, prefix='', &b
  full_prefix = mkterm :location, source_id, prefix
  each_prefixed_term full_prefix do |term|
    ## strip the location-term prefix, leaving just the source_info tail
    yield term[full_prefix.length..-1]
  end
end

#empty?Boolean

Returns:

  • (Boolean)


144
# File 'lib/sup/index.rb', line 144

def empty?; size == 0 end

#loadObject



91
92
93
94
# File 'lib/sup/index.rb', line 91

## Load the mail sources via the source manager, then open the Xapian
## index itself.
def load
  SourceManager.load_sources
  load_index
end

#load_contacts(email_addresses, opts = {}) ⇒ Object

Given an array of email addresses, return an array of Person objects that have sent mail to or received mail from any of the given addresses.



226
227
228
229
230
231
232
233
234
235
# File 'lib/sup/index.rb', line 226

# Given an array of email addresses, return an array of Person objects
# that have sent mail to or received mail from any of those addresses.
# Scans matching messages newest-first and stops after opts[:num]
# (default 20) distinct people have been collected.
def load_contacts email_addresses, opts={}
  limit = opts[:num] || 20
  seen = Set.new
  each_id_by_date :participants => email_addresses do |id, builder|
    break if seen.size >= limit
    msg = builder.call
    people = [msg.from] + msg.to + msg.cc + msg.bcc
    people.compact.each { |p| seen << [p.name, p.email] }
  end
  seen.to_a.compact.map { |name, email| Person.new name, email }[0...limit]
end

#load_indexObject



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/sup/index.rb', line 103

# Open (or create) the Xapian database under @dir and verify that its
# on-disk format version matches what this version of Sup expects.
# Also configures the Enquire object used for all subsequent queries.
#
# Fixes in review: the metadata accessor calls were garbled to
# "@xapian. 'version'" (invalid Ruby) — restored to Xapian::Database's
# metadata / set_metadata; File.exists? (removed in Ruby 3.2) replaced
# with File.exist?.
def load_index
  path = File.join(@dir, 'xapian')
  if File.exist? path
    @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_OPEN)
    db_version = @xapian.metadata 'version'
    db_version = '0' if db_version.empty?
    if false
      ## in-place upgrade path, currently disabled: no supported
      ## migration exists between index formats
      info "Upgrading index format #{db_version} to #{INDEX_VERSION}"
      @xapian.set_metadata 'version', INDEX_VERSION
    elsif db_version != INDEX_VERSION
      fail "This Sup version expects a v#{INDEX_VERSION} index, but you have an existing v#{db_version} index. Please run sup-dump to save your labels, move #{path} out of the way, and run sup-sync --restore."
    end
  else
    @xapian = Xapian::WritableDatabase.new(path, Xapian::DB_CREATE)
    @xapian.set_metadata 'version', INDEX_VERSION
    @xapian.set_metadata 'rescue-version', '0'
  end
  ## queries are boolean-weighted and results come back in ascending
  ## docid order (docids are derived from message dates)
  @enquire = Xapian::Enquire.new @xapian
  @enquire.weighting_scheme = Xapian::BoolWeight.new
  @enquire.docid_order = Xapian::Enquire::ASCENDING
end

#lockObject



61
62
63
64
65
66
67
68
# File 'lib/sup/index.rb', line 61

# Acquire the index lockfile. Raises LockError, carrying the on-disk
# lock info, when another process already holds the lock.
def lock
  debug "locking #{lockfile}..."
  begin
    @lock.lock
  rescue Lockfile::MaxTriesLockError
    ## translate the lockfile gem's error into our own, preserving the
    ## information about who holds the lock
    raise LockError, @lock.lockinfo_on_disk
  end
end

#lockfileObject



59
# File 'lib/sup/index.rb', line 59

def lockfile; File.join @dir, "lock" end

#num_results_for(query = {}) ⇒ Object

Return the number of matches for query in the index



155
156
157
158
159
# File 'lib/sup/index.rb', line 155

# Return the (estimated) number of matches for query in the index.
def num_results_for query={}
  xq = build_xapian_query query
  run_query(xq, 0, 0, 100).matches_estimated
end

#optimizeObject

xapian-compact takes too long, so this is a no-op until we think of something better



276
277
# File 'lib/sup/index.rb', line 276

## intentionally a no-op: xapian-compact takes too long to run
## interactively, so index optimization is disabled until a better
## approach exists
def optimize
end

#parse_query(s) ⇒ Object

parse a query string from the user. returns a query object that can be passed to any index method with a ‘query’ argument.

raises a ParseError if something went wrong.

Raises:



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/sup/index.rb', line 312

## Parse a user-supplied query string into a query hash containing the
## compiled Xapian query (:qobj), the original text (:text), and any
## extracted options (:limit, :load_spam, :load_deleted, :load_killed).
## Raises ParseError when the string cannot be parsed.
def parse_query s
  query = {}

  ## let the custom-search hook and saved-search expansion rewrite the
  ## raw string before any other processing
  subs = HookManager.run("custom-search", :subs => s) || s
  begin
    subs = SearchManager.expand subs
  rescue SearchManager::ExpansionError => e
    raise ParseError, e.message
  end
  ## translate to:/from: into email/name field searches, resolving
  ## contact nicknames and the special value "me" (all account emails)
  subs = subs.gsub(/\b(to|from):(\S+)\b/) do
    field, value = $1, $2
    email_field, name_field = %w(email name).map { |x| "#{field}_#{x}" }
    if(p = ContactManager.contact_for(value))
      "#{email_field}:#{p.email}"
    elsif value == "me"
      '(' + AccountManager.user_emails.map { |e| "#{email_field}:#{e}" }.join(' OR ') + ')'
    else
      "(#{email_field}:#{value} OR #{name_field}:#{value})"
    end
  end

  ## gmail style "is" operator
  subs = subs.gsub(/\b(is|has):(\S+)\b/) do
    field, label = $1, $2
    case label
    when "read"
      "-label:unread"
    when "spam"
      query[:load_spam] = true
      "label:spam"
    when "deleted"
      query[:load_deleted] = true
      "label:deleted"
    else
      "label:#{$2}"
    end
  end

  ## labels are stored lower-case in the index
  subs = subs.gsub(/\blabel:(\S+)\b/) do
    label = $1
    "label:#{label.downcase}"
  end

  ## if we see a label:deleted or a label:spam term anywhere in the query
  ## string, we set the extra load_spam or load_deleted options to true.
  ## bizarre? well, because the query allows arbitrary parenthesized boolean
  ## expressions, without fully parsing the query, we can't tell whether
  ## the user is explicitly directing us to search spam messages or not.
  ## e.g. if the string is -(-(-(-(-label:spam)))), does the user want to
  ## search spam messages or not?
  ##
  ## so, we rely on the fact that turning these extra options ON turns OFF
  ## the adding of "-label:deleted" or "-label:spam" terms at the very
  ## final stage of query processing. if the user wants to search spam
  ## messages, not adding that is the right thing; if he doesn't want to
  ## search spam messages, then not adding it won't have any effect.
  query[:load_spam] = true if subs =~ /\blabel:spam\b/
  query[:load_deleted] = true if subs =~ /\blabel:deleted\b/
  query[:load_killed] = true if subs =~ /\blabel:killed\b/

  ## gmail style attachments "filename" and "filetype" searches
  subs = subs.gsub(/\b(filename|filetype):(\((.+?)\)\B|(\S+)\b)/) do
    field, name = $1, ($3 || $4)
    case field
    when "filename"
      debug "filename: translated #{field}:#{name} to attachment:\"#{name.downcase}\""
      "attachment:\"#{name.downcase}\""
    when "filetype"
      debug "filetype: translated #{field}:#{name} to attachment_extension:#{name.downcase}"
      "attachment_extension:#{name.downcase}"
    end
  end

  ## natural-language date searches, available only when the chronic gem
  ## was loaded successfully
  if $have_chronic
    # NOTE(review): precedence makes this 2 << (32 - 1) == 2**32, not
    # (2 << 32) - 1; harmless as an upper bound since it already exceeds
    # MAX_DATE, but worth confirming the intent before changing.
    lastdate = 2<<32 - 1
    firstdate = 0
    subs = subs.gsub(/\b(before|on|in|during|after):(\((.+?)\)\B|(\S+)\b)/) do
      field, datestr = $1, ($3 || $4)
      realdate = Chronic.parse datestr, :guess => false, :context => :past
      if realdate
        case field
        when "after"
          debug "chronic: translated #{field}:#{datestr} to #{realdate.end}"
          "date:#{realdate.end.to_i}..#{lastdate}"
        when "before"
          debug "chronic: translated #{field}:#{datestr} to #{realdate.begin}"
          "date:#{firstdate}..#{realdate.end.to_i}"
        else
          debug "chronic: translated #{field}:#{datestr} to #{realdate}"
          "date:#{realdate.begin.to_i}..#{realdate.end.to_i}"
        end
      else
        raise ParseError, "can't understand date #{datestr.inspect}"
      end
    end
  end

  ## limit:42 restrict the search to 42 results
  subs = subs.gsub(/\blimit:(\S+)\b/) do
    lim = $1
    if lim =~ /^\d+$/
      query[:limit] = lim.to_i
      ''
    else
      raise ParseError, "non-numeric limit #{lim.inspect}"
    end
  end

  debug "translated query: #{subs.inspect}"

  ## hand the rewritten string to Xapian's own query parser, with our
  ## field-prefix and date-range configuration
  qp = Xapian::QueryParser.new
  qp.database = @xapian
  qp.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
  qp.stemming_strategy = Xapian::QueryParser::STEM_SOME
  qp.default_op = Xapian::Query::OP_AND
  qp.add_valuerangeprocessor(Xapian::NumberValueRangeProcessor.new(DATE_VALUENO, 'date:', true))
  NORMAL_PREFIX.each { |k,vs| vs.each { |v| qp.add_prefix k, v } }
  BOOLEAN_PREFIX.each { |k,vs| vs.each { |v| qp.add_boolean_prefix k, v } }

  begin
    xapian_query = qp.parse_query(subs, Xapian::QueryParser::FLAG_PHRASE|Xapian::QueryParser::FLAG_BOOLEAN|Xapian::QueryParser::FLAG_LOVEHATE|Xapian::QueryParser::FLAG_WILDCARD)
  rescue RuntimeError => e
    raise ParseError, "xapian query parser error: #{e}"
  end

  debug "parsed xapian query: #{xapian_query.description}"

  raise ParseError if xapian_query.nil? or xapian_query.empty?
  query[:qobj] = xapian_query
  query[:text] = s
  query
end

#run_sync_workerObject



468
469
470
471
472
473
474
475
# File 'lib/sup/index.rb', line 468

# Drain the sync queue, persisting the state of each dequeued message
# until the :die sentinel (or a falsy value) is received.
def run_sync_worker
  loop do
    m = @sync_queue.deq
    break unless m
    return if m == :die
    update_message_state m
    # Necessary to keep Xapian calls from lagging the UI too much.
    sleep 0.03
  end
end

#saveObject



96
97
98
99
100
101
# File 'lib/sup/index.rb', line 96

# Persist everything: ensure the index directory exists, save the
# source list, then flush pending Xapian writes to disk.
def save
  debug "saving index and sources..."
  # File.exists? was removed in Ruby 3.2; File.exist? is the supported name
  FileUtils.mkdir_p @dir unless File.exist? @dir
  SourceManager.save_sources
  save_index
end

#save_indexObject



129
130
131
132
# File 'lib/sup/index.rb', line 129

## Flush all pending Xapian updates to disk. This can take a while for
## a large batch of changes.
def save_index
  info "Flushing Xapian updates to disk. This may take a while..."
  @xapian.flush
end

#save_thread(t) ⇒ Object



446
447
448
449
450
451
452
453
454
455
# File 'lib/sup/index.rb', line 446

# Persist the state of every dirty message in thread t: queued to the
# background sync worker when it is running, or written synchronously
# otherwise. Each message is marked clean afterwards.
def save_thread t
  t.each_dirty_message do |msg|
    @sync_worker ? (@sync_queue << msg) : update_message_state(msg)
    msg.clear_dirty
  end
end

#sizeObject



140
141
142
# File 'lib/sup/index.rb', line 140

# Number of documents (messages) currently in the Xapian index.
def size
  synchronize do
    @xapian.doccount
  end
end

#source_for_id(id) ⇒ Object

Return the source id of the source the message with the given message-id was synced from.



281
282
283
# File 'lib/sup/index.rb', line 281

# Return the source id of the source the message with the given
# message-id was synced from.
def source_for_id id
  synchronize do
    get_entry(id)[:source_id]
  end
end

#start_lock_update_threadObject



70
71
72
73
74
75
76
77
# File 'lib/sup/index.rb', line 70

# Spawn a thread that touches the lockfile every 30 seconds so other
# processes can tell the lock holder is still alive.
def start_lock_update_thread
  @lock_update_thread = Redwood::reporting_thread("lock update") do
    loop do
      sleep 30
      @lock.touch_yourself
    end
  end
end

#start_sync_workerObject



457
458
459
# File 'lib/sup/index.rb', line 457

# Spawn the background thread that drains the sync queue.
def start_sync_worker
  @sync_worker = Redwood::reporting_thread('index sync') do
    run_sync_worker
  end
end

#stop_lock_update_threadObject



79
80
81
82
# File 'lib/sup/index.rb', line 79

# Kill the lock-touching thread, if one is running, and forget it.
def stop_lock_update_thread
  if @lock_update_thread
    @lock_update_thread.kill
  end
  @lock_update_thread = nil
end

#stop_sync_workerObject



461
462
463
464
465
466
# File 'lib/sup/index.rb', line 461

# Signal the sync worker to exit, wait for it to finish, and clear the
# worker reference. No-op when no worker is running.
def stop_sync_worker
  worker = @sync_worker
  return unless worker
  @sync_worker = nil
  @sync_queue << :die
  worker.join
end

#unlockObject



84
85
86
87
88
89
# File 'lib/sup/index.rb', line 84

# Release the index lockfile, but only if we are actually holding it.
def unlock
  return unless @lock && @lock.locked?
  debug "unlocking #{lockfile}..."
  @lock.unlock
end

#update_message(m) ⇒ Object



126
# File 'lib/sup/index.rb', line 126

def update_message m; sync_message m, true end

#update_message_state(m) ⇒ Object



127
# File 'lib/sup/index.rb', line 127

def update_message_state m; sync_message m, false end