Class: MQueue::Protocols::Sparrow

Inherits:
Object
  • Object
show all
Defined in:
lib/m_queue/protocols/sparrow.rb

Constant Summary collapse

LOG_DIR =
File.join(MQueue::MQUEUE_ROOT, 'log')
CR =
"\r\n"
ERROR =
"error"
GET =
"get"
SET =
"set"
DELETE =
"delete"
FLUSH_ALL =
"flush_all"
CLIENT_ERROR_REGEX =
/\ACLIENT_ERROR\s/i
SERVER_ERROR_REGEX =
/\ASERVER_ERROR\s/i
ERROR_REGEX =
/\AERROR\s/i
VALUE_REGEX =
/^VALUE (.+) (.+) (.+)/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Sparrow

Returns a new instance of Sparrow.



23
24
25
# File 'lib/m_queue/protocols/sparrow.rb', line 23

# Builds a protocol instance from the given options.
#
# The options are stored through the +options=+ writer rather than by
# assigning the instance variable directly. Of the keys, only +:weight+ is
# read by code visible here (see #weight).
#
# @param opts [Hash] configuration for this instance; defaults to an empty hash
def initialize(opts = {})
  self.options = opts
end

Instance Attribute Details

#options ⇒ Object

Returns the value of attribute options.



6
7
8
# File 'lib/m_queue/protocols/sparrow.rb', line 6

# Reader for the options stored on this instance.
#
# @return [Object] whatever was last assigned to +@options+ (the constructor
#   stores its +opts+ argument here via the writer)
def options
  @options
end

Instance Method Details

#[](queue_name) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/m_queue/protocols/sparrow.rb', line 33

# Fetches one message from the named queue.
#
# Sends a GET command, then parses the reply: a header line matching
# VALUE_REGEX whose last whitespace-separated field is the payload size,
# followed by the payload, a line terminator and a closing line. The two
# trailing lines are consumed and discarded so the socket is positioned at
# the start of the next reply.
#
# @param queue_name name of the queue, passed to +send_msg+ unchanged
# @return [String, nil] the payload; nil when there is no socket, when the
#   reply is not a VALUE header, or when any StandardError is raised (the
#   error is then handed to +mark_dead+)
def [](queue_name)
  send_msg GET, queue_name
  return unless @socket

  header = @socket.gets
  return unless header =~ VALUE_REGEX

  # The payload size is the last field of the header. Read exactly that many
  # bytes; no second regex against the header is needed, and a header that
  # ends in a bare "\n" no longer yields a zero-length read.
  bytes = header.split(' ').last.to_i
  msg = @socket.read(bytes)
  @socket.gets # CR
  @socket.gets # END
  msg
rescue => e
  mark_dead(e)
  nil
end

#[]=(queue_name, msg) ⇒ Object



27
28
29
30
31
# File 'lib/m_queue/protocols/sparrow.rb', line 27

# Appends a message to the named queue.
#
# Sends a SET command whose size field is the payload's length in bytes,
# followed by CR and the payload, then reads one reply line from the socket.
#
# @param queue_name name of the queue, passed to +send_msg+ unchanged
# @param msg [String] payload to enqueue
# @return [String, nil] the reply line read from the socket, or nil when
#   there is no socket after sending
def []=(queue_name, msg)
  # The size field must count bytes, not characters: the payload goes over
  # the socket as raw bytes, so a character count understates multibyte
  # messages and the reader would stop short of the real payload end.
  send_msg SET, queue_name, 0, 0, msg.bytesize, CR + msg
  return unless @socket
  @socket.gets # STORED
end

#alive? ⇒ Boolean

MQueue Protocol API

Returns:

  • (Boolean)


63
64
65
# File 'lib/m_queue/protocols/sparrow.rb', line 63

# Reports whether this connection may be used.
#
# @return [Boolean] true when no retry time is recorded in +@retry+, or when
#   the recorded time is earlier than the current time
def alive?
  return true unless @retry

  @retry < Time.now
end

#delete(queue_name) ⇒ Object



49
50
51
52
53
# File 'lib/m_queue/protocols/sparrow.rb', line 49

# Issues a DELETE command for the named queue and reads one reply line.
#
# @param queue_name name of the queue, passed to +send_msg+ unchanged
# @return [String, nil] the line read from the socket, or nil when there is
#   no socket after sending
def delete(queue_name)
  send_msg(DELETE, queue_name)
  @socket.gets if @socket
end

#delete! ⇒ Object



55
56
57
58
59
# File 'lib/m_queue/protocols/sparrow.rb', line 55

# Issues a FLUSH_ALL command and reads one reply line.
#
# @return [String, nil] the line read from the socket, or nil when there is
#   no socket after sending
def delete!
  send_msg(FLUSH_ALL)
  @socket.gets if @socket
end

#reload! ⇒ Object



71
72
73
74
75
76
# File 'lib/m_queue/protocols/sparrow.rb', line 71

# Resets the connection state held by this instance.
#
# Clears the recorded retry time, closes the current socket if one exists and
# is still open, drops the socket reference and sets the status to
# "Reloading".
#
# @return [String] the new status string
def reload!
  @retry = nil
  if @socket
    @socket.close unless @socket.closed?
  end
  @socket = nil
  @status = "Reloading"
end

#weight ⇒ Object



67
68
69
# File 'lib/m_queue/protocols/sparrow.rb', line 67

# Weight configured for this instance.
#
# @return [Object] the value stored under +:weight+ in the options, or 0
#   when that value is nil or false
def weight
  configured = options[:weight]
  configured || 0
end