Class: KJess::Client
- Inherits: Object
  - Object
  - KJess::Client
- Defined in: lib/kjess/client.rb
Instance Attribute Summary
-
#admin_port ⇒ Object
readonly
Public: The admin HTTP Port on the Kestrel server.
-
#host ⇒ Object
readonly
Public: The hostname of the kestrel server to connect to.
-
#port ⇒ Object
readonly
Public: The port on the host where the Kestrel server listens.
-
#stats_cache ⇒ Object
readonly
Internal: The cache of stats.
Class Method Summary
-
.defaults ⇒ Object
Public: The default parameters for a client connection to a Kestrel server.
Instance Method Summary
-
#abort(queue_name) ⇒ Object
Public: Abort an existing reliable read.
-
#close(queue_name) ⇒ Object
Public: Close an existing reliable read.
-
#close_and_reserve(queue_name, opts = {}) ⇒ Object
Public: Reserve the next item on the queue and close out the previous read.
-
#connected? ⇒ Boolean
Public: Is the client connected to a server?
-
#connection ⇒ Object
Internal: Allocate or return the existing connection to the server.
-
#delete(queue_name) ⇒ Object
Public: Remove a queue from the Kestrel server.
-
#disconnect ⇒ Object
Public: Disconnect from the Kestrel server.
-
#flush(queue_name) ⇒ Object
Public: Remove all items from a queue on the kestrel server.
-
#flush_all ⇒ Object
Public: Remove all items from all queues on the kestrel server.
-
#get(queue_name, opts = {}) ⇒ Object
Public: Retrieve an item from the given queue.
-
#initialize(opts = {}) ⇒ Client
constructor
A new instance of Client.
-
#peek(queue_name) ⇒ Object
Public: Peek at the top item in the queue.
-
#ping ⇒ Object
Public: Returns true if the server is alive.
-
#queue_stats(queue_name) ⇒ Object
Public: Return just the stats about a particular queue.
-
#quit ⇒ Object
Public: Disconnect from the kestrel server.
-
#reload ⇒ Object
Public: Have Kestrel reload its config.
-
#reserve(queue_name, opts = {}) ⇒ Object
Public: Reserve the next item on the queue.
-
#send_recv(request) ⇒ Object
Internal: Send a request and receive its response.
-
#set(queue_name, item, expiration = 0) ⇒ Object
Public: Add an item to the given queue.
-
#shutdown ⇒ Object
Public: Tells the Kestrel server to shutdown.
-
#stats ⇒ Object
Public: Return stats about the Kestrel server. The stats are cached according to the stats_cache_expiration initialization parameter.
-
#stats! ⇒ Object
Internal: Return the hash of stats.
-
#status(update_to = nil) ⇒ Object
Public: Return the server status.
-
#version ⇒ Object
Public: Return the version of the Kestrel Server.
Constructor Details
#initialize(opts = {}) ⇒ Client
Returns a new instance of Client.
    # File 'lib/kjess/client.rb', line 27

    def initialize( opts = {} )
      merged = Client.defaults.merge( opts )
      @host = merged[:host]
      @port = merged[:port]
      @admin_port = merged[:admin_port]
      @stats_cache = StatsCache.new( self, merged[:stats_cache_expiration] )
      @connection = KJess::Connection.new( host, port, merged )
    end
Instance Attribute Details
#admin_port ⇒ Object (readonly)
Public: The admin HTTP Port on the Kestrel server
    # File 'lib/kjess/client.rb', line 11

    def admin_port
      @admin_port
    end
#host ⇒ Object (readonly)
Public: The hostname of the kestrel server to connect to
    # File 'lib/kjess/client.rb', line 5

    def host
      @host
    end
#port ⇒ Object (readonly)
Public: The port on the host where the Kestrel server listens
    # File 'lib/kjess/client.rb', line 8

    def port
      @port
    end
#stats_cache ⇒ Object (readonly)
Internal: The cache of stats
    # File 'lib/kjess/client.rb', line 14

    def stats_cache
      @stats_cache
    end
Class Method Details
.defaults ⇒ Object
Public: The default parameters for a client connection to a Kestrel server.
    # File 'lib/kjess/client.rb', line 18

    def self.defaults
      {
        :host => 'localhost',
        :port => 22133,
        :admin_port => 2223,
        :stats_cache_expiration => 0, # number of seconds to keep stats around
      }
    end
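The constructor overlays caller-supplied options on these defaults with Hash#merge. The following is a minimal sketch of that step only; `DEFAULTS` and `merged_options` are hypothetical names used for illustration, and the host name is made up.

```ruby
# Hypothetical stand-in for KJess::Client.defaults, copied from the listing above.
DEFAULTS = {
  :host                   => 'localhost',
  :port                   => 22133,
  :admin_port             => 2223,
  :stats_cache_expiration => 0,
}.freeze

# Mirrors the `Client.defaults.merge( opts )` step in the constructor:
# keys present in opts win, every other key keeps its default.
def merged_options( opts = {} )
  DEFAULTS.merge( opts )
end

merged = merged_options( :host => 'kestrel.example.com', :stats_cache_expiration => 5 )
puts merged[:host]
puts merged[:port]
```

Because Hash#merge returns a new Hash, the defaults themselves are never modified by a caller's options.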
Instance Method Details
#abort(queue_name) ⇒ Object
Public: Abort an existing reliable read
queue_name - the name of the queue to abort
Returns a Response
    # File 'lib/kjess/client.rb', line 164

    def abort( queue_name )
      get( queue_name, :abort => true )
    end
#close(queue_name) ⇒ Object
Public: Close an existing reliable read
queue_name - the name of the queue whose open read should be closed
Returns a Response
    # File 'lib/kjess/client.rb', line 154

    def close( queue_name )
      get( queue_name, :close => true )
    end
#close_and_reserve(queue_name, opts = {}) ⇒ Object
Public: Reserve the next item on the queue and close out the previous read.
This is a helper method to do a reliable read on a queue item while closing out the existing read at the same time.
queue_name - the name of the queue to retrieve an item from
opts - additional options
:wait_for - wait this many ms for an item on the queue (default: 0)
    # File 'lib/kjess/client.rb', line 135

    def close_and_reserve( queue_name, opts = {} )
      opts = opts.merge( :close => true )
      reserve( queue_name, opts )
    end
#connected? ⇒ Boolean
Public: Is the client connected to a server?
Returns true or false
    # File 'lib/kjess/client.rb', line 54

    def connected?
      return false if @connection.nil?
      return false if @connection.closed?
      return true
    end
#connection ⇒ Object
Internal: Allocate or return the existing connection to the server
Returns a KJess::Connection
    # File 'lib/kjess/client.rb', line 47

    def connection
      @connection ||= KJess::Connection.new( host, port )
    end
#delete(queue_name) ⇒ Object
Public: Remove a queue from the Kestrel server
This will remove any queue you name, including queues that do not exist.
queue_name - the name of the queue to remove
Returns true if it was deleted, false otherwise
    # File 'lib/kjess/client.rb', line 175

    def delete( queue_name )
      req = KJess::Request::Delete.new( :queue_name => queue_name )
      resp = send_recv( req )
      return KJess::Response::Deleted === resp
    end
#disconnect ⇒ Object
Public: Disconnect from the Kestrel server
Returns nothing
    # File 'lib/kjess/client.rb', line 39

    def disconnect
      @connection.close if connected?
      @connection = nil
    end
#flush(queue_name) ⇒ Object
Public: Remove all items from a queue on the kestrel server
This will flush any queue you name, even queues that do not exist.
queue_name - the name of the queue to flush
Returns true if the queue was flushed.
    # File 'lib/kjess/client.rb', line 188

    def flush( queue_name )
      # It can take a long time to flush all of the messages
      # on a server, so we'll set the read timeout to something
      # much higher than usual.
      connection.with_additional_read_timeout(60) do
        req = KJess::Request::Flush.new( :queue_name => queue_name )
        resp = send_recv( req )
        return KJess::Response::End === resp
      end
    end
#flush_all ⇒ Object
Public: Remove all items from all queues on the kestrel server
Returns true.
    # File 'lib/kjess/client.rb', line 202

    def flush_all
      # It can take a long time to flush all of the messages
      # on a server, so we'll set the read timeout to something
      # much higher than usual.
      connection.with_additional_read_timeout(60) do
        resp = send_recv( KJess::Request::FlushAll.new )
        return KJess::Response::End === resp
      end
    end
#get(queue_name, opts = {}) ⇒ Object
Public: Retrieve an item from the given queue
queue_name - the name of the queue to retrieve an item from
opts - the options for retrieving the item
:wait_for - wait this many ms for an item on the queue (default: 0)
:open - count this as a reliable read (default: false)
:close - close a previous read that was retrieved with :open
:abort - close an existing read, returning that item to the head of the queue
:peek - return the first item on the queue without removing it
Returns the item's data, or nil if no item was retrieved
    # File 'lib/kjess/client.rb', line 96

    def get( queue_name, opts = {} )
      opts = opts.merge( :queue_name => queue_name )
      g = KJess::Request::Get.new( opts )

      if opts[:wait_for]
        wait_for_in_seconds = Float(opts[:wait_for]) / 1000.0
      else
        wait_for_in_seconds = 0.1
      end

      connection.with_additional_read_timeout(wait_for_in_seconds) do
        resp = send_recv( g )
        return resp.data if KJess::Response::Value === resp
        return nil
      end
    end
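The listing converts :wait_for from milliseconds to seconds before extending the connection's read timeout, and falls back to 0.1 seconds when :wait_for is absent. The sketch below isolates that arithmetic; `additional_read_timeout` is a hypothetical helper name, not part of KJess.

```ruby
# Hypothetical helper mirroring the timeout arithmetic in #get.
def additional_read_timeout( opts = {} )
  if opts[:wait_for]
    Float( opts[:wait_for] ) / 1000.0   # milliseconds -> seconds
  else
    0.1                                 # fallback when :wait_for is not given
  end
end

puts additional_read_timeout( :wait_for => 2500 )
puts additional_read_timeout( :wait_for => "250" )
puts additional_read_timeout
```

Float() accepts numeric strings but raises ArgumentError for a value such as "soon", so a malformed :wait_for fails before any request is written.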
#peek(queue_name) ⇒ Object
Public: Peek at the top item in the queue
queue_name - the name of the queue to retrieve an item from
Returns a Response
    # File 'lib/kjess/client.rb', line 145

    def peek( queue_name )
      get( queue_name, :peek => true )
    end
#ping ⇒ Object
Public: Returns true if the server is alive
This uses the ‘stats’ method to see if the server is alive
Returns true or false
    # File 'lib/kjess/client.rb', line 275

    def ping
      stats
      true
    rescue Errno::ECONNREFUSED
      false
    end
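Note that the listing rescues only Errno::ECONNREFUSED, so any other failure raised while fetching stats reaches the caller. A small sketch of that behavior follows; `alive?` and the lambdas are hypothetical stand-ins for #ping and #stats.

```ruby
# Hypothetical probe: `fetch_stats` is any callable standing in for Client#stats.
def alive?( fetch_stats )
  fetch_stats.call
  true
rescue Errno::ECONNREFUSED
  # Only a refused connection is treated as "server not alive".
  false
end

up   = alive?( -> { { 'uptime' => 1 } } )
down = alive?( -> { raise Errno::ECONNREFUSED } )
puts up
puts down
```

Callers that also want timeouts or resets reported as "not alive" would need their own rescue around the call.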
#queue_stats(queue_name) ⇒ Object
Public: Return just the stats about a particular queue
Returns a Hash
    # File 'lib/kjess/client.rb', line 285

    def queue_stats( queue_name )
      stats['queues'][queue_name]
    end
#quit ⇒ Object
Public: Disconnect from the kestrel server.
Returns true
    # File 'lib/kjess/client.rb', line 226

    def quit
      resp = send_recv( KJess::Request::Quit.new )
      return KJess::Response::Eof === resp
    end
#reload ⇒ Object
Public: Have Kestrel reload its config.
Currently the Kestrel server reports that the config was reloaded no matter what, so there is no way to determine whether the config failed to load.
Returns true
    # File 'lib/kjess/client.rb', line 218

    def reload
      resp = send_recv( KJess::Request::Reload.new )
      return KJess::Response::ReloadedConfig === resp
    end
#reserve(queue_name, opts = {}) ⇒ Object
Public: Reserve the next item on the queue
This is a helper method to get an item from a queue and open it for reliable read.
queue_name - the name of the queue to retrieve an item from
opts - additional options
:wait_for - wait this many ms for an item on the queue (default: 0)
    # File 'lib/kjess/client.rb', line 121

    def reserve( queue_name, opts = {} )
      opts = opts.merge( :open => true )
      get( queue_name, opts )
    end
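The reliable-read helpers (#reserve, #close, #abort, #close_and_reserve) are all thin wrappers that pass flags to #get. The sketch below imitates the documented flag semantics with an in-memory queue so it runs without a server. `InMemoryQueueClient` is hypothetical and is not KJess::Client; in particular, returning nil from a bare close or abort is an assumption of this sketch.

```ruby
# Hypothetical in-memory stand-in for a Kestrel queue; it is NOT KJess::Client.
class InMemoryQueueClient
  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
    @open   = {}   # queue name => item currently held by an open read
  end

  def set( queue_name, item )
    @queues[queue_name] << item.to_s
    true
  end

  def get( queue_name, opts = {} )
    if opts[:abort]
      # :abort returns the open item to the head of the queue.
      item = @open.delete( queue_name )
      @queues[queue_name].unshift( item ) if item
      return nil
    end
    @open.delete( queue_name ) if opts[:close]        # :close confirms the open read
    return nil if opts[:close] && !opts[:open]
    return @queues[queue_name].first if opts[:peek]   # :peek does not remove the item
    item = @queues[queue_name].shift
    @open[queue_name] = item if item && opts[:open]   # :open starts a reliable read
    item
  end

  # Same delegation pattern as the listings on this page.
  def reserve( queue_name, opts = {} )
    get( queue_name, opts.merge( :open => true ) )
  end

  def close( queue_name )
    get( queue_name, :close => true )
  end

  def abort( queue_name )
    get( queue_name, :abort => true )
  end

  def close_and_reserve( queue_name, opts = {} )
    reserve( queue_name, opts.merge( :close => true ) )
  end
end

client = InMemoryQueueClient.new
client.set( 'jobs', 'a' )
client.set( 'jobs', 'b' )

first  = client.reserve( 'jobs' )             # opens a read on 'a'
client.abort( 'jobs' )                        # 'a' goes back to the head
again  = client.reserve( 'jobs' )             # 'a' is handed out again
second = client.close_and_reserve( 'jobs' )   # confirms 'a', opens a read on 'b'
client.close( 'jobs' )                        # confirms 'b'
leftover = client.get( 'jobs' )               # queue is now empty
```

A typical worker loop would call close_and_reserve once per iteration, so each successful pass both confirms the previous item and opens the next one.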
#send_recv(request) ⇒ Object
Internal: Send a request and receive its response
request - the Request object to send to the server
Returns a Response object
    # File 'lib/kjess/client.rb', line 301

    def send_recv( request )
      connection.write( request.to_protocol )
      line = connection.readline
      resp = KJess::Response.parse( line )
      resp.read_more( connection )
      raise resp if resp.error?
      return resp
    end
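The listing follows a write, read one line, parse, then read the rest cycle. As a rough illustration of that cycle, the sketch below replays a canned memcache-style reply from a StringIO. It assumes the server answers a GET with a `VALUE <key> <flags> <bytes>` line, the data, and an `END` line; `FakeConnection` and `fake_get` are hypothetical, and the real KJess::Request#to_protocol and KJess::Response.parse are not shown on this page.

```ruby
require 'stringio'

# Hypothetical stand-in for KJess::Connection: records writes, replays a canned reply.
class FakeConnection
  attr_reader :written

  def initialize( canned_reply )
    @written = String.new
    @reply   = StringIO.new( canned_reply )
  end

  def write( data )
    @written << data
  end

  def readline
    @reply.readline( "\r\n" )
  end

  def read( nbytes )
    @reply.read( nbytes )
  end
end

# Simplified send/receive cycle for a memcache-style GET (assumed wire format).
def fake_get( connection, queue_name )
  connection.write( "get #{queue_name}\r\n" )
  line = connection.readline.chomp        # first response line, without \r\n
  return nil if line == "END"             # empty queue

  _verb, _key, _flags, bytes = line.split( " " )
  data = connection.read( Integer( bytes ) )   # the "read more" step
  connection.readline                     # \r\n that terminates the data block
  connection.readline                     # closing END line
  data
end

conn = FakeConnection.new( "VALUE jobs 0 5\r\nhello\r\nEND\r\n" )
item = fake_get( conn, 'jobs' )
puts item
```

Reading the payload by byte count rather than by line is what lets an item contain its own line breaks.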
#set(queue_name, item, expiration = 0) ⇒ Object
Public: Add an item to the given queue
queue_name - the queue to put an item on
item - the item to put on the queue; #to_s will be called on it
expiration - the number of seconds from now until the item expires
Returns true if successful, false otherwise
    # File 'lib/kjess/client.rb', line 78

    def set( queue_name, item, expiration = 0 )
      s = KJess::Request::Set.new( :queue_name => queue_name, :data => item,
                                   :expiration => expiration )
      resp = send_recv( s )
      return KJess::Response::Stored === resp
    end
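Since #to_s is called on the item, anything enqueued travels as a string. The sketch below shows what a memcache-style SET command could look like under that rule. The wire format `set <key> <flags> <exptime> <bytes>` is an assumption taken from the memcached text protocol; `set_command` is a hypothetical helper, and KJess::Request::Set's actual serialization is not shown on this page.

```ruby
# Hypothetical builder for a memcache-style SET command (assumed wire format).
def set_command( queue_name, item, expiration = 0 )
  data = item.to_s                      # the item is always stringified
  # The length field counts bytes, not characters, hence bytesize.
  "set #{queue_name} 0 #{expiration} #{data.bytesize}\r\n#{data}\r\n"
end

print set_command( 'jobs', 42 )
print set_command( 'jobs', "caf\u00e9", 30 )
```

The second call illustrates why bytesize matters: the four-character string occupies five bytes in UTF-8.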
#shutdown ⇒ Object
Public: Tells the Kestrel server to shutdown
Returns nothing
    # File 'lib/kjess/client.rb', line 292

    def shutdown
      send_recv( KJess::Request::Shutdown.new )
    end
#stats ⇒ Object
Public: Return stats about the Kestrel server. The stats are cached according to the stats_cache_expiration initialization parameter
Returns a Hash
    # File 'lib/kjess/client.rb', line 247

    def stats
      stats_cache.stats
    end
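The StatsCache class itself is not shown on this page. As a rough idea of how a cache driven by an expiration in seconds could behave, here is a minimal sketch; `ExpiringCache`, its `clock:` parameter, and the choice that an expiration of 0 reloads on every call are all assumptions, not the library's implementation.

```ruby
# Hypothetical expiring cache; KJess::StatsCache's real implementation is not shown here.
class ExpiringCache
  def initialize( ttl_seconds, clock: -> { Process.clock_gettime( Process::CLOCK_MONOTONIC ) }, &loader )
    @ttl       = Float( ttl_seconds )
    @clock     = clock      # injectable so the example can control time
    @loader    = loader     # stands in for the uncached stats! call
    @value     = nil
    @loaded_at = nil
  end

  def fetch
    now = @clock.call
    # Reload on first use or once the cached value is at least ttl seconds old.
    if @loaded_at.nil? || ( now - @loaded_at ) >= @ttl
      @value     = @loader.call
      @loaded_at = now
    end
    @value
  end
end

loads    = 0
fake_now = 0.0
cache = ExpiringCache.new( 5, clock: -> { fake_now } ) do
  loads += 1
  { 'loads' => loads }
end

first_read = cache.fetch    # loads from the block
fake_now = 3.0
second_read = cache.fetch   # still within 5 seconds, served from cache
fake_now = 6.0
third_read = cache.fetch    # expired, loads again
```

Under this sketch's assumptions, a time-to-live of 0 makes every fetch call the loader, which would match a default that effectively disables caching.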
#stats! ⇒ Object
Internal: Return the hash of stats
Using a combination of the STATS and DUMP_STATS commands this generates a good overview of all the most used stats for a Kestrel server.
Returns a Hash
    # File 'lib/kjess/client.rb', line 257

    def stats!
      stats = send_recv( KJess::Request::Stats.new )
      raise KJess::ProtocolError, "Problem receiving stats: #{stats.inspect}" unless KJess::Response::Stats === stats

      h = stats.data
      dump_stats = send_recv( KJess::Request::DumpStats.new )
      h['queues'] = Hash.new
      if KJess::Response::DumpedStats === dump_stats then
        h['queues'].merge!( dump_stats.data )
      end
      return h
    end
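The result is one Hash: server-wide stats at the top level, with per-queue stats nested under the 'queues' key, which is the key #queue_stats reads from. The sketch below reproduces that shape with plain Hashes; `combine_stats` is a hypothetical helper and the stat names and values are invented sample data.

```ruby
# Hypothetical helper reproducing the shape built by stats!.
def combine_stats( server_stats, per_queue_stats )
  combined = server_stats.dup
  combined['queues'] = {}
  # Per-queue data is optional, just as the DUMP_STATS branch is conditional.
  combined['queues'].merge!( per_queue_stats ) if per_queue_stats
  combined
end

# Invented sample data, for illustration only.
server    = { 'uptime' => 120 }
per_queue = { 'jobs' => { 'items' => 3 } }

all_stats = combine_stats( server, per_queue )
jobs      = all_stats['queues']['jobs']       # same lookup as queue_stats( 'jobs' )
missing   = all_stats['queues']['no_such']    # unknown queues yield nil
```

Because 'queues' is always present, looking up an unknown queue returns nil rather than raising.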
#status(update_to = nil) ⇒ Object
Public: Return the server status.
Currently this is only supported in the HEAD version of Kestrel. Versions where this is not available will raise ServerError.
Returns a String.
    # File 'lib/kjess/client.rb', line 237

    def status( update_to = nil )
      resp = send_recv( KJess::Request::Status.new( :update_to => update_to ) )
      raise KJess::ProtocolError, "Status command is not supported" if KJess::Response::ClientError === resp
      return resp.
    end
#version ⇒ Object
Public: Return the version of the Kestrel Server.
Returns a String
Raises KJess::ProtocolError if the response to the VERSION command is unexpected
    # File 'lib/kjess/client.rb', line 64

    def version
      v = KJess::Request::Version.new
      r = send_recv( v )
      return r.version if Response::Version === r
      raise KJess::ProtocolError, "Unexpected Response from VERSION command"
    end