Class: KJess::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/kjess/client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Client

Returns a new instance of Client.



27
28
29
30
31
32
33
34
# File 'lib/kjess/client.rb', line 27

def initialize( opts = {} )
  merged       = Client.defaults.merge( opts )
  @host        = merged[:host]
  @port        = merged[:port]
  @admin_port  = merged[:admin_port]
  @stats_cache = StatsCache.new( self, merged[:stats_cache_expiration] )
  @connection  = KJess::Connection.new( host, port, merged )
end

Instance Attribute Details

#admin_portObject (readonly)

Public: The admin HTTP Port on the Kestrel server



11
12
13
# File 'lib/kjess/client.rb', line 11

def admin_port
  @admin_port
end

#hostObject (readonly)

Public: The hostname of the kestrel server to connect to



5
6
7
# File 'lib/kjess/client.rb', line 5

def host
  @host
end

#portObject (readonly)

Public: The port on hostname of the Kestrel server



8
9
10
# File 'lib/kjess/client.rb', line 8

def port
  @port
end

#stats_cacheObject (readonly)

Internal: The cache of stats



14
15
16
# File 'lib/kjess/client.rb', line 14

def stats_cache
  @stats_cache
end

Class Method Details

.defaultsObject

Public: The default parameters for a client connection to a Kestrel server.



18
19
20
21
22
23
24
25
# File 'lib/kjess/client.rb', line 18

def self.defaults
  {
    :host                   => 'localhost',
    :port                   => 22133,
    :admin_port             => 2223,
    :stats_cache_expiration => 0, # number of seconds to keep stats around
  }
end

Instance Method Details

#abort(queue_name) ⇒ Object

Public: Abort an existing reliable read

queue_name - the name of the queue to abort

Returns a Response



164
165
166
# File 'lib/kjess/client.rb', line 164

def abort( queue_name )
  get( queue_name, :abort => true )
end

#close(queue_name) ⇒ Object

Public: Close an existing reliable read

queue_name - the name of the queue to abort

Returns a Response



154
155
156
# File 'lib/kjess/client.rb', line 154

def close( queue_name )
  get( queue_name, :close => true )
end

#close_and_reserve(queue_name, opts = {}) ⇒ Object

Public: Reserve the next item on the queue and close out the previous read.

This is a helper method to do a reliable read on a queue item while closing out the existing read at the same time.

queue_name - the name of the quee to retieve and item from options - Additional options

:wait_for - wait for this many ms for an item on the queue(default: 0)


135
136
137
138
# File 'lib/kjess/client.rb', line 135

def close_and_reserve( queue_name, opts = {} )
  opts = opts.merge( :close => true )
  reserve( queue_name, opts )
end

#connected?Boolean

Public: is the client connected to a server

Returns true or false

Returns:

  • (Boolean)


54
55
56
57
58
# File 'lib/kjess/client.rb', line 54

def connected?
  return false if @connection.nil?
  return false if @connection.closed?
  return true
end

#connectionObject

Internal: Allocate or return the existing connection to the server

Returns a KJess::Connection



47
48
49
# File 'lib/kjess/client.rb', line 47

def connection
  @connection ||= KJess::Connection.new( host, port )
end

#delete(queue_name) ⇒ Object

Public : Remove a queue from the kestrel server

This will remove any queue you want. Including queues that do not exist.

queue_name - the name of the queue to remove

Returns true if it was deleted false otherwise



175
176
177
178
179
# File 'lib/kjess/client.rb', line 175

def delete( queue_name )
  req  = KJess::Request::Delete.new( :queue_name => queue_name )
  resp = send_recv( req )
  return KJess::Response::Deleted === resp
end

#disconnectObject

Public: Disconnect from the Kestrel server

Returns nothing



39
40
41
42
# File 'lib/kjess/client.rb', line 39

def disconnect
  @connection.close if connected?
  @connection = nil
end

#flush(queue_name) ⇒ Object

Public: Remove all items from a queue on the kestrel server

This will flush any and all queue. Even queues that do not exist.

queue_name - the name of the queue to flush

Returns true if the queue was flushed.



188
189
190
191
192
193
194
195
196
197
# File 'lib/kjess/client.rb', line 188

def flush( queue_name )
  # It can take a long time to flush all of the messages
  # on a server, so we'll set the read timeout to something
  # much higher than usual.
  connection.with_additional_read_timeout(60) do
    req  = KJess::Request::Flush.new( :queue_name => queue_name )
    resp = send_recv( req )
    return KJess::Response::End === resp
  end
end

#flush_allObject

Public: Remove all items from all queues on the kestrel server

Returns true.



202
203
204
205
206
207
208
209
210
# File 'lib/kjess/client.rb', line 202

def flush_all
  # It can take a long time to flush all of the messages
  # on a server, so we'll set the read timeout to something
  # much higher than usual.
  connection.with_additional_read_timeout(60) do
    resp = send_recv( KJess::Request::FlushAll.new )
    return KJess::Response::End === resp
  end
end

#get(queue_name, opts = {}) ⇒ Object

Public: Retrieve an item from the given queue

queue_name - the name of the queue to retrieve an item from options - the options for retrieving the items

:wait_for - wait for this many ms for an item on the queued(default: 0)
:open     - count this as an reliable read (default: false)
:close    - close a previous read that was retrieved with :open
:abort    - close an existing read, returning that item to the head of the queue
:peek     - return the first item on the queue, and do not remove it

returns a Response



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/kjess/client.rb', line 96

def get( queue_name, opts = {} )
  opts = opts.merge( :queue_name => queue_name )
  g    = KJess::Request::Get.new( opts )

  if opts[:wait_for]
    wait_for_in_seconds = Float(opts[:wait_for]) / 1000.0
  else
    wait_for_in_seconds = 0.1
  end

  connection.with_additional_read_timeout(wait_for_in_seconds) do
    resp = send_recv( g )
    return resp.data if KJess::Response::Value === resp
    return nil
  end
end

#peek(queue_name) ⇒ Object

Public: Peek at the top item in the queue

queue_name - the name of the queue to retrieve an item from

Returns a Response



145
146
147
# File 'lib/kjess/client.rb', line 145

def peek( queue_name )
  get( queue_name, :peek => true )
end

#pingObject

Public: Returns true if the server is alive

This uses the ‘stats’ method to see if the server is alive

Returns true or false



275
276
277
278
279
280
# File 'lib/kjess/client.rb', line 275

def ping
  stats
  true
rescue Errno::ECONNREFUSED
  false
end

#queue_stats(queue_name) ⇒ Object

Public: Return just the stats about a particular queue

Returns a Hash



285
286
287
# File 'lib/kjess/client.rb', line 285

def queue_stats( queue_name )
  stats['queues'][queue_name]
end

#quitObject

Public: Disconnect from the kestrel server.

Returns true



226
227
228
229
# File 'lib/kjess/client.rb', line 226

def quit
  resp = send_recv( KJess::Request::Quit.new )
  return KJess::Response::Eof === resp
end

#reloadObject

Public: Have Kestrel reload its config.

Currently the kestrel server will say that the config was reloaded no matter what so there is no way to determine if the config failed to load.

Returns true



218
219
220
221
# File 'lib/kjess/client.rb', line 218

def reload
  resp = send_recv( KJess::Request::Reload.new )
  return KJess::Response::ReloadedConfig === resp
end

#reserve(queue_name, opts = {}) ⇒ Object

Public: Reserve the next item on the queue

This is a helper method to get an item from a queue and open it for reliable read.

queue_name - the name of the queue to retrieve an item from options - Additional options

:wait_for - wait for this many ms for an item on the queue(default: 0)


121
122
123
124
# File 'lib/kjess/client.rb', line 121

def reserve( queue_name, opts = {} )
  opts = opts.merge( :open => true )
  get( queue_name, opts )
end

#send_recv(request) ⇒ Object

Internal: Send and recive a request/response

request - the Request objec to send to the server

Returns a Response object



301
302
303
304
305
306
307
308
# File 'lib/kjess/client.rb', line 301

def send_recv( request )
  connection.write( request.to_protocol )
  line = connection.readline
  resp = KJess::Response.parse( line )
  resp.read_more( connection )
  raise resp if resp.error?
  return resp
end

#set(queue_name, item, expiration = 0) ⇒ Object

Public: Add an item to the given queue

queue_name - the queue to put an item on item - the item to put on the queue. #to_s will be called on it. expiration - The number of seconds from now to expire the item

Returns true if successful, false otherwise



78
79
80
81
82
83
# File 'lib/kjess/client.rb', line 78

def set( queue_name, item, expiration = 0 )
  s = KJess::Request::Set.new( :queue_name => queue_name, :data => item, :expiration => expiration )
  resp = send_recv( s )

  return KJess::Response::Stored === resp
end

#shutdownObject

Public: Tells the Kestrel server to shutdown

Returns nothing



292
293
294
# File 'lib/kjess/client.rb', line 292

def shutdown
  send_recv( KJess::Request::Shutdown.new )
end

#statsObject

Public: Return stats about the Kestrel server, they will be cached according to the stats_cache_expiration initialization parameter

Returns a Hash



247
248
249
# File 'lib/kjess/client.rb', line 247

def stats
  stats_cache.stats
end

#stats!Object

Internal: Return the hash of stats

Using a combination of the STATS and DUMP_STATS commands this generates a good overview of all the most used stats for a Kestrel server.

Returns a Hash



257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/kjess/client.rb', line 257

def stats!
  stats       = send_recv( KJess::Request::Stats.new )
  raise KJess::ProtocolError, "Problem receiving stats: #{stats.inspect}" unless KJess::Response::Stats === stats

  h           = stats.data
  dump_stats  = send_recv( KJess::Request::DumpStats.new )
  h['queues'] = Hash.new
  if KJess::Response::DumpedStats === dump_stats then
    h['queues'].merge!( dump_stats.data )
  end
  return h
end

#status(update_to = nil) ⇒ Object

Public: Return the server status.

Currently this is only supported in the HEAD versin of kestrel. Version where this is not available will raise ServerError.

Returns a String.



237
238
239
240
241
# File 'lib/kjess/client.rb', line 237

def status( update_to = nil )
  resp = send_recv( KJess::Request::Status.new( :update_to => update_to ) )
  raise KJess::ProtocolError, "Status command is not supported" if KJess::Response::ClientError === resp
  return resp.message
end

#versionObject

Public: Return the version of the Kestrel Server.

Return a string Raise Exception if there is a



64
65
66
67
68
69
# File 'lib/kjess/client.rb', line 64

def version
  v = KJess::Request::Version.new
  r = send_recv( v )
  return r.version if Response::Version === r
  raise KJess::ProtocolError, "Unexpected Response from VERSION command"
end