Class: StarlingServer::Handler

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/starling/handler.rb

Overview

This is an internal class that’s used by Starling::Server to handle the MemCache protocol and act as an interface between the Server and the QueueCollection.

Constant Summary collapse

DATA_PACK_FMT =
"Ia*".freeze
ERR_UNKNOWN_COMMAND =

ERROR responses

"CLIENT_ERROR bad command line format\r\n".freeze
GET_COMMAND =

GET Responses

/\Aget (.{1,250})\s*\r\n/m
GET_RESPONSE =
"VALUE %s %s %s\r\n%s\r\nEND\r\n".freeze
GET_RESPONSE_EMPTY =
"END\r\n".freeze
SET_COMMAND =

SET Responses

/\Aset (.{1,250}) ([0-9]+) ([0-9]+) ([0-9]+)\r\n/m
SET_RESPONSE_SUCCESS =
"STORED\r\n".freeze
SET_RESPONSE_FAILURE =
"NOT STORED\r\n".freeze
SET_CLIENT_DATA_ERROR =
"CLIENT_ERROR bad data chunk\r\nERROR\r\n".freeze
STATS_COMMAND =

STAT Response

/\Astats\r\n/m
STATS_RESPONSE =
"STAT pid %d
STAT uptime %d
STAT time %d
STAT version %s
STAT rusage_user %0.6f
STAT rusage_system %0.6f
STAT curr_items %d
STAT total_items %d
STAT bytes %d
STAT curr_connections %d
STAT total_connections %d
STAT cmd_get %d
STAT cmd_set %d
STAT get_hits %d
STAT get_misses %d
STAT bytes_read %d
STAT bytes_written %d
STAT limit_maxbytes %d
%sEND\r\n".freeze
QUEUE_STATS_RESPONSE =
"STAT queue_%s_items %d
STAT queue_%s_total_items %d
STAT queue_%s_logsize %d
STAT queue_%s_expired_items %d
STAT queue_%s_age %d\n".freeze
SHUTDOWN_COMMAND =
/\Ashutdown\r\n/m
@@next_session_id =
1

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Handler

Creates a new handler for the MemCache protocol that communicates with a given client.



62
63
64
# File 'lib/starling/handler.rb', line 62

# Keeps the supplied options for later use; no other work happens at
# construction time.
#
# options - Hash of settings (defaults to an empty Hash). post_init later
#           reads the :server, :timeout and :queue entries from it.
def initialize(options = {})
  @opts = options
end

Instance Method Details

#post_initObject

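Sets up the per-connection state (buffers, statistics, inactivity timeout and session id) for a newly attached client. Incoming commands are subsequently handled by #receive_data and #process.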



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/starling/handler.rb', line 69

# Prepares the per-connection state for a newly attached client.
#
# Resets the input buffers, pulls the server, queue collection and
# inactivity timeout out of the options given to the constructor, counts the
# connection in the server statistics and assigns the next session id.
def post_init
  # Input state: @data holds raw unparsed input, @data_buf holds the
  # partially received payload of a pending set, @stash/@expected_length
  # describe that pending set (nil length means "no set in progress").
  @stash = []
  @data = ""
  @data_buf = ""
  @expected_length = nil
  @expiry_stats = Hash.new(0)

  # Collaborators supplied through the constructor options.
  @server = @opts[:server]
  @queue_collection = @opts[:queue]
  @logger = StarlingServer::Base.logger

  @server.stats[:total_connections] += 1
  set_comm_inactivity_timeout @opts[:timeout]

  # Session ids are handed out from a counter shared by all handlers.
  @session_id = @@next_session_id
  @@next_session_id += 1

  # The peer address is deliberately not resolved here: its only consumer
  # was a debug trace that is disabled, so the lookup was pure overhead and
  # one more thing that could raise during connection setup. To re-enable
  # the trace, restore both lines:
  #   port, host = Socket.unpack_sockaddr_in(get_peername)
  #   @logger.debug "(#{@session_id}) New session from #{host}:#{port}"
end

#process(data) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/starling/handler.rb', line 99

# Handles one "\r\n"-terminated fragment of client input.
#
# In the normal state the fragment is matched against the command patterns
# (SET_COMMAND, GET_COMMAND, STATS_COMMAND, SHUTDOWN_COMMAND) and dispatched
# to the matching helper. While @expected_length is set, the handler is
# instead collecting the data block of a pending set: fragments are
# accumulated in @data_buf until exactly @expected_length characters have
# arrived, at which point the block is passed to set_data.
#
# data - String fragment to process.
#
# Returns whatever the dispatched helper returns (receive_data treats a
# non-nil value as the response to send), or nil while more payload data is
# still needed. Any StandardError raised while handling the fragment is
# logged and answered with GET_RESPONSE_EMPTY.
def process(data)
  data = @data_buf + data unless @data_buf.empty?

  # Our only non-normal state is consuming an object's data, signalled by
  # @expected_length being present.
  if @expected_length
    if data.size < @expected_length
      # Payload incomplete: keep buffering and wait for the next fragment.
      @data_buf = data
      return
    end

    if data.size > @expected_length
      # The data block ran past the announced length, so it can never match
      # exactly. Reject it and return to command mode instead of buffering
      # forever. @stash and @expected_length go back to the values post_init
      # gives them (assumed to mean "no set pending" - confirm against set).
      @data_buf = ""
      @expected_length = nil
      @stash = []
      return respond(SET_CLIENT_DATA_ERROR)
    end

    begin
      return set_data(data)
    ensure
      # Always drop the buffered payload, even when set_data raises;
      # otherwise the stale bytes would be prepended to every later command.
      @data_buf = ""
    end
  end

  case data
  when SET_COMMAND
    @server.stats[:set_requests] += 1
    set($1, $2, $3, $4.to_i)
  when GET_COMMAND
    @server.stats[:get_requests] += 1
    get($1)
  when STATS_COMMAND
    stats
  when SHUTDOWN_COMMAND
    # no point in responding, they'll never get it.
    Runner::shutdown
  else
    logger.warn "Unknown command: #{data}."
    respond ERR_UNKNOWN_COMMAND
  end
rescue => e
  logger.error "Error handling request: #{e}."
  logger.debug e.backtrace.join("\n")
  respond GET_RESPONSE_EMPTY
end

#receive_data(incoming) ⇒ Object



88
89
90
91
92
93
94
95
96
97
# File 'lib/starling/handler.rb', line 88

# Accepts a chunk of raw bytes from the client, appends it to the pending
# input and processes every complete "\r\n"-terminated fragment found there.
#
# Each fragment's response is sent as soon as it is produced, so several
# commands arriving in a single read each get their own reply, in order.
# (Previously only the response to the last fragment was sent and the
# earlier ones were silently dropped.) An unterminated tail stays in @data
# until more input arrives.
#
# incoming - String of data just read from the connection.
def receive_data(incoming)
  @server.stats[:bytes_read] += incoming.size
  @data << incoming

  # slice! removes and returns the shortest prefix ending in "\r\n", or nil
  # once no complete fragment is left.
  while (fragment = @data.slice!(/.*?\r\n/m))
    response = process(fragment)
    send_data response if response
  end
end

#unbindObject



134
135
136
# File 'lib/starling/handler.rb', line 134

# Connection callback with no active behavior: its only statement is a
# disabled debug trace of the session ending.
# NOTE(review): presumably invoked by EventMachine when the connection
# closes (the class inherits EventMachine::Connection) - confirm in EM docs.
def unbind
  #@logger.debug "(#{@session_id}) connection ends"
end