Class: Tem::Mr::Search::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/tem_mr_search/client.rb

Overview

Client for the map-reduce RPC server.

Constant Summary collapse

OP =
Zerg::Support::Protocols::ObjectProtocol
OPAdapter =
Zerg::Support::Sockets::ProtocolAdapter.adapter_module OP

Class Method Summary collapse

Class Method Details

.dump_database(server_addr) ⇒ Object

Dumps the server database.

In production, normal clients wouldn’t have access to this.



78
79
80
# File 'lib/tem_mr_search/client.rb', line 78

def self.dump_database(server_addr)
  issue_request server_addr, :type => :db_dump
end

.fetch_item(server_addr, item_id) ⇒ Object

Asks for an item in the server’s database.

In production, there should be per-client rate-limiting on this request.



64
65
66
# File 'lib/tem_mr_search/client.rb', line 64

def self.fetch_item(server_addr, item_id)
  issue_request server_addr, :type => :fetch, :id => item_id
end

.get_tem(server_addr) ⇒ Object

Requests information for a random TEM to be used as a query’s root TEM.

Args:

server_addr:: string with the address of the Map-Reduce server's RPC port.

Returns a hash with the following keys:

:id:: the TEM's ID (to be used as the :root_tem argument in search calls)
:ecert:: the TEM's Endorsement Certificate
:pubek:: the TEM's public Endorsement Key (from the ECert)


25
26
27
28
29
30
31
32
# File 'lib/tem_mr_search/client.rb', line 25

def self.get_tem(server_addr)
  output = issue_request server_addr, :type => :get_tem
  return nil unless output
  
  ecert = OpenSSL::X509::Certificate.new output[:ecert]
  pubek = Tem::Key.new_from_ssl_key ecert.public_key
  { :id => output[:id], :ecert => ecert, :pubek => pubek }
end

.issue_request(server_addr, request) ⇒ Object

Issues a request against a Map-Reduce server and returns the response.

This method should not be called directly.



85
86
87
88
89
90
91
92
93
# File 'lib/tem_mr_search/client.rb', line 85

def self.issue_request(server_addr, request)
  socket = Zerg::Support::SocketFactory.socket :out_addr => server_addr,
      :out_port => Server::DEFAULT_PORT, :no_delay => true
  socket.extend OPAdapter
  socket.send_object request
  response = socket.recv_object response
  socket.close
  response
end

.search(server_addr, client_query) ⇒ Object

Performs a private database search using a Map-Reduce.

Args:

server_addr:: string with the address of the Map-Reduce server's RPC port.
client_query:: a ClientQuery instance expressing the Map-Reduce search

Returns a hash with the following keys:

:result:: the result of the Map-Reduce computation
:timings:: timing statistics on the job's execution


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/tem_mr_search/client.rb', line 43

def self.search(server_addr, client_query)
  tem_certs = {}
  tem_ids = {}
  [:mapper, :reducer, :finalizer].each do |sec|
    tem_info = get_tem server_addr
    tem_ids[sec] = tem_info[:id]
    # TODO: check the endorsement certificate.
    tem_certs[sec] = tem_info[:pubek]
  end
  client_query.bind tem_certs
  
  output = issue_request server_addr, :type => :search,
                                      :root_tems => tem_ids,
                                      :map_reduce => client_query.to_hash
  return nil unless output
  output.merge! :result => client_query.unpack_output(output[:result])
end

.shutdown_server(server_addr) ⇒ Object

Terminates the server.

In production, normal clients wouldn’t have access to this.



71
72
73
# File 'lib/tem_mr_search/client.rb', line 71

def self.shutdown_server(server_addr)
  issue_request server_addr, :type => :shutdown
end