Class: Droonga::SearchDistributor

Inherits:
DistributorPlugin show all
Defined in:
lib/droonga/plugin/distributor/search.rb

Instance Method Summary collapse

Methods inherited from DistributorPlugin

#broadcast_all, #initialize, #post, #scatter_all

Methods included from PluginRegisterable

#command, extended, #inherited, #method_name, #processable?, #repository

Methods inherited from Plugin

#initialize, #process, #processable?, #shutdown, #start

Constructor Details

This class inherits a constructor from Droonga::DistributorPlugin

Instance Method Details

#search(envelope) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/droonga/plugin/distributor/search.rb', line 25

# Plans a distributed search and posts the plan.
#
# The plan is a list of steps: one "reduce" step per query that declares an
# "output", followed by a single "gather" step and a "broadcast" step that
# carries the search request itself. The list is handed to #post.
#
# The request in envelope["body"] is modified in place: the "format" of
# every processed output is forced to "simple", and for "records" outputs
# the "attributes" list is converted to array style and extended with the
# sort attributes.
#
# @param envelope [Hash] message envelope. envelope["body"] is the search
#   request (it must contain a "queries" hash); envelope["dataset"], when
#   present, takes precedence over the request's own "dataset".
# @return [Object] whatever #post returns for the built list of steps.
def search(envelope)
  message = []
  input_names = []
  output_names = []
  output_mapper = {}

  request = envelope["body"]
  request["queries"].each do |input_name, query|
    output = query["output"]
    # Skip reducing phase for a result with no output.
    next unless output

    input_names << input_name
    output_name = "#{input_name}_reduced"
    output_names << output_name
    output_mapper[output_name] = {
      "output" => input_name,
    }

    # The collector module supports only "simple" format search results.
    # So we have to override the format and restore it on the gathering
    # phase.
    final_format = output["format"] || "simple"
    output["format"] = "simple"

    final_offset, final_limit = calculate_offset_and_limit!(query)

    elements = {}
    # "elements" may be omitted from the output definition. Treat that as
    # an empty list instead of failing on nil, so such a query still gets
    # a (no-op) reducer and a gather mapping like one with "elements" => [].
    (output["elements"] || []).each do |element|
      case element
      when "count"
        elements[element] = {
          "type" => "sum",
        }
      when "records"
        # Skip reducing phase for a result with no record output.
        next if final_limit.zero?

        # Append sort key attributes to the list of output attributes
        # temporarily, for the reducing phase. All extra columns are
        # removed afterwards, on the gathering phase. The originally
        # requested attributes must be collected before the list is
        # rewritten.
        final_attributes = collect_output_attributes(output["attributes"])
        output["attributes"] = format_attributes_to_array_style(output["attributes"])
        output["attributes"] += collect_sort_attributes(output["attributes"], query["sortBy"])

        elements[element] = sort_reducer(output["attributes"], query["sortBy"])
        # On the reducing phase, we apply only "limit". We cannot apply
        # "offset" on this phase because the collector merges a pair of
        # results step by step even if there are three or more results.
        # Instead, we apply "offset" on the gathering phase.
        elements[element]["limit"] = output["limit"]

        output_mapper[output_name]["element"] = element
        output_mapper[output_name]["offset"] = final_offset
        output_mapper[output_name]["limit"] = final_limit
        output_mapper[output_name]["format"] = final_format
        output_mapper[output_name]["attributes"] = final_attributes
      end
    end

    reducer = {
      "type" => "reduce",
      "body" => {
        input_name => {
          output_name => elements,
        },
      },
      "inputs" => [input_name], # XXX should be placed in the "body"?
      "outputs" => [output_name], # XXX should be placed in the "body"?
    }
    message << reducer
  end

  gatherer = {
    "type" => "gather",
    "body" => output_mapper,
    "inputs" => output_names, # XXX should be placed in the "body"?
    "post" => true, # XXX should be placed in the "body"?
  }
  message << gatherer

  searcher = {
    "type" => "broadcast",
    "command" => "search", # XXX should be placed in the "body"?
    "dataset" => envelope["dataset"] || request["dataset"],
    "body" => request,
    "outputs" => input_names, # XXX should be placed in the "body"?
    "replica" => "random", # XXX should be placed in the "body"?
  }
  message << searcher

  post(message)
end