Class: EM::HyperDex::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/em-hyperdex-client.rb

Overview

An EventMachine-enabled client for HyperDex.

This is a fairly straightforward async-friendly interface to the hyperdex NoSQL data store. All of the normal (synchronous) methods that you are used to using are available, except that they're automatically async. Schweeeeeet.

Using it is quite simple (for an EM client, anyway...). You just create an instance of EM::HyperDex::Client, and then call (almost) any of the standard HyperDex methods you know and love against it, with the same arguments. The only difference is that you can pass a block to the method, indicating what you want to do once the request is complete, and the method returns a Deferrable which you can define callbacks and errbacks. The callback will be passed whatever the HyperDex data method would ordinarily return, in a synchronous world.

Searching methods (#search, #sorted_search) work slightly differently. Instead of the entire resultset being passed back in a single callback, the callback is executed after the result set has been processed. Each item in the search result is accessed via EnumerableDeferrable#each on the deferrable that is returned from the #search method. See the docs for #search and #sorted_search for examples of what this looks like.

Defined Under Namespace

Modules: Watcher Classes: EnumerableDeferrable

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = 1982) ⇒ Client

Create a new EM::HyperDex::Client for you to play with.

This method does not take a callback block; the client is created synchronously. However, this isn't the problem you might otherwise expect it to be, because initializing a client doesn't make any network connections or otherwise potentially-blocking calls; rather, it simply initializes some data structures and then returns.

Parameters:

  • host (String) (defaults to: 'localhost')

    A hostname or IP address (v4 or v6) of a coordinator within the cluster to make initial contact.

  • port (Integer) (defaults to: 1982)

    The port of the coordinator you wish to contact.



45
46
47
48
49
# File 'lib/em-hyperdex-client.rb', line 45

def initialize(host = 'localhost', port = 1982)
	@client = HyperDex::Client::Client.new(host, port)
	@failed = false
	@outstanding = {}
end

Instance Method Details

#add_outstanding(op, df) ⇒ Object

Associate an in-progress operation with the deferrable that will be completed when the operation completes. Also handles telling EM to start watching the client's poll_fd if necessary.



217
218
219
220
221
222
223
224
225
226
227
# File 'lib/em-hyperdex-client.rb', line 217

def add_outstanding(op, df)
	# If we don't have any operations already in progress, then
	# we aren't watching the poll_fd, so we probably want to start
	# doing that now
	if @outstanding.empty?
		@em_conn = ::EM.watch(@client.poll_fd, Watcher, self)
		@em_conn.notify_readable = true
	end

	@outstanding[op] = df
end

#get_outstanding(op) ⇒ Object

Retrieve the deferrable associated with the specified operation, if one exists. Also handles telling EM to stop watching the poll_fd, if there are now no more outstanding operations.



232
233
234
235
236
237
238
239
# File 'lib/em-hyperdex-client.rb', line 232

def get_outstanding(op)
	@outstanding.delete(op).tap do
		if @outstanding.empty?
			@em_conn.detach
			@em_conn = nil
		end
	end
end

#handle_responseObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Callback from the EM.watch system, to poke us when a response is ready to be processed.



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/em-hyperdex-client.rb', line 172

def handle_response
	begin
		op = @client.loop(0)
	rescue HyperDex::Client::HyperDexClientException
		# Something has gone wrong, and we're just going to take our bat
		# and ball and go home.
		@outstanding.values.each { |op| op.fail(ex) }
		@outstanding = {}

		# Call a dummy get_outstanding so EM stops watching the socket
		get_outstanding(nil)

		@failed = true
		return
	end

	# It's possible for the client's poll_fd to see activity when there
	# isn't any new completed operation; according to rescrv, this can
	# happen "because of background activity".  In that case, `#loop`
	# called with a timeout will return `nil`, and we should just
	# return quietly.
	if op.nil?
		return
	end

	df = get_outstanding(op)

	begin
		if df.respond_to?(:item_available)
			df.item_available
			# Put the deferrable back in the outstanding ops hash if
			# there's more items to go
			add_outstanding(op, df) unless df.completed?
		else
			df.succeed(op.wait)
		end
	rescue HyperDex::Client::HyperDexClientException => ex
		df.fail(ex)
	end
end

#search(spacename, predicates) ⇒ EnumerableDeferrable

Perform a search for all objects in the specified space that match the predicates provided.

Examples:

iterating through search results

c = EM::HyperDex::Client.new
c.search(:test, :towels => HyperDex::Client::GreaterThan.new(42)).each do |r|
  puts "This is an object with a high towel count: #{r.inspect}"
end.callback do
  puts "Towel search complete"
end.errback do |ex|
  puts "Towel search failed: #{ex.class}: #{ex.message}"
end

Parameters:

  • spacename (#to_s)

    The name of the hyperdex space to search

  • predicates (Hash<#to_s, HyperDex::Client::Predicate>)

    A collection of predicates to apply to the search.

Returns:



# File 'lib/em-hyperdex-client.rb', line 86

#sorted_search(spacename, predicates, sortby, limit, maxmin) ⇒ EnumerableDeferrable

TODO:

Document the maxmin parameter properly.

Perform a search for all objects in the specified space that match the predicates provided, sorting the results and optionally returning only a subset of the results.

Parameters:

  • spacename (#to_s)

    The name of the hyperdex space to search

  • predicates (Hash<#to_s, HyperDex::Client::Predicate>)

    A collection of predicates to apply to the search.

  • sortby (#to_s)

    the attribute to sort the results by

  • limit (Integer)

    the maximum number of results to return

  • maxmin (String)

    Maximize ("max") or minimize ("min") (I shit you not, that's what the upstream docs say)

Returns:

See Also:



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/em-hyperdex-client.rb', line 133

ITERATOR_METHODS.each do |m|
	define_method(m) do |*args, &block|
		if @failed
			return failed_deferrable("This client has failed.  Please open a new one.")
		end

		begin
			iter = @client.__send__(m, *args)
		rescue StandardError => ex
			return EM::Completion.new.tap do |df|
				df.fail(ex)
			end
		end

		df = EnumerableDeferrable.new(iter)
		df.callback(&block) if block

		if ::EM.reactor_running?
			add_outstanding(iter, df)
		else
			begin
				until (item = iter.next).nil?
					df.item_available(item)
				end
				df.item_available(nil)
			rescue Exception => ex
				df.fail(ex)
			end
		end

		df
	end
end