Class: DruidDB::Node::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/druiddb/node/broker.rb

Constant Summary collapse

QUERY_PATH =
'/druid/v2'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, zk) ⇒ Broker

Returns a new instance of Broker.



7
8
9
10
# File 'lib/druiddb/node/broker.rb', line 7

# Builds a broker node wrapper around the two collaborators it is given.
#
# @param config [Object] stored in @config and exposed through #config
# @param zk [Object] stored in @zk and exposed through #zk
def initialize(config, zk)
  @zk = zk
  @config = config
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



6
7
8
# File 'lib/druiddb/node/broker.rb', line 6

# Reader for the configuration object assigned to @config by #initialize.
#
# @return [Object] the current value of @config
def config
  @config
end

#zk ⇒ Object (readonly)

Returns the value of attribute zk.



6
7
8
# File 'lib/druiddb/node/broker.rb', line 6

# Reader for the object assigned to @zk by #initialize.
#
# @return [Object] the current value of @zk
def zk
  @zk
end

Instance Method Details

#connection ⇒ Object



12
13
14
15
16
17
# File 'lib/druiddb/node/broker.rb', line 12

# Hands out a connection to the broker currently at the head of the registry
# list stored under "<discovery_path>/druid:broker", then rotates that list in
# place so the next call starts from a different entry (round-robin).
#
# @return [DruidDB::Connection] connection built from the entry's :host and :port
# @raise [DruidDB::ConnectionError] when the list has no first entry
def connection
  # Looked up afresh on every call, exactly as the registry is consulted twice.
  broker_list = -> { zk.registry["#{config.discovery_path}/druid:broker"] }

  selected = broker_list.call.first
  if selected.nil?
    raise DruidDB::ConnectionError, 'no druid brokers available'
  end

  broker_list.call.rotate!
  DruidDB::Connection.new(host: selected[:host], port: selected[:port])
end

#query(query_object) ⇒ Object

Raises:

  • (QueryError)



19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/druiddb/node/broker.rb', line 19

# Posts a query to a broker and returns the decoded JSON reply.
#
# The first attempt uses whichever broker #connection hands out. If that
# attempt raises DruidDB::ConnectionError, the query is retried once for each
# other entry in the broker registry. A retry that also fails to connect moves
# on to the next one instead of aborting the whole query.
#
# @param query_object [Object] payload passed unchanged to the connection's #post
# @return [Object] result of JSON.parse on the response body
# @raise [DruidDB::ConnectionError] if no attempt produced a response
# @raise [QueryError] if the last response received has a status other than 200
def query(query_object)
  response = nil
  begin
    response = connection.post(QUERY_PATH, query_object)
  rescue DruidDB::ConnectionError => first_error
    # TODO: Log
    last_error = first_error
    # One retry per remaining registry entry; #connection rotates the list,
    # so each retry is expected to reach a different broker.
    retries = zk.registry["#{config.discovery_path}/druid:broker"].size - 1
    retries.times do
      begin
        response = connection.post(QUERY_PATH, query_object)
      rescue DruidDB::ConnectionError => retry_error
        # Keep going: an unreachable broker must not stop the remaining retries.
        last_error = retry_error
        next
      end
      break if response.code.to_i == 200
    end
    # Nothing answered at all: surface the connection failure rather than
    # calling #code on nil.
    raise last_error if response.nil?
  end
  raise QueryError unless response.code.to_i == 200
  JSON.parse(response.body)
end