Class: RBHive::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/rbhive/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, port = 10_000, logger = StdOutLogger.new) ⇒ Connection

Returns a new instance of Connection.



34
35
36
37
38
39
40
41
42
# File 'lib/rbhive/connection.rb', line 34

# Builds the Thrift object chain (socket -> buffered transport -> binary
# protocol -> Hive client) for the given server. The transport is not opened
# here, even though the log line says "Connecting"; see #open.
#
# @param server [String] host handed to Thrift::Socket
# @param port [Integer] port handed to Thrift::Socket (defaults to 10_000)
# @param logger [#info] receives the informational log message
def initialize(server, port=10_000, logger=StdOutLogger.new)
  @mutex = Mutex.new
  @logger = logger
  @socket = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(@socket)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive::Thrift::ThriftHive::Client.new(@protocol)
  # Logged only once every Thrift object has been constructed.
  @logger.info("Connecting to #{server} on port #{port}")
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(meth, *args) ⇒ Object



129
130
131
# File 'lib/rbhive/connection.rb', line 129

# Forwards any message this object does not define itself to the underlying
# client, so the client's own API can be called directly on the connection.
#
# @param meth [Symbol] name of the method that was called
# @param args [Array] positional arguments, passed through unchanged
# @param block [Proc, nil] optional block, passed through to the client
# @return [Object] whatever the client method returns
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? (and Object#method) in agreement with the forwarding done
# by method_missing: a message counts as handled when the client responds to it.
#
# @param meth [Symbol] method name being queried
# @param include_private [Boolean] whether private methods should be considered
# @return [Boolean]
def respond_to_missing?(meth, include_private=false)
  client.respond_to?(meth, include_private) || super
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



32
33
34
# File 'lib/rbhive/connection.rb', line 32

# Read-only accessor for the client object stored in @client (assigned in the
# constructor).
#
# @return [Object] the current value of @client
def client
  @client
end

Instance Method Details

#add_columns(schema) ⇒ Object



125
126
127
# File 'lib/rbhive/connection.rb', line 125

# Executes the statement produced by schema.add_columns_statement.
#
# @param schema [#add_columns_statement] object that builds the statement text
# @return [Object] whatever #execute returns
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end

#close ⇒ Object



48
49
50
# File 'lib/rbhive/connection.rb', line 48

# Closes the transport held in @transport.
#
# @return [Object] whatever @transport.close returns
def close
  @transport.close
end

#create_table(schema) ⇒ Object



112
113
114
# File 'lib/rbhive/connection.rb', line 112

# Executes the statement produced by schema.create_table_statement.
#
# @param schema [#create_table_statement] object that builds the statement text
# @return [Object] whatever #execute returns
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end

#drop_table(name) ⇒ Object



116
117
118
119
# File 'lib/rbhive/connection.rb', line 116

# Drops a table by executing a DROP TABLE statement.
#
# @param name [TableSchema, #to_s] a table name, or a TableSchema whose #name
#   is used. The name is wrapped in backticks but not otherwise escaped.
# @return [Object] whatever #execute returns
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end

#execute(query) ⇒ Object



56
57
58
# File 'lib/rbhive/connection.rb', line 56

# Runs a query by delegating to execute_safe.
# NOTE(review): execute_safe is not shown in this listing; presumably it wraps
# the call in the same `safe` block used by #fetch and #explain -- confirm.
#
# @param query [String] statement text handed to execute_safe
# @return [Object] whatever execute_safe returns
def execute(query)
  execute_safe(query)
end

#explain(query) ⇒ Object



60
61
62
63
64
65
# File 'lib/rbhive/connection.rb', line 60

# Runs "EXPLAIN <query>" and wraps all rows fetched from the client in an
# ExplainResult. Both steps run inside a single `safe` block.
#
# @param query [String] query to explain; appended with String#+, so a
#   non-String argument raises TypeError
# @return [ExplainResult]
def explain(query)
  safe do
    statement = "EXPLAIN "+ query
    execute_unsafe(statement)
    plan_rows = client.fetchAll
    ExplainResult.new(plan_rows)
  end
end

#fetch(query) ⇒ Object



80
81
82
83
84
85
86
87
# File 'lib/rbhive/connection.rb', line 80

# Executes the query, fetches every row from the client and returns them as a
# ResultSet. The schema is built from client.getSchema, with the first fetched
# row passed along as an example row. Everything runs inside one `safe` block.
#
# @param query [String] query to execute
# @return [ResultSet]
def fetch(query)
  safe do
    execute_unsafe(query)
    result_rows = client.fetchAll
    ResultSet.new(
      result_rows,
      SchemaDefinition.new(client.getSchema, result_rows.first)
    )
  end
end

#fetch_in_batch(query, batch_size = 1_000) ⇒ Object



89
90
91
92
93
94
95
96
97
# File 'lib/rbhive/connection.rb', line 89

# Executes the query and yields its rows in batches of up to batch_size, each
# wrapped in a ResultSet. The schema is built once, from the first non-empty
# batch, and reused for later batches. Iteration stops at the first empty
# batch. The whole loop, including the caller's block, runs inside `safe`.
#
# @param query [String] query to execute
# @param batch_size [Integer] row count requested per client.fetchN call
# @yieldparam batch [ResultSet] one batch of rows
# @return [Object] whatever `safe` returns for the block
def fetch_in_batch(query, batch_size=1_000)
  safe do
    execute_unsafe(query)
    batch = client.fetchN(batch_size)
    until batch.empty?
      batch_schema ||= SchemaDefinition.new(client.getSchema, batch.first)
      yield ResultSet.new(batch, batch_schema)
      batch = client.fetchN(batch_size)
    end
  end
end

#first(query) ⇒ Object



99
100
101
102
103
104
105
106
# File 'lib/rbhive/connection.rb', line 99

# Executes the query, fetches a single row with client.fetchOne and returns
# the first element of a one-row ResultSet built from it. Runs inside `safe`.
#
# @param query [String] query to execute
# @return [Object] the result of ResultSet#first for the fetched row
def first(query)
  safe do
    execute_unsafe(query)
    record = client.fetchOne
    ResultSet.new(
      [record],
      SchemaDefinition.new(client.getSchema, record)
    ).first
  end
end

#open ⇒ Object



44
45
46
# File 'lib/rbhive/connection.rb', line 44

# Opens the transport held in @transport.
#
# @return [Object] whatever @transport.open returns
def open
  @transport.open
end

#priority=(priority) ⇒ Object



67
68
69
# File 'lib/rbhive/connection.rb', line 67

# Assigns the "mapred.job.priority" setting by delegating to #set.
#
# @param priority [Object] value handed to #set as the setting's value
def priority=(priority)
  set("mapred.job.priority", priority)
end

#queue=(queue) ⇒ Object



71
72
73
# File 'lib/rbhive/connection.rb', line 71

# Assigns the "mapred.job.queue.name" setting by delegating to #set.
#
# @param queue [Object] value handed to #set as the setting's value
def queue=(queue)
  set("mapred.job.queue.name", queue)
end

#replace_columns(schema) ⇒ Object



121
122
123
# File 'lib/rbhive/connection.rb', line 121

# Executes the statement produced by schema.replace_columns_statement.
#
# @param schema [#replace_columns_statement] object that builds the statement text
# @return [Object] whatever #execute returns
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end

#schema(example_row = []) ⇒ Object



108
109
110
# File 'lib/rbhive/connection.rb', line 108

# Builds a SchemaDefinition from the client's current schema (client.getSchema),
# inside a `safe` block.
#
# @param example_row [Array] passed through to SchemaDefinition.new as its
#   second argument (defaults to an empty array)
# @return [SchemaDefinition]
def schema(example_row=[])
  safe do
    hive_schema = client.getSchema
    SchemaDefinition.new(hive_schema, example_row)
  end
end

#set(name, value) ⇒ Object



75
76
77
78
# File 'lib/rbhive/connection.rb', line 75

# Logs the assignment, then sends "SET name=value" straight to the client.
# Note that this calls client.execute directly, not #execute.
#
# @param name [#to_s] setting name, interpolated into the statement as-is
# @param value [#to_s] setting value, interpolated into the statement as-is
# @return [Object] whatever client.execute returns
def set(name,value)
  @logger.info("Setting #{name}=#{value}")
  statement = "SET #{name}=#{value}"
  client.execute(statement)
end