Class: Redwood::Index

Inherits:
Object show all
Includes:
Singleton
Defined in:
lib/sup/index.rb

Defined Under Namespace

Classes: LockError

Constant Summary collapse

EACH_BY_DATE_NUM =

You should probably not call this with a block that doesn’t break rather quickly, because the result set can be very large.

100
SAME_SUBJECT_DATE_LIMIT =

Yields all messages in the thread containing ‘m’ by repeatedly querying the index. Yields pairs of message ids and message-building lambdas, so that the block can skip building a message it does not want.

Accepts only two options, :limit and :skip_killed. If :skip_killed is true, loading of the thread stops as soon as a message with a :killed flag is found.

7

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dir = BASE_DIR) ⇒ Index

Returns a new instance of Index.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/sup/index.rb', line 28

# Prepares an index rooted at +dir+ without touching the disk index yet.
#
# Starts with an empty, clean source table, builds a per-field analyzer
# (the standard analyzer for :body and :subject, the whitespace analyzer as
# the PerFieldAnalyzer argument), a query parser defaulting to the :body
# field, and a Lockfile on the path returned by #lockfile. Finally hands
# this object to the class via i_am_the_instance.
def initialize(dir = BASE_DIR)
  @dir = dir
  @sources = {}
  @sources_dirty = false

  whitespace_analyzer = Ferret::Analysis::WhiteSpaceAnalyzer.new(false)
  standard_analyzer = Ferret::Analysis::StandardAnalyzer.new([], true)
  @analyzer = Ferret::Analysis::PerFieldAnalyzer.new(whitespace_analyzer)
  [:body, :subject].each { |field| @analyzer[field] = standard_analyzer }
  @qparser ||= Ferret::QueryParser.new(:default_field => :body, :analyzer => @analyzer, :or_default => false)
  # lockfile reads @dir, so this has to come after @dir is assigned.
  @lock = Lockfile.new(lockfile, :retries => 0, :max_age => nil)

  self.class.i_am_the_instance(self)
end

Instance Attribute Details

#indexObject (readonly) Also known as: ferret

Returns the value of attribute index.



26
27
28
# File 'lib/sup/index.rb', line 26

# Read-only accessor: hands back whatever is currently stored in @index.
def index; @index; end

Instance Method Details

#add_source(source) ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/sup/index.rb', line 119

# Registers +source+ in the source table and marks the table dirty.
#
# A source without an id is given one: one more than the largest id in use,
# where DraftLoader and SentLoader sources count as 0 for that computation.
#
# Raises RuntimeError ("duplicate source!") if +source+ is already registered.
def add_source(source)
  # @sources maps id => source, so the duplicate check has to look at the
  # values; Hash#include? only inspects the keys and would never match.
  raise "duplicate source!" if @sources.value?(source)
  @sources_dirty = true
  max = @sources.max_of { |id, s| s.is_a?(DraftLoader) || s.is_a?(SentLoader) ? 0 : id }
  source.id ||= (max || 0) + 1
  ##source.id += 1 while @sources.member? source.id
  @sources[source.id] = source
end

#build_message(docid) ⇒ Object

builds a message object from a ferret result



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/sup/index.rb', line 306

# Builds a Message object from the index entry stored under +docid+.
#
# The entry's stored fields are turned into a synthetic header hash; the
# source is looked up in @sources by the entry's integer source id.
#
# Raises RuntimeError if the entry refers to a source id that is not
# registered in @sources.
def build_message(docid)
  entry = @index[docid]
  src = @sources[entry[:source_id].to_i]
  raise "invalid source #{entry[:source_id]}" unless src

  header = {
    "date" => Time.at(entry[:date].to_i),
    "subject" => unwrap_subj(entry[:subject]),
    "from" => entry[:from],
    "to" => entry[:to],
    "message-id" => entry[:message_id],
    # refs are stored bare and space separated; re-wrap each in angle brackets
    "references" => entry[:refs].split(/\s+/).map { |ref| "<#{ref}>" }.join(" "),
  }
  info = entry[:source_info].to_i
  label_syms = entry[:label].split(" ").map { |name| name.intern }

  Message.new(:source => src, :source_info => info, :labels => label_syms,
              :snippet => entry[:snippet], :header => header)
end

#contains?(m) ⇒ Boolean

Returns:

  • (Boolean)


204
# File 'lib/sup/index.rb', line 204

# Delegates to contains_id? using the id of message +m+.
def contains?(m)
  contains_id?(m.id)
end

#contains_id?(id) ⇒ Boolean

Returns:

  • (Boolean)


201
202
203
# File 'lib/sup/index.rb', line 201

# True when a term query on the :message_id field for +id+ has at least
# one hit in the index.
def contains_id?(id)
  term = Ferret::Search::TermQuery.new(:message_id, id)
  hit_count = @index.search(term).total_hits
  hit_count > 0
end

#drop_entry(docno) ⇒ Object



330
# File 'lib/sup/index.rb', line 330

# Deletes the index entry with document number +docno+ and returns whatever
# the index's delete call returns.
def drop_entry(docno)
  @index.delete(docno)
end

#each_id_by_date(opts = {}) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/sup/index.rb', line 210

# Walks the results of the query built from +opts+, newest first, in pages
# of EACH_BY_DATE_NUM, yielding for every hit its message id and a lambda
# that builds the message on demand.
#
# Does nothing when the index is empty. Always returns nil.
def each_id_by_date(opts = {})
  return if @index.size == 0 # otherwise ferret barfs ###TODO: remove this once my ferret patch is accepted
  query = build_query(opts)
  offset = 0
  more = true
  while more
    page = @index.search(query, :sort => "date DESC", :limit => EACH_BY_DATE_NUM, :offset => offset)
    Redwood.log("got #{page.total_hits} results for query (offset #{offset}) #{query.inspect}")
    page.hits.each do |hit|
      yield @index[hit.doc][:message_id], lambda { build_message hit.doc }
    end
    # another page is needed only while this one did not reach the end
    more = offset < page.total_hits - EACH_BY_DATE_NUM
    offset += EACH_BY_DATE_NUM
  end
end

#each_message_in_thread_for(m, opts = {}) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/sup/index.rb', line 239

# Yields every message in the thread containing +m+ by repeatedly querying
# the index. Each yield is a pair of message id and a lambda that builds the
# message, so the block can skip building messages it does not want.
#
# Options:
#   :limit       - stop collecting once this many messages have been found.
#   :skip_killed - if true, give up on the whole thread as soon as a message
#                  whose labels include "killed" is found.
#
# When $config[:thread_by_subject] is set, the search is seeded with messages
# whose subject phrase matches and whose date lies within
# SAME_SUBJECT_DATE_LIMIT * 12 hours on either side of m.date; otherwise it
# is seeded with m.id alone.
#
# Returns false (yielding nothing) if the thread was found to be killed,
# true otherwise.
def each_message_in_thread_for(m, opts = {})
  #Redwood::log "Building thread for #{m.id}: #{m.subj}"
  messages = {}
  searched = {}
  num_queries = 0
  # Must survive across query rounds: once a killed message has been seen the
  # thread stays killed, even if other ids are still waiting to be searched.
  killed = false

  if $config[:thread_by_subject] # do subject queries
    date_min = m.date - (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)
    date_max = m.date + (SAME_SUBJECT_DATE_LIMIT * 12 * 3600)

    q = Ferret::Search::BooleanQuery.new true
    sq = Ferret::Search::PhraseQuery.new(:subject)
    wrap_subj(Message.normalize_subj(m.subj)).split(/\s+/).each do |t|
      sq.add_term t
    end
    q.add_query sq, :must
    q.add_query Ferret::Search::RangeQuery.new(:date, :>= => date_min.to_indexable_s, :<= => date_max.to_indexable_s), :must

    q = build_query :qobj => q

    pending = @index.search(q).hits.map { |hit| @index[hit.doc][:message_id] }
    Redwood::log "found #{pending.size} results for subject query #{q}"
  else
    pending = [m.id]
  end

  until killed || pending.empty? || (opts[:limit] && messages.size >= opts[:limit])
    q = Ferret::Search::BooleanQuery.new true

    # one query per round: anything that IS, or REFERS TO, a pending id
    pending.each do |id|
      searched[id] = true
      q.add_query Ferret::Search::TermQuery.new(:message_id, id), :should
      q.add_query Ferret::Search::TermQuery.new(:refs, id), :should
    end
    pending = []

    q = build_query :qobj => q

    num_queries += 1
    @index.search_each(q, :limit => :all) do |docid, score|
      break if opts[:limit] && messages.size >= opts[:limit]
      if @index[docid][:label].split(/\s+/).include?("killed") && opts[:skip_killed]
        killed = true
        break
      end
      mid = @index[docid][:message_id]
      unless messages.member?(mid)
        #Redwood::log "got #{mid} as a child of #{id}"
        messages[mid] ||= lambda { build_message docid }
        refs = @index[docid][:refs].split(" ")
        # queue up every referenced id that has not been searched for yet
        pending += refs.select { |ref| !searched[ref] }
      end
    end
  end

  if killed
    Redwood::log "thread for #{m.id} is killed, ignoring"
    false
  else
    Redwood::log "ran #{num_queries} queries to build thread of #{messages.size + 1} messages for #{m.id}: #{m.subj}" if num_queries > 0
    messages.each { |mid, builder| yield mid, builder }
    true
  end
end

#fancy_lock_error_message_for(e) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/sup/index.rb', line 69

# Formats a human-readable explanation of who holds the index lock.
#
# +e+ must respond to mtime, user, host, pname and pid. The age of the lock
# is reported in whole minutes, or in whole seconds when it is under a
# minute old.
def fancy_lock_error_message_for(e)
  elapsed = (Time.now - e.mtime).to_i
  minutes = elapsed / 60
  age = minutes == 0 ? "#{elapsed} seconds" : "#{minutes} minutes"

  <<EOS
Error: the sup index is locked by another process! User '#{e.user}' on
host '#{e.host}' is running #{e.pname} with pid #{e.pid}. The process was alive
as of #{age} ago.
EOS
end

#fresh_thread_idObject



326
# File 'lib/sup/index.rb', line 326

# Returns a new thread id: one more than the previous one handed out.
#
# The counter starts from 0 when it has never been set, so the first call
# returns 1 instead of failing on nil + 1.
def fresh_thread_id
  @next_thread_id = (@next_thread_id || 0) + 1
end

#has_any_from_source_with_label?(source, label) ⇒ Boolean

Returns:

  • (Boolean)


374
375
376
377
378
379
# File 'lib/sup/index.rb', line 374

# True when at least one index entry matches both the id of +source+ (as a
# string, on the "source_id" field) and +label+ (as a string, on the "label"
# field). Only a single hit is requested since just the total count is used.
def has_any_from_source_with_label?(source, label)
  query = Ferret::Search::BooleanQuery.new
  [["source_id", source.id.to_s], ["label", label.to_s]].each do |field, value|
    query.add_query(Ferret::Search::TermQuery.new(field, value), :must)
  end
  index.search(query, :limit => 1).total_hits > 0
end

#loadObject



107
108
109
110
# File 'lib/sup/index.rb', line 107

# Loads the source table first, then the index; returns load_index's result.
def load
  load_sources()
  load_index()
end

#load_contacts(emails, h = {}) ⇒ Object



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# File 'lib/sup/index.rb', line 339

# Collects contacts from the most recent non-spam messages sent from or to
# any of +emails+.
#
# Messages are visited newest first until at least h[:num] (default 20)
# contacts have been gathered. For a message sent from one of the user's own
# accounts the recipients are the contacts; otherwise the sender is.
#
# Returns the array of non-nil objects produced by PersonManager.person_for.
def load_contacts(emails, h = {})
  q = Ferret::Search::BooleanQuery.new true
  emails.each do |e|
    qq = Ferret::Search::BooleanQuery.new true
    qq.add_query Ferret::Search::TermQuery.new(:from, e), :should
    qq.add_query Ferret::Search::TermQuery.new(:to, e), :should
    q.add_query qq
  end
  q.add_query Ferret::Search::TermQuery.new(:label, "spam"), :must_not

  Redwood::log "contact search: #{q}"
  contacts = {}
  num = h[:num] || 20
  @index.search_each(q, :sort => "date DESC", :limit => :all) do |docid, score|
    break if contacts.size >= num
    #Redwood::log "got message #{docid} to: #{@index[docid][:to].inspect} and from: #{@index[docid][:from].inspect}"
    f = @index[docid][:from]
    t = @index[docid][:to]

    # NOTE(review): the method name was missing from this listing
    # ("AccountManager. f"); restored as is_account_email? -- confirm against
    # AccountManager.
    if AccountManager.is_account_email? f
      t.split(" ").each { |e| contacts[PersonManager.person_for(e)] = true }
    else
      contacts[PersonManager.person_for(f)] = true
    end
  end

  # person_for may yield nil; the hash keys double as a de-duplicated list
  contacts.keys.compact
end

#load_entry_for_id(mid) ⇒ Object



332
333
334
335
336
337
# File 'lib/sup/index.rb', line 332

# Looks up the index entry whose :message_id term equals +mid+.
#
# Returns a [docid, entry] pair for the first hit, or nil when there is none.
def load_entry_for_id(mid)
  found = @index.search(Ferret::Search::TermQuery.new(:message_id, mid))
  return if found.total_hits == 0
  first_doc = found.hits[0].doc
  [first_doc, @index[first_doc]]
end

#load_index(dir = File.join(@dir, "ferret")) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/sup/index.rb', line 132

# Opens the ferret index stored in +dir+ (by default the "ferret"
# subdirectory of @dir), creating it with the field layout below when the
# directory does not exist yet. The opened index is stored in @index.
def load_index(dir = File.join(@dir, "ferret"))
  # File.exist? rather than File.exists?: the latter was removed in Ruby 3.2.
  if File.exist? dir
    Redwood::log "loading index..."
    @index = Ferret::Index::Index.new(:path => dir, :analyzer => @analyzer)
    Redwood::log "loaded index of #{@index.size} messages"
  else
    Redwood::log "creating index..."
    # every field is stored unless it overrides :store below
    field_infos = Ferret::Index::FieldInfos.new :store => :yes
    field_infos.add_field :message_id
    field_infos.add_field :source_id
    field_infos.add_field :source_info
    field_infos.add_field :date, :index => :untokenized
    field_infos.add_field :body, :store => :no
    field_infos.add_field :label
    field_infos.add_field :subject
    field_infos.add_field :from
    field_infos.add_field :to
    field_infos.add_field :refs
    field_infos.add_field :snippet, :index => :no, :term_vector => :no
    field_infos.create_index dir
    @index = Ferret::Index::Index.new(:path => dir, :analyzer => @analyzer)
  end
end

#load_sources(fn = Redwood::SOURCE_FN) ⇒ Object



368
369
370
371
372
# File 'lib/sup/index.rb', line 368

# Replaces the source table with the sources stored in the YAML file +fn+,
# each wrapped in a Recoverable and keyed by its id, and marks the table
# clean. A missing/empty YAML object yields an empty table.
def load_sources(fn = Redwood::SOURCE_FN)
  source_array = (Redwood::load_yaml_obj(fn) || []).map { |o| Recoverable.new o }
  # Build the table pair by pair: flattening the [id, source] pairs and
  # splatting them into Hash[] mis-pairs everything as soon as an id or a
  # source is array-like.
  by_id = {}
  source_array.each { |s| by_id[s.id] = s }
  @sources = by_id
  @sources_dirty = false
end

#lockObject



46
47
48
49
50
51
52
53
# File 'lib/sup/index.rb', line 46

# Acquires the index lock, logging the lock file path first.
#
# Raises LockError, built from the lock information found on disk, when the
# Lockfile gives up with MaxTriesLockError.
def lock
  Redwood.log("locking #{lockfile}...")
  begin
    @lock.lock
  rescue Lockfile::MaxTriesLockError
    raise(LockError, @lock.lockinfo_on_disk)
  end
end

#lock_or_dieObject



86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/sup/index.rb', line 86

# Acquires the index lock or, if that fails with LockError, explains on
# $stderr who holds it and how to clear a stale lock file, then exits.
def lock_or_die
  lock
rescue LockError => e
  $stderr.puts fancy_lock_error_message_for(e)
  $stderr.puts <<EOS

You can wait for the process to finish, or, if it crashed and left a
stale lock file behind, you can manually delete #{@lock.path}.
EOS
  exit
end

#lockfileObject



44
# File 'lib/sup/index.rb', line 44

# Path of the lock file: "lock" inside the index directory @dir.
def lockfile
  File.join(@dir, "lock")
end

#num_results_for(opts = {}) ⇒ Object



223
224
225
226
227
228
# File 'lib/sup/index.rb', line 223

# Number of index entries matching the query built from +opts+.
#
# Returns 0 straight away for an empty index; otherwise asks for a single
# hit and reports the search's total hit count.
def num_results_for(opts = {})
  return 0 if @index.size == 0 # otherwise ferret barfs ###TODO: remove this once my ferret patch is accepted

  index.search(build_query(opts), :limit => 1).total_hits
end

#saveObject



112
113
114
115
116
117
# File 'lib/sup/index.rb', line 112

# Persists the source table and the index, creating the index directory
# @dir first if it does not exist yet. Returns save_index's result.
def save
  Redwood::log "saving index and sources..."
  # File.exist? rather than File.exists?: the latter was removed in Ruby 3.2.
  FileUtils.mkdir_p @dir unless File.exist? @dir
  save_sources
  save_index
end

#save_index(fn = File.join(@dir, "ferret")) ⇒ Object



197
198
199
# File 'lib/sup/index.rb', line 197

# Intentionally a no-op: nothing has to be done to save the index,
# apparently. +fn+ is accepted but never used; always returns nil.
def save_index(fn = File.join(@dir, "ferret"))
  nil
end

#sizeObject



205
# File 'lib/sup/index.rb', line 205

# Size reported by the underlying index.
def size
  @index.size
end

#source_for(uri) ⇒ Object



128
# File 'lib/sup/index.rb', line 128

# First registered source that answers true to is_source_for?(uri), or nil
# when none does.
def source_for(uri)
  @sources.each_value do |src|
    return src if src.is_source_for?(uri)
  end
  nil
end

#sourcesObject



130
# File 'lib/sup/index.rb', line 130

# All registered sources, as an array (the values of the id => source table).
def sources
  @sources.values
end

#start_lock_update_threadObject



55
56
57
58
59
60
61
62
# File 'lib/sup/index.rb', line 55

# Starts a "lock update" reporting thread that, forever, sleeps 30 seconds
# and then calls touch_yourself on the lock. The thread object is kept in
# @lock_update_thread.
def start_lock_update_thread
  @lock_update_thread = Redwood.reporting_thread("lock update") do
    loop do
      sleep 30
      @lock.touch_yourself
    end
  end
end

#stop_lock_update_threadObject



64
65
66
67
# File 'lib/sup/index.rb', line 64

# Kills the lock update thread, if one is running, and forgets it.
# Always returns nil.
def stop_lock_update_thread
  worker = @lock_update_thread
  worker.kill if worker
  @lock_update_thread = nil
end

#sync_message(m, docid = nil, entry = nil) ⇒ Object

Syncs the message to the index: deleting if it’s already there, and adding either way. Index state will be determined by m.labels.

docid and entry can be specified if they’re already known.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/sup/index.rb', line 160

# Syncs message +m+ to the index: deletes the existing entry if there is
# one, and adds a fresh entry either way. Index state is determined by
# m.labels.
#
# +docid+ and +entry+ can be passed when they are already known; otherwise
# they are looked up by message id.
#
# Raises RuntimeError when the message has no source or source info, when
# +docid+ belongs to a different message id, when the source is not
# registered (has no id), or when the entry cannot be found again after
# having been added. Returns true on success.
def sync_message(m, docid = nil, entry = nil)
  docid, entry = load_entry_for_id m.id unless docid && entry

  raise "no source info for message #{m.id}" unless m.source && m.source_info
  raise "trying to delete non-corresponding entry #{docid} with index message-id #{@index[docid][:message_id].inspect} and parameter message id #{m.id.inspect}" if docid && @index[docid][:message_id] != m.id

  # m.source may be either a bare integer id or a source object
  source_id =
    if m.source.is_a? Integer
      m.source
    else
      m.source.id or raise "unregistered source #{m.source} (id #{m.source.id.inspect})"
    end

  # to, cc and bcc are all indexed together in the :to field
  recipients = (m.to + m.cc + m.bcc).map { |x| x.email }.join(" ")
  d = {
    :message_id => m.id,
    :source_id => source_id,
    :source_info => m.source_info,
    :date => m.date.to_indexable_s,
    :body => m.content,
    :snippet => m.snippet,
    :label => m.labels.uniq.join(" "),
    :from => m.from ? m.from.email : "",
    :to => recipients,
    :subject => wrap_subj(Message.normalize_subj(m.subj)),
    :refs => (m.refs + m.replytos).uniq.join(" "),
  }

  @index.delete docid if docid
  @index.add_document d

  docid, entry = load_entry_for_id m.id
  ## this hasn't been triggered in a long time. TODO: decide whether it's still a problem.
  raise "just added message #{m.id.inspect} but couldn't find it in a search" unless docid
  true
end

#unlockObject



100
101
102
103
104
105
# File 'lib/sup/index.rb', line 100

# Releases the index lock if there is a lock object and it is currently
# held; otherwise does nothing and returns nil.
def unlock
  return unless @lock && @lock.locked?
  Redwood.log("unlocking #{lockfile}...")
  @lock.unlock
end

#unwrap_subj(subj) ⇒ Object



328
# File 'lib/sup/index.rb', line 328

# Extracts the subject text from between the __START_SUBJECT__ and
# __END_SUBJECT__ markers; returns nil when the markers are not found.
def unwrap_subj(subj)
  wrapped = /__START_SUBJECT__ (.*?) __END_SUBJECT__/.match(subj)
  wrapped && wrapped[1]
end

#usual_sourcesObject



129
# File 'lib/sup/index.rb', line 129

# The registered sources that answer true to usual?.
def usual_sources
  @sources.values.select { |src| src.usual? }
end

#wrap_subj(subj) ⇒ Object



327
# File 'lib/sup/index.rb', line 327

# Surrounds +subj+ with the __START_SUBJECT__ / __END_SUBJECT__ markers,
# separated from it by single spaces.
def wrap_subj(subj)
  "__START_SUBJECT__ " + subj.to_s + " __END_SUBJECT__"
end