Class: Droonga::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/droonga/dispatcher.rb

Defined Under Namespace

Classes: MissingDatasetParameter, SessionPlanner, UnknownDataset, UnknownType

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(catalog, options) ⇒ Dispatcher

Returns a new instance of Dispatcher.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/droonga/dispatcher.rb', line 53

# Builds a dispatcher for the given catalog.
#
# @param catalog [Object] stored in @catalog and handed to the farm.
# @param options [Hash] construction options; only +:name+ is read here.
def initialize(catalog, options)
  @catalog = catalog
  @options = options
  @name = @options[:name]
  @loop = EventLoop.new(Coolio::Loop.default)
  @sessions = {}
  @current_id = 0
  # Matches routes that begin with this dispatcher's name. The name is
  # escaped so regexp metacharacters in it (such as ".") match literally
  # instead of acting as wildcards, and \A anchors at the very start of the
  # route rather than at the start of any line inside it.
  @local = Regexp.new("\\A#{Regexp.escape(@name.to_s)}")
  @adapter_runners = create_adapter_runners
  @farm = Farm.new(name, @catalog, @loop, :dispatcher => self)
  @forwarder = Forwarder.new(@loop)
  @replier = Replier.new(@forwarder)
  @collector_runners = create_collector_runners
  @step_runners = create_step_runners
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



33
34
35
# File 'lib/droonga/dispatcher.rb', line 33

# Reader for the dispatcher's name.
#
# @return [Object] the current value of @name.
def name
  @name
end

Instance Method Details

#dispatch(message, destination) ⇒ Object



168
169
170
171
172
173
174
175
176
# File 'lib/droonga/dispatcher.rb', line 168

# Delivers an internal message to a destination.
#
# When +local?(destination)+ is truthy the message is processed in this
# dispatcher; otherwise it is placed in the "body" of a copy of the current
# @message and handed to the forwarder, addressed with type "dispatcher".
#
# @param message [Hash] the internal message.
# @param destination [String] the route to deliver to.
# @return [Object] the result of whichever delivery call was made.
def dispatch(message, destination)
  return process_internal_message(message) if local?(destination)

  envelope = @message.merge("body" => message)
  @forwarder.forward(envelope, "type" => "dispatcher", "to" => destination)
end

#dispatch_steps(steps) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/droonga/dispatcher.rb', line 178

# Resolves the routes of every step and dispatches the whole plan once to
# each distinct farm path those routes map to.
#
# Steps that name a "dataset" get their "routes" overwritten with the
# catalog's answer; other steps keep existing "routes" or default to a
# single route equal to the freshly generated id. The steps are modified
# in place.
#
# @param steps [Array<Hash>] the steps to dispatch.
# @return [Hash] the farm paths dispatched to, as keys mapped to +true+.
def dispatch_steps(steps)
  session_id = generate_id
  farm_paths = {}
  steps.each do |step|
    dataset = step["dataset"]
    if dataset
      step["routes"] = @catalog.get_routes(dataset, step)
    else
      step["routes"] ||= [session_id]
    end
    # Hash keys de-duplicate the paths while keeping first-seen order.
    step["routes"].each { |route| farm_paths[farm_path(route)] = true }
  end
  plan = { "id" => session_id, "steps" => steps }
  farm_paths.each_key { |path| dispatch(plan, path) }
end

#forward(message, destination) ⇒ Object



108
109
110
111
112
# File 'lib/droonga/dispatcher.rb', line 108

# Hands a message to the forwarder, emitting a trace log entry before and
# after the call.
#
# @param message [Object] passed unchanged to @forwarder.forward.
# @param destination [Object] passed unchanged to @forwarder.forward.
# @return [Object] whatever the final logger.trace call returns.
def forward(message, destination)
  logger.trace("forward start")
  @forwarder.forward(message, destination)
  logger.trace("forward done")
end

#local?(route) ⇒ Boolean

Returns:

  • (Boolean)


217
218
219
# File 'lib/droonga/dispatcher.rb', line 217

# Tests whether a route matches the @local pattern.
#
# @param route [String] the route to test.
# @return [Integer, nil] the match position (truthy) on a match, +nil+
#   otherwise.
def local?(route)
  @local =~ route
end

#process_internal_message(message) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/droonga/dispatcher.rb', line 147

# Routes an internal dispatcher message to its session.
#
# A message whose "id" belongs to a stored session is fed to that session.
# Otherwise a message carrying "steps" creates, stores and starts a new
# session. A message that matches no session and carries no "steps" cannot
# be handled: it is logged and dropped instead of failing on a nil session.
# A session is removed from @sessions as soon as it reports done.
#
# @param message [Hash] internal message; reads "id", "input", "value",
#   "steps" and "dataset".
# @return [Object, nil] the removed session when it finished, otherwise nil.
def process_internal_message(message)
  id = message["id"]
  session = @sessions[id]
  if session
    session.receive(message["input"], message["value"])
  else
    steps = message["steps"]
    unless steps
      # TODO: take cases receiving result before its query into account.
      logger.error("no steps error: id=#{id}, message=#{message}")
      return
    end
    session_planner = SessionPlanner.new(self, steps)
    # Fall back to the dataset of the request currently being processed.
    dataset = message["dataset"] || @message["dataset"]
    collector_runner = @collector_runners[dataset]
    session = session_planner.create_session(id, collector_runner)
    @sessions[id] = session
    session.start
  end
  @sessions.delete(id) if session.done?
end

#process_local_message(local_message) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/droonga/dispatcher.rb', line 200

# Prepares a local task message and passes it to the farm.
#
# Every route listed under the step's "descendants" is converted with
# farm_path and the result is stored in +local_message["descendants"]+
# (the argument is modified). The message is then placed in the "body" of a
# copy of the current @message whose "type" is the step's command, and
# processed by the farm under the task's route.
#
# @param local_message [Hash] message containing a "task" with "route" and
#   "step" entries.
# @return [Object] the result of @farm.process.
def process_local_message(local_message)
  task = local_message["task"]
  step = task["step"]
  local_message["descendants"] =
    step["descendants"].each_with_object({}) do |(descendant, routes), paths|
      paths[descendant] = routes.map { |route| farm_path(route) }
    end
  envelope = @message.merge("body" => local_message,
                            "type" => step["command"])
  @farm.process(task["route"], envelope)
end

#process_message(message) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/droonga/dispatcher.rb', line 87

# Entry point for an incoming message.
#
# The message is remembered in @message. Messages of type "dispatcher" have
# their "body" processed as an internal message. Any other message is
# validated and processed as input; an ErrorMessage is turned into a reply,
# while any other listed error is logged, answered with an internal server
# error reply and then raised again.
#
# @param message [Hash] the incoming message.
# @return [Object] the result of the processing (or reply) call that ran.
def process_message(message)
  @message = message
  return process_internal_message(message["body"]) if message["type"] == "dispatcher"

  begin
    assert_valid_message(message)
    process_input_message(message)
  rescue ErrorMessage => known_error
    reply("statusCode" => known_error.status_code,
          "body"       => known_error.response_body)
  rescue StandardError, LoadError, SyntaxError => unexpected_error
    logger.exception("failed to process input message", unexpected_error)
    fallback = ErrorMessages::InternalServerError.new("Unknown internal error")
    reply("statusCode" => fallback.status_code,
          "body"       => fallback.response_body)
    # Still propagate after the client has been answered.
    raise unexpected_error
  end
end

#reply(message) ⇒ void

This method returns an undefined value.

Sends a response to the request's replyTo.

Parameters:

  • message (Hash)

    The message to be replied. See Replier#reply for available keys.

    The key-value pairs of the request message are used as defaults. For example, if the passed message doesn't include an id key, the id value from the request message is used.

See Also:



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/droonga/dispatcher.rb', line 126

# Sends a response for the request currently being processed.
#
# The given pairs are merged over the current @message, and the result is
# passed through the adapter runner registered for its "dataset", if any.
# A response with a "replyTo" goes to the replier. Without one nothing is
# sent; a "statusCode" other than 200 is logged as an orphan error.
#
# @param message [Hash] key-value pairs overriding those of the request.
# @return [Object, nil] the replier's or logger's result, or nil when there
#   is nothing to send or log.
def reply(message)
  response = @message.merge(message)
  runner = @adapter_runners[response["dataset"]]
  response = runner.adapt_output(response) if runner
  return @replier.reply(response) unless response["replyTo"].nil?

  # A missing status code counts as success.
  status_code = response["statusCode"] || 200
  return if status_code == 200

  dataset = response["dataset"]
  payload = response["body"] || {}
  error_name = payload["name"] || "Unknown"
  error_text = payload["message"] || "unknown error"
  logger.error("orphan error: <#{dataset}>[#{error_name}](#{status_code}): #{error_text}")
end

#shutdown ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/droonga/dispatcher.rb', line 76

# Shuts down the dispatcher's components in a fixed order: the forwarder,
# every collector runner, every adapter runner and finally the farm.
#
# @return [Object] the result of @farm.shutdown.
def shutdown
  @forwarder.shutdown
  @collector_runners.each_value(&:shutdown)
  @adapter_runners.each_value(&:shutdown)
  @farm.shutdown
end

#start ⇒ Object



69
70
71
72
73
74
# File 'lib/droonga/dispatcher.rb', line 69

# Starts the forwarder and the farm, then calls ensure_schema.
#
# @return [Object] whatever ensure_schema returns.
def start
  @forwarder.start
  @farm.start

  # Runs only after both components above have been started.
  ensure_schema
end