Class: Cql::Client::AsynchronousClient
- Inherits:
-
Client
- Object
- Client
- Cql::Client::AsynchronousClient
show all
- Defined in:
- lib/cql/client/asynchronous_client.rb
Defined Under Namespace
Classes: AuthenticationRequired
Instance Method Summary
collapse
Constructor Details
Returns a new instance of AsynchronousClient.
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/cql/client/asynchronous_client.rb', line 7
def initialize(options={})
connection_timeout = options[:connection_timeout]
@host = options[:host] || 'localhost'
@port = options[:port] || 9042
@io_reactor = options[:io_reactor] || Io::IoReactor.new(connection_timeout: connection_timeout)
@lock = Mutex.new
@connected = false
@connecting = false
@closing = false
@initial_keyspace = options[:keyspace]
@credentials = options[:credentials]
@connection_keyspaces = {}
end
|
Instance Method Details
#close ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/cql/client/asynchronous_client.rb', line 45
def close
@lock.synchronize do
return @closed_future if @closing
@closing = true
@closed_future = Future.new
end
when_not_connecting do
f = @io_reactor.stop
f.on_complete { @closed_future.complete!(self) }
f.on_failure { |e| @closed_future.fail!(e) }
end
@closed_future.on_complete do
@lock.synchronize do
@closing = false
@connected = false
end
end
@closed_future.on_failure do
@lock.synchronize do
@closing = false
@connected = false
end
end
@closed_future
end
|
#connect ⇒ Object
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/cql/client/asynchronous_client.rb', line 21
def connect
@lock.synchronize do
return @connected_future if @connected || @connecting
@connecting = true
@connected_future = Future.new
end
when_not_closing do
setup_connections
end
@connected_future.on_complete do
@lock.synchronize do
@connecting = false
@connected = true
end
end
@connected_future.on_failure do
@lock.synchronize do
@connecting = false
@connected = false
end
end
@connected_future
end
|
#connected? ⇒ Boolean
71
72
73
|
# File 'lib/cql/client/asynchronous_client.rb', line 71
def connected?
@connected
end
|
#execute(cql, consistency = nil) ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/cql/client/asynchronous_client.rb', line 99
def execute(cql, consistency=nil)
consistency ||= DEFAULT_CONSISTENCY_LEVEL
return Future.failed(NotConnectedError.new) unless @connected || @connecting
f = execute_request(Protocol::QueryRequest.new(cql, consistency))
f.on_complete do
ensure_keyspace!
end
f
rescue => e
Future.failed(e)
end
|
#execute_statement(connection_id, statement_id, metadata, values, consistency) ⇒ Object
112
113
114
115
116
117
118
|
# File 'lib/cql/client/asynchronous_client.rb', line 112
def execute_statement(connection_id, statement_id, metadata, values, consistency)
return Future.failed(NotConnectedError.new) unless @connected || @connecting
request = Protocol::ExecuteRequest.new(statement_id, metadata, values, consistency || DEFAULT_CONSISTENCY_LEVEL)
execute_request(request, connection_id)
rescue => e
Future.failed(e)
end
|
#keyspace ⇒ Object
75
76
77
78
79
|
# File 'lib/cql/client/asynchronous_client.rb', line 75
def keyspace
@lock.synchronize do
return @connection_ids.map { |id| @connection_keyspaces[id] }.first
end
end
|
#prepare(cql) ⇒ Object
120
121
122
123
124
125
|
# File 'lib/cql/client/asynchronous_client.rb', line 120
def prepare(cql)
return Future.failed(NotConnectedError.new) unless @connected || @connecting
execute_request(Protocol::PrepareRequest.new(cql))
rescue => e
Future.failed(e)
end
|
#use(keyspace, connection_ids = nil) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/cql/client/asynchronous_client.rb', line 81
def use(keyspace, connection_ids=nil)
return Future.failed(NotConnectedError.new) unless @connected || @connecting
return Future.failed(InvalidKeyspaceNameError.new(%("#{keyspace}" is not a valid keyspace name))) unless valid_keyspace_name?(keyspace)
connection_ids ||= @connection_ids
@lock.synchronize do
connection_ids = connection_ids.select { |id| @connection_keyspaces[id] != keyspace }
end
if connection_ids.any?
futures = connection_ids.map do |connection_id|
execute_request(Protocol::QueryRequest.new("USE #{keyspace}", :one), connection_id)
end
futures.compact!
return Future.combine(*futures).map { nil }
else
Future.completed(nil)
end
end
|