Class: Supernova::SolrIndexer

Inherits:
Object
Includes:
Solr
Defined in:
lib/supernova/solr_indexer.rb

Constant Summary

# Default for #max_rows_to_direct_index (applied in #initialize). #index_rows
# sends batches of up to this many rows through #index_directly.
MAX_ROWS_TO_DIRECT_INDEX = 100

# Maps the field types declared with .has to the suffix appended to the
# field name when indexing (e.g. :string => "title_s"). :raw has no suffix.
# NOTE(review): presumably mirrors "*_s"-style dynamic fields in the Solr
# schema — confirm against schema.xml.
FIELD_SUFFIX_MAPPING = {
  :raw          => nil,
  :string       => :s,
  :text         => :t,
  :int          => :i,
  :integer      => :i,
  :sint         => :si,
  :float        => :f,
  :date         => :dt,
  :boolean      => :b,
  :location     => :p,
  :double       => :d,
  :string_array => :ms
}

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Solr

add, commit!, connection, included, select_url, truncate!, update_request

Constructor Details

#initialize(options = {}) ⇒ SolrIndexer

Returns a new instance of SolrIndexer.



67
68
69
70
71
72
73
74
# File 'lib/supernova/solr_indexer.rb', line 67

# Builds an indexer from an options hash.
#
# Every key that has a matching public writer (e.g. :db, :ids, :debug) is
# assigned through that writer; unknown keys are ignored. The full hash is
# also kept in #options. Afterwards, max_rows_to_direct_index falls back to
# MAX_ROWS_TO_DIRECT_INDEX and ids falls back to :all when not set.
#
# @param options [Hash] attribute values keyed by attribute name
def initialize(options = {})
  options.each do |attribute, setting|
    writer = :"#{attribute}="
    send(writer, setting) if respond_to?(writer)
  end
  self.max_rows_to_direct_index ||= MAX_ROWS_TO_DIRECT_INDEX
  self.options = options
  self.ids ||= :all
end

Instance Attribute Details

#current_json_string ⇒ Object

Returns the value of attribute current_json_string.



7
8
9
# File 'lib/supernova/solr_indexer.rb', line 7

# Reader for the JSON payload built up by #append_to_json_string.
# It is nil before the first row is appended and is reset to nil by
# #post_json_string.
def current_json_string
  instance_variable_get(:@current_json_string)
end

#db ⇒ Object

Returns the value of attribute db.



7
8
9
# File 'lib/supernova/solr_indexer.rb', line 7

# Reader for the database handle. #query_db calls #query on it when it
# responds to that method, and otherwise calls #select_all.
def db
  instance_variable_get(:@db)
end

#debug(message) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/supernova/solr_indexer.rb', line 85

# Times the optional block with Benchmark.realtime. When @debug is exactly
# +true+, it also prints +message+ with a millisecond timestamp.
#
# Placeholders expanded in the printed text:
#   %COUNT% - response.count, when the block's result responds to #count
#   %TIME%  - the block's runtime in seconds, formatted with 3 decimals
#
# The caller's +message+ is never modified. Non-mutating gsub is used, so
# frozen literals (e.g. under frozen_string_literal) no longer raise.
#
# @param message [String] log line, optionally containing placeholders
# @return the block's return value, or +true+ when no block is given
def debug(message)
  response = true
  time = Benchmark.realtime do
    response = yield if block_given?
  end
  # Deliberately strict: only a literal true enables output.
  if @debug == true
    text = message
    text = text.gsub("%COUNT%", response.count.to_s) if text.include?("%COUNT%") && response.respond_to?(:count)
    text = text.gsub("%TIME%", "%.3f" % time) if text.include?("%TIME%")
    puts "%s: %s" % [Time.now.iso8601(3), text]
  end
  response
end

#ids ⇒ Object

Returns the value of attribute ids.



7
8
9
# File 'lib/supernova/solr_indexer.rb', line 7

# Reader for the ids to index. #initialize defaults this to :all, and
# #ids_given? is true only when it holds an Array.
def ids
  instance_variable_get(:@ids)
end

#index_file_path ⇒ Object



302
303
304
# File 'lib/supernova/solr_indexer.rb', line 302

# Memoized path of the temporary JSON file used by #write_to_file and
# #do_index_file.
#
# The process id and this object's id are included alongside the
# timestamp. Two indexers started in the same second (in one process or
# several) therefore no longer truncate or delete each other's file.
#
# @return [String] absolute path under /tmp
def index_file_path
  @index_file_path ||= File.expand_path("/tmp/index_file_#{Time.now.to_i}_#{Process.pid}_#{object_id}.json")
end

#local_solr ⇒ Object

Returns the value of attribute local_solr.



7
8
9
# File 'lib/supernova/solr_indexer.rb', line 7

# Reader for the local_solr flag. When truthy, #do_index_file makes Solr
# read the file via stream.file instead of uploading it.
def local_solr
  instance_variable_get(:@local_solr)
end

#max_rows_to_direct_index ⇒ Object

Returns the value of attribute max_rows_to_direct_index.



7
8
9
# File 'lib/supernova/solr_indexer.rb', line 7

# Reader for the batch-size threshold that #index_rows compares the row
# count against.
def max_rows_to_direct_index
  instance_variable_get(:@max_rows_to_direct_index)
end

#options ⇒ Object

Returns the value of attribute options.



7
8
9
# File 'lib/supernova/solr_indexer.rb', line 7

# Reader for the options hash given to #initialize. #index_with_json
# checks its :use_json_file key.
def options
  instance_variable_get(:@options)
end

Class Method Details

.clazz(class_name = :only_return) ⇒ Object



29
30
31
32
# File 'lib/supernova/solr_indexer.rb', line 29

# Combined class-level getter/setter for the model class being indexed.
#
# Called with no argument it returns the current value. Called with any
# argument (including nil) it stores that value and returns it. The
# :only_return sentinel marks "no argument given".
def clazz(class_name =:only_return)
  unless class_name == :only_return
    @clazz = class_name
  end
  @clazz
end

.field_definitions ⇒ Object



15
16
17
# File 'lib/supernova/solr_indexer.rb', line 15

# Lazily created Hash of field name => attribute hash (e.g. { :type => :string })
# that is populated by .has.
def field_definitions
  @field_definitions || (@field_definitions = {})
end

.has(key, attributes) ⇒ Object



25
26
27
# File 'lib/supernova/solr_indexer.rb', line 25

# Declares an indexed field.
#
# +attributes+ is either a full attribute Hash (e.g.
# { :type => :string, :virtual => true }) or a bare type, which is
# wrapped as { :type => attributes }.
#
# @return [Hash] the stored attribute hash
def has(key, attributes)
  definition =
    case attributes
    when Hash then attributes
    else { :type => attributes }
    end
  field_definitions[key] = definition
end

.method_missing(*args) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/supernova/solr_indexer.rb', line 39

# Delegates unknown class-level calls to a fresh #search_scope when that
# scope responds to the method name. Both arguments and the block are
# forwarded. Any other call falls through to +super+, which raises
# NoMethodError.
#
# NOTE(review): no respond_to_missing? counterpart is defined, so
# respond_to? on the indexer class does not report delegated methods.
# It is not added here because building a search scope inside
# respond_to? could recurse if Supernova::SolrCriteria probes the class.
# Confirm before adding.
def method_missing(*args, &block)
  # Building a criteria object is not free; do it once per call.
  scope = search_scope
  if scope.respond_to?(args.first)
    scope.send(*args, &block)
  else
    super
  end
end

.search_scope ⇒ Object



47
48
49
# File 'lib/supernova/solr_indexer.rb', line 47

# Builds a new Supernova::SolrCriteria for .clazz. The criteria gets this
# indexer's field definitions as its attribute mapping and this class as
# its named-scope class.
def search_scope
  criteria = Supernova::SolrCriteria.new(self.clazz)
  criteria = criteria.attribute_mapping(self.field_definitions)
  criteria.named_scope_class(self)
end

.select_fields ⇒ Object



19
20
21
22
23
# File 'lib/supernova/solr_indexer.rb', line 19

# Names of all declared fields not flagged { :virtual => true }, in
# declaration order. Nil keys are dropped.
def select_fields
  real = field_definitions.select { |_key, attrs| attrs[:virtual] != true }
  real.keys.compact
end

.solr_field_for_field_name_and_mapping(field, mapping) ⇒ Object



197
198
199
# File 'lib/supernova/solr_indexer.rb', line 197

# Returns the Solr field name for +field+: "<field>_<suffix>" when +mapping+
# declares a type with a suffix, otherwise just the field name (for
# example, when the mapping is nil, the field is unknown, or the type is
# :raw).
def self.solr_field_for_field_name_and_mapping(field, mapping)
  definition = mapping && mapping[field.to_sym]
  suffix = definition ? suffix_from_type(definition[:type]) : nil
  [field, suffix].compact.join("_")
end

.suffix_from_type(type) ⇒ Object



193
194
195
# File 'lib/supernova/solr_indexer.rb', line 193

# Looks up the field-name suffix for a declared type (Symbol or String).
# Returns nil for :raw and for unknown types.
def self.suffix_from_type(type)
  FIELD_SUFFIX_MAPPING.fetch(type.to_sym, nil)
end

.table_name(name = :only_return) ⇒ Object



34
35
36
37
# File 'lib/supernova/solr_indexer.rb', line 34

# Combined class-level getter/setter for an explicit table name.
# With no argument it returns the stored value; with any argument it
# stores and returns that value.
def table_name(name = :only_return)
  unless name == :only_return
    @table_name = name
  end
  @table_name
end

Instance Method Details

#append_to_json_string(row) ⇒ Object



268
269
270
271
272
273
274
275
# File 'lib/supernova/solr_indexer.rb', line 268

# Appends one `"add":{"doc":...}` entry for +row+ to #current_json_string.
# The buffer is opened with "{" on the first call, and later entries are
# separated with ",\n". #finalize_json_string closes the object.
#
# Nil values are omitted from the document. The caller's +row+ is not
# modified (reject, not delete_if). The buffer is built with String.new,
# so it stays mutable even if string literals become frozen.
#
# @param row [Hash] a row already mapped for Solr
# @return [String] the updated buffer
def append_to_json_string(row)
  if current_json_string.nil?
    self.current_json_string = String.new("{\n")
  else
    current_json_string << ",\n"
  end
  doc = row.reject { |_key, value| value.nil? }
  current_json_string << %("add":#{{ :doc => doc }.to_json})
end

#before_index(row) ⇒ Object



115
116
117
# File 'lib/supernova/solr_indexer.rb', line 115

# Hook applied to each row by #map_for_solr before field mapping, unless
# extra_attributes_from_record or the deprecated row_to_solr is defined.
# The default returns +row+ unchanged; override it to adjust rows.
#
# @param row [Hash] a database row
# @return [Hash] the row to map for Solr
def before_index(row)
  row
end

#cached ⇒ Object



81
82
83
# File 'lib/supernova/solr_indexer.rb', line 81

# Lazily created per-instance Hash for callers to cache values in.
def cached
  @cached || (@cached = {})
end

#default_fields ⇒ Object



154
155
156
157
158
# File 'lib/supernova/solr_indexer.rb', line 154

# SQL select expressions that are always fetched: "id", plus a constant
# `"<clazz>" AS type` column when the indexer class has a clazz.
def default_fields
  model = self.class.clazz
  model ? ["id", %("#{model}" AS type)] : ["id"]
end

#defined_fields ⇒ Object



160
161
162
163
164
# File 'lib/supernova/solr_indexer.rb', line 160

# Names (as Strings) of the indexer class's declared fields, excluding
# those flagged { :virtual => true }.
def defined_fields
  definitions = self.class.field_definitions
  definitions.reject { |_name, opts| opts[:virtual] == true }.keys.map(&:to_s)
end

#do_index_file(options = {}) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/supernova/solr_indexer.rb', line 342

# Sends the JSON file at #index_file_path to Solr's JSON update handler
# with commit=true, using curl through the shell.
#
# With local_solr, Solr is asked to read the file itself via stream.file;
# otherwise the file is uploaded with --data-binary. Success is detected by
# `<int name="status">0</int>` in curl's output. On success the file is
# removed; on failure it is kept and an error is raised.
#
# @param options [Hash] currently unused; kept for backwards compatibility
# @return [String] curl's output
# @raise [RuntimeError] when no Solr URL is configured or the update fails
def do_index_file(options = {})
  raise "solr not configured" if solr_url.nil?
  cmd = if local_solr
    # Inside single quotes "&" needs no escaping; a backslash here would be
    # sent to Solr literally (commit=true\).
    %(curl -s '#{solr_update_url}?commit=true&stream.file=#{index_file_path}')
  else
    %(cd #{File.dirname(index_file_path)} && curl -s '#{solr_url}/update/json?commit=true' --data-binary @#{File.basename(index_file_path)} -H 'Content-type:application/json')
  end
  debug "run command: #{cmd}"
  out = Kernel.send(:`, cmd)
  unless out.to_s.include?(%(<int name=\"status\">0</int>))
    debug "ERROR: #{out}"
    raise "unable to index #{index_file_path}: #{out}"
  end
  FileUtils.rm_f(index_file_path)
  out
end

#finalize_json_string ⇒ Object



277
278
279
# File 'lib/supernova/solr_indexer.rb', line 277

# Closes the JSON object opened by #append_to_json_string by appending
# "\n}" to #current_json_string in place.
#
# @return [String] the completed buffer
def finalize_json_string
  self.current_json_string.concat("\n}")
end

#finish ⇒ Object



319
320
321
322
323
324
# File 'lib/supernova/solr_indexer.rb', line 319

# Closes the JSON file started by #write_to_file and hands it to
# #do_index_file.
#
# The memoized stream is cleared after closing. A later batch then opens
# a fresh file instead of writing to a closed IO, and #stream_open?
# reports false again.
#
# @return the result of #do_index_file
# @raise [RuntimeError] "nothing to index" when no file was started
def finish
  raise "nothing to index" unless stream_open?
  index_file_stream.puts("}")
  index_file_stream.close
  @index_file_stream = nil
  do_index_file
end

#float_or_nil_when_abs_bigger_than(value, border) ⇒ Object



178
179
180
181
182
# File 'lib/supernova/solr_indexer.rb', line 178

# Converts +value+ with #to_f and returns the Float. It returns nil when
# the value is blank (nil or whitespace-only once stringified) or when
# its absolute value exceeds +border+.
def float_or_nil_when_abs_bigger_than(value, border)
  return nil if value.to_s.strip.empty?
  number = value.to_f
  return nil if number.abs > border
  number
end

#ids_given? ⇒ Boolean

Returns:

  • (Boolean)


298
299
300
# File 'lib/supernova/solr_indexer.rb', line 298

# True when #ids holds an explicit Array of ids, as opposed to the :all
# default set in #initialize.
def ids_given?
  Array === ids
end

#index! ⇒ Object



98
99
100
# File 'lib/supernova/solr_indexer.rb', line 98

# Indexes every row selected by #query_to_index.
def index!
  sql = query_to_index
  index_query(sql)
end

#index_directly(rows) ⇒ Object

Kept only for backwards compatibility; delegates to #index_with_json.



251
252
253
254
255
256
257
# File 'lib/supernova/solr_indexer.rb', line 251

# Indexes +rows+ by delegating to #index_with_json.
#
# Kept only for backwards compatibility. The stale, commented-out
# per-row Supernova::Solr.connection implementation has been removed.
#
# @param rows [Array<Hash>] rows already mapped for Solr
# @return whatever #index_with_json returns (false for an empty batch)
def index_directly(rows)
  index_with_json(rows)
end

#index_file_stream ⇒ Object



330
331
332
# File 'lib/supernova/solr_indexer.rb', line 330

# Memoized File handle on #index_file_path, opened in "w" mode (which
# truncates any existing file) on first use.
def index_file_stream
  @index_file_stream || (@index_file_stream = File.open(index_file_path, "w"))
end

#index_query(query) ⇒ Object



245
246
247
248
# File 'lib/supernova/solr_indexer.rb', line 245

# Runs +query+ against the database and indexes the resulting rows.
# The first 100 characters of the query are logged via #debug.
def index_query(query)
  preview = query.slice(0, 100)
  debug "getting rows for #{preview}"
  fetched = query_db(query)
  index_rows(fetched)
end

#index_rows(rows) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/supernova/solr_indexer.rb', line 219

# Maps each row for Solr (see #map_for_solr) and indexes the batch.
# Batches larger than max_rows_to_direct_index go through
# #index_with_json; smaller ones go through #index_directly.
#
# +rows+ may be any Enumerable (e.g. a database result object), not only
# an Array. Non-mutating #map is used, so the caller's collection is left
# untouched. (map! exists only on Array and crashed on other results.)
#
# @param rows [Enumerable<Hash>] raw database rows
# @return the result of the chosen indexing method
def index_rows(rows)
  mapped = debug("mapped %COUNT% rows to solr in %TIME%") do
    rows.map { |row| map_for_solr(row) }
  end
  if max_rows_to_direct_index < mapped.count
    debug "indexed #{mapped.length} rows with json in %TIME%" do
      index_with_json(mapped)
    end
  else
    debug "indexed #{mapped.length} rows directly in %TIME%" do
      index_directly(mapped)
    end
  end
end

#index_with_json(rows) ⇒ Object



234
235
236
237
# File 'lib/supernova/solr_indexer.rb', line 234

# Indexes +rows+ as JSON. It writes through a temp file when
# options[:use_json_file] is truthy, and otherwise posts an in-memory
# string.
#
# @return false for an empty batch, otherwise the chosen method's result
def index_with_json(rows)
  return false if rows.empty?
  if options && options[:use_json_file]
    index_with_json_file(rows)
  else
    index_with_json_string(rows)
  end
end

#index_with_json_file(rows) ⇒ Object



259
260
261
262
263
264
265
266
# File 'lib/supernova/solr_indexer.rb', line 259

# Writes every row to the temp JSON file (timed through #debug), then
# closes and submits the file via #finish.
def index_with_json_file(rows)
  debug "wrote #{rows.count} rows to the json file in %TIME%" do
    rows.each { |record| write_to_file(record) }
  end
  finish
end

#index_with_json_string(rows) ⇒ Object



290
291
292
293
294
295
296
# File 'lib/supernova/solr_indexer.rb', line 290

# Builds the JSON payload in memory, one entry per row, closes it and
# posts it to Solr.
#
# @return the result of #post_json_string
def index_with_json_string(rows)
  rows.each { |record| append_to_json_string(record) }
  finalize_json_string
  post_json_string
end

#map_for_solr(row) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/supernova/solr_indexer.rb', line 102

# Prepares one database row and maps its keys to Solr field names.
#
# Preparation, by priority:
# 1. extra_attributes_from_record (when defined and clazz is set): merges
#    its stringified result into the row; the record is built with
#    Supernova.build_ar_like_record.
# 2. the deprecated row_to_solr (prints a deprecation warning per call).
# 3. #before_index.
def map_for_solr(row)
  prepared =
    if self.respond_to?(:extra_attributes_from_record) && self.class.clazz
      record = Supernova.build_ar_like_record(self.class.clazz, row)
      row.merge(self.extra_attributes_from_record(record).stringify_keys)
    elsif self.respond_to?(:row_to_solr)
      puts "DEPRECATION WARNING: use before_index instead of row_to_solr! in #{caller.first}"
      self.row_to_solr(row)
    else
      self.before_index(row)
    end
  map_hash_keys_to_solr(prepared)
end

#map_hash_keys_to_solr(hash) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/supernova/solr_indexer.rb', line 119

# Rewrites +hash+ (in place) into a Solr document. The hash is also
# returned.
#
# - "id" becomes "<table_name>/<id>" and the raw id is kept in
#   "record_id_i" (when both an id and a table name exist).
# - "indexed_at_dt" is set to one UTC timestamp memoized per indexer.
# - Each declared field present in the hash is renamed to its typed Solr
#   name ("<field>_<suffix>", or just "<field>" for suffix-less types such
#   as :raw, matching .solr_field_for_field_name_and_mapping). :date
#   values are converted with #solr_date_value.
# - "type" is set to the indexer's clazz, when present.
#
# @param hash [Hash{String => Object}] a row with String keys
# @return [Hash] the same hash, mutated
def map_hash_keys_to_solr(hash)
  @indexed_at ||= Time.now.utc.iso8601.to_s
  if hash["id"] && table_name
    hash["record_id_i"] = hash["id"]
    hash["id"] = [table_name, hash["id"]].compact.join("/")
  end
  hash["indexed_at_dt"] = @indexed_at
  self.class.field_definitions.each do |field, options|
    next unless hash.key?(field.to_s)
    value = hash.delete(field.to_s)
    value = solr_date_value(value) if options[:type] == :date
    suffix = self.class.suffix_from_type(options[:type])
    # A nil suffix (:raw) means the bare field name, not "<field>_".
    hash[[field, suffix].compact.join("_")] = value
  end
  hash["type"] = self.class.clazz.to_s if self.class.clazz
  hash
end

# Converts a date-like value to Solr's "YYYY-MM-DDTHH:MM:SSZ" form:
# DateTime is shifted to UTC, Date becomes midnight UTC, and anything
# responding to #utc uses #iso8601. Other values are returned unchanged.
# DateTime is checked before Date because it is a Date subclass and its
# #to_s already contains a time part.
def solr_date_value(value)
  case value
  when DateTime
    value.new_offset(0).strftime("%Y-%m-%dT%H:%M:%SZ")
  when Date
    "#{value}T00:00:00Z"
  else
    value.respond_to?(:utc) ? value.utc.iso8601 : value
  end
end

#post_json_string ⇒ Object



281
282
283
284
285
286
287
288
# File 'lib/supernova/solr_indexer.rb', line 281

# POSTs #current_json_string to Solr's JSON update URL with commit=true,
# then clears the buffer.
#
# NOTE(review): the response status is not inspected here, unlike
# #do_index_file; callers must check it.
#
# @return the Typhoeus response
def post_json_string
  response = Typhoeus::Request.post("#{solr_update_url}?commit=true",
    :body => self.current_json_string,
    :headers => { "Content-type" => "application/json; charset=utf-8" }
  )
  self.current_json_string = nil
  response
end

#query_db(query) ⇒ Object



205
206
207
208
209
210
211
# File 'lib/supernova/solr_indexer.rb', line 205

# Runs +query+ on #db. It uses db.query(query, :as => :hash) when the
# handle responds to #query, and db.select_all(query) otherwise.
def query_db(query)
  return db.query(query, :as => :hash) if db.respond_to?(:query)
  db.select_all(query)
end

#query_to_index ⇒ Object



147
148
149
150
151
152
# File 'lib/supernova/solr_indexer.rb', line 147

# Builds the SELECT statement for the rows to index: #select_fields from
# #table_name, restricted to #ids when an explicit id list was given.
#
# An empty id list produces `IN (NULL)`, which matches nothing. Before,
# it produced `IN ()`, a SQL syntax error.
#
# NOTE(review): ids are interpolated unescaped; this assumes they are
# trusted integers. Verify callers never pass user input.
#
# @return [String] the SQL query
# @raise [RuntimeError] "no table_name defined" when no table is known
def query_to_index
  raise "no table_name defined" if table_name.nil?
  query = "SELECT #{select_fields.join(", ")} FROM #{table_name}"
  if ids_given?
    id_list = ids.empty? ? "NULL" : ids.join(", ")
    query << " WHERE id IN (#{id_list})"
  end
  query
end

#rows(query = nil) ⇒ Object



213
214
215
216
217
# File 'lib/supernova/solr_indexer.rb', line 213

# Fetches raw rows for +query+, or for #query_to_index when no query is
# given. The fetch is timed through #debug.
def rows(query = nil)
  debug("fetched rows in %TIME%") do
    sql = query || query_to_index
    query_db(sql)
  end
end

#select_fields ⇒ Object



166
167
168
# File 'lib/supernova/solr_indexer.rb', line 166

# SQL select list: the #default_fields followed by the #defined_fields.
def select_fields
  [*default_fields, *defined_fields]
end

#solr_rows_to_index_for_query(query) ⇒ Object



239
240
241
242
243
# File 'lib/supernova/solr_indexer.rb', line 239

# Runs +query+ and returns its rows mapped for Solr (via #map_for_solr),
# without indexing them.
def solr_rows_to_index_for_query(query)
  fetched = query_db(query)
  fetched.map { |record| map_for_solr(record) }
end

#solr_update_url ⇒ Object



338
339
340
# File 'lib/supernova/solr_indexer.rb', line 338

# URL of Solr's JSON update handler: #solr_url + "/update/json".
def solr_update_url
  [solr_url, "update/json"].join("/")
end

#solr_url ⇒ Object



334
335
336
# File 'lib/supernova/solr_indexer.rb', line 334

# The configured Supernova::Solr.url as a String with a trailing slash
# removed, or nil when no URL is present.
def solr_url
  configured = Supernova::Solr.url
  return nil unless configured.present?
  configured.to_s.gsub(/\/$/, "")
end

#sql_column_from_field_and_type(field, type) ⇒ Object



184
185
186
187
188
189
190
191
# File 'lib/supernova/solr_indexer.rb', line 184

# SQL select expression that aliases +field+ to its typed Solr name.
# :date fields are delegated to #sql_date_column_from_field.
#
# @raise [RuntimeError] when +type+ has no suffix (e.g. :raw)
def sql_column_from_field_and_type(field, type)
  if type == :date
    sql_date_column_from_field(field)
  else
    suffix = self.class.suffix_from_type(type)
    raise "no suffix for #{type} defined" unless suffix
    "#{field} AS #{field}_#{suffix}"
  end
end

#sql_date_column_from_field(field) ⇒ Object



201
202
203
# File 'lib/supernova/solr_indexer.rb', line 201

# SQL expression aliasing +field+ as "<field>_dt". It keeps NULL as NULL;
# otherwise it replaces the space with "T" and appends "Z".
# NOTE(review): assumes the column renders as "YYYY-MM-DD HH:MM:SS" in
# UTC, and IF/CONCAT/REPLACE are MySQL-style functions — confirm for the
# target database.
def sql_date_column_from_field(field)
  iso_expression = %(CONCAT(REPLACE(#{field}, " ", "T"), "Z"))
  %(IF(#{field} IS NULL, NULL, #{iso_expression}) AS #{field}_dt)
end

#stream_open? ⇒ Boolean

Returns:

  • (Boolean)


326
327
328
# File 'lib/supernova/solr_indexer.rb', line 326

# True once #index_file_stream has been memoized, i.e. a JSON file has
# been started for the current batch.
def stream_open?
  return false if @index_file_stream.nil?
  true
end

#table_name ⇒ Object



143
144
145
# File 'lib/supernova/solr_indexer.rb', line 143

# Table to read from. This is the explicit class-level .table_name when
# set. Otherwise it is clazz.table_name when clazz responds to it, and
# otherwise nil.
def table_name
  explicit = self.class.table_name
  return explicit if explicit
  model = self.class.clazz
  return nil unless model && model.respond_to?(:table_name)
  model.table_name
end

#validate_lat(lat) ⇒ Object



170
171
172
# File 'lib/supernova/solr_indexer.rb', line 170

# Latitude as a Float, or nil when blank or outside [-90, 90].
def validate_lat(lat)
  max_abs_latitude = 90
  float_or_nil_when_abs_bigger_than(lat, max_abs_latitude)
end

#validate_lng(lng) ⇒ Object



174
175
176
# File 'lib/supernova/solr_indexer.rb', line 174

# Longitude as a Float, or nil when blank or outside [-180, 180].
def validate_lng(lng)
  max_abs_longitude = 180
  float_or_nil_when_abs_bigger_than(lng, max_abs_longitude)
end

#write_to_file(to_index) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/supernova/solr_indexer.rb', line 306

# Appends one `"add":{"doc":...}` entry to the temp JSON file.
#
# The first call writes the opening "{" line, and later calls prefix
# their entry with ",\n". Values that are blank once stringified (nil,
# "", whitespace) are dropped; false is kept. #finish writes the closing
# brace.
def write_to_file(to_index)
  first_document = !stream_open?
  index_file_stream.puts "{" if first_document
  doc = to_index.each_with_object({}) do |(key, value), kept|
    kept[key] = value unless value.to_s.strip.empty?
  end
  separator = first_document ? nil : ",\n"
  index_file_stream.print(%(#{separator}"add":#{({:doc => doc}).to_json}))
end