Class: MysqlPR::Protocol

Inherits:
Object
  • Object
show all
Defined in:
lib/mysql-pr/protocol.rb

Overview

MySQL network protocol

Defined Under Namespace

Classes: AuthenticationPacket, ExecutePacket, FieldPacket, InitialPacket, PrepareResultPacket, ResultPacket

Constant Summary collapse

VERSION =
10
MAX_PACKET_LENGTH =
2**24-1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, socket, conn_timeout, read_timeout, write_timeout) ⇒ Protocol

make socket connection to server.

Argument

host
String

if “localhost” or “” nil then use UNIXSocket. Otherwise use TCPSocket

port
Integer

port number using by TCPSocket

socket
String

socket file name using by UNIXSocket

conn_timeout
Integer

connect timeout (sec).

read_timeout
Integer

read timeout (sec).

write_timeout
Integer

write timeout (sec).

Exception

ClientError

connection timeout



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/mysql-pr/protocol.rb', line 156

def initialize(host, port, socket, conn_timeout, read_timeout, write_timeout)
  @insert_id = 0
  @warning_count = 0
  @gc_stmt_queue = []   # stmt id list which GC destroy.
  set_state :INIT
  @read_timeout = read_timeout
  @write_timeout = write_timeout
  begin
    Timeout.timeout conn_timeout do
      if host.nil? or host.empty? or host == "localhost"
        socket ||= ENV["MYSQL_UNIX_PORT"] || MYSQL_UNIX_PORT
        @sock = UNIXSocket.new socket
      else
        port ||= ENV["MYSQL_TCP_PORT"] || (Socket.getservbyname("mysql","tcp") rescue MYSQL_TCP_PORT)
        @sock = TCPSocket.new host, port
      end
    end
  rescue Timeout::Error
    raise ClientError, "connection timeout"
  end
end

Instance Attribute Details

#affected_rowsObject (readonly)

Returns the value of attribute affected_rows.



133
134
135
# File 'lib/mysql-pr/protocol.rb', line 133

def affected_rows
  @affected_rows
end

#charsetObject

Returns the value of attribute charset.



138
139
140
# File 'lib/mysql-pr/protocol.rb', line 138

def charset
  @charset
end

#insert_idObject (readonly)

Returns the value of attribute insert_id.



134
135
136
# File 'lib/mysql-pr/protocol.rb', line 134

def insert_id
  @insert_id
end

#messageObject (readonly)

Returns the value of attribute message.



137
138
139
# File 'lib/mysql-pr/protocol.rb', line 137

def message
  @message
end

#server_infoObject (readonly)

Returns the value of attribute server_info.



129
130
131
# File 'lib/mysql-pr/protocol.rb', line 129

def server_info
  @server_info
end

#server_statusObject (readonly)

Returns the value of attribute server_status.



135
136
137
# File 'lib/mysql-pr/protocol.rb', line 135

def server_status
  @server_status
end

#server_versionObject (readonly)

Returns the value of attribute server_version.



130
131
132
# File 'lib/mysql-pr/protocol.rb', line 130

def server_version
  @server_version
end

#sqlstateObject (readonly)

Returns the value of attribute sqlstate.



132
133
134
# File 'lib/mysql-pr/protocol.rb', line 132

def sqlstate
  @sqlstate
end

#thread_idObject (readonly)

Returns the value of attribute thread_id.



131
132
133
# File 'lib/mysql-pr/protocol.rb', line 131

def thread_id
  @thread_id
end

#warning_countObject (readonly)

Returns the value of attribute warning_count.



136
137
138
# File 'lib/mysql-pr/protocol.rb', line 136

def warning_count
  @warning_count
end

Class Method Details

.net2value(pkt, type, unsigned) ⇒ Object

Convert netdata to Ruby value

Argument

data
Packet

packet data

type
Integer

field type

unsigned
true or false

true if value is unsigned

Return

Object

converted value.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/mysql-pr/protocol.rb', line 23

def self.net2value(pkt, type, unsigned)
  case type
  when Field::TYPE_STRING, Field::TYPE_VAR_STRING, Field::TYPE_NEWDECIMAL, Field::TYPE_BLOB
    return pkt.lcs
  when Field::TYPE_TINY
    v = pkt.utiny
    return unsigned ? v : v < 128 ? v : v-256
  when Field::TYPE_SHORT
    v = pkt.ushort
    return unsigned ? v : v < 32768 ? v : v-65536
  when Field::TYPE_INT24, Field::TYPE_LONG
    v = pkt.ulong
    return unsigned ? v : v < 2**32/2 ? v : v-2**32
  when Field::TYPE_LONGLONG
    n1, n2 = pkt.ulong, pkt.ulong
    v = (n2 << 32) | n1
    return unsigned ? v : v < 2**64/2 ? v : v-2**64
  when Field::TYPE_FLOAT
    return pkt.read(4).unpack('e').first
  when Field::TYPE_DOUBLE
    return pkt.read(8).unpack('E').first
  when Field::TYPE_DATE
    len = pkt.utiny
    y, m, d = pkt.read(len).unpack("vCC")
    t = MysqlPR::Time.new(y, m, d, nil, nil, nil)
    return t
  when Field::TYPE_DATETIME, Field::TYPE_TIMESTAMP
    len = pkt.utiny
    y, m, d, h, mi, s, sp = pkt.read(len).unpack("vCCCCCV")
    return MysqlPR::Time.new(y, m, d, h, mi, s, false, sp)
  when Field::TYPE_TIME
    len = pkt.utiny
    sign, d, h, mi, s, sp = pkt.read(len).unpack("CVCCCV")
    h = d.to_i * 24 + h.to_i
    return MysqlPR::Time.new(0, 0, 0, h, mi, s, sign!=0, sp)
  when Field::TYPE_YEAR
    return pkt.ushort
  when Field::TYPE_BIT
    return pkt.lcs
  else
    raise "not implemented: type=#{type}"
  end
end

.value2net(v) ⇒ Object

convert Ruby value to netdata

Argument

v
Object

Ruby value.

Return

Integer

type of column. Field::TYPE_*

String

netdata

Exception

ProtocolError

value too large / value is not supported



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/mysql-pr/protocol.rb', line 75

def self.value2net(v)
  case v
  when nil
    type = Field::TYPE_NULL
    val = ""
  when Integer
    if v >= 0
      if v < 256
        type = Field::TYPE_TINY | 0x8000
        val = [v].pack("C")
      elsif v < 256**2
        type = Field::TYPE_SHORT | 0x8000
        val = [v].pack("v")
      elsif v < 256**4
        type = Field::TYPE_LONG | 0x8000
        val = [v].pack("V")
      elsif v < 256**8
        type = Field::TYPE_LONGLONG | 0x8000
        val = [v&0xffffffff, v>>32].pack("VV")
      else
        raise ProtocolError, "value too large: #{v}"
      end
    else
      if -v <= 256/2
        type = Field::TYPE_TINY
        val = [v].pack("C")
      elsif -v <= 256**2/2
        type = Field::TYPE_SHORT
        val = [v].pack("v")
      elsif -v <= 256**4/2
        type = Field::TYPE_LONG
        val = [v].pack("V")
      elsif -v <= 256**8/2
        type = Field::TYPE_LONGLONG
        val = [v&0xffffffff, v>>32].pack("VV")
      else
        raise ProtocolError, "value too large: #{v}"
      end
    end
  when Float
    type = Field::TYPE_DOUBLE
    val = [v].pack("E")
  when String
    type = Field::TYPE_STRING
    val = Packet.lcs(v)
  when MysqlPR::Time, ::Time
    type = Field::TYPE_DATETIME
    val = [7, v.year, v.month, v.day, v.hour, v.min, v.sec].pack("CvCCCCC")
  else
    raise ProtocolError, "class #{v.class} is not supported"
  end
  return type, val
end

Instance Method Details

#authenticate(user, passwd, db, flag, charset) ⇒ Object

initial negotiate and authenticate.

Argument

user
String / nil

username

passwd
String / nil

password

db
String / nil

default database name. nil: no default.

flag
Integer

client flag

charset
MysqlPR::Charset / nil

charset for connection. nil: use server’s charset

Exception

ProtocolError

The old style password is not supported

Raises:



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/mysql-pr/protocol.rb', line 191

def authenticate(user, passwd, db, flag, charset)
  check_state :INIT
  @authinfo = [user, passwd, db, flag, charset]
  reset
  init_packet = InitialPacket.parse read
  @server_info = init_packet.server_version
  @server_version = init_packet.server_version.split(/\D/)[0,3].inject{|a,b|a.to_i*100+b.to_i}
  @thread_id = init_packet.thread_id
  client_flags = CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_TRANSACTIONS | CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION
  client_flags |= CLIENT_CONNECT_WITH_DB if db
  client_flags |= flag
  @charset = charset
  unless @charset
    @charset = Charset.by_number(init_packet.server_charset)
    @charset.encoding       # raise error if unsupported charset
  end
  netpw = encrypt_password passwd, init_packet.scramble_buff
  write AuthenticationPacket.serialize(client_flags, 1024**3, @charset.number, user, netpw, db)
  raise ProtocolError, 'The old style password is not supported' if read.to_s == "\xfe"
  set_state :READY
end

#closeObject



178
179
180
# File 'lib/mysql-pr/protocol.rb', line 178

def close
  @sock.close
end

#field_list_command(table, field) ⇒ Object

Field list command

Argument

table
String

table name.

field
String / nil

field name that may contain wild card.

Return

Array of Field

field list



310
311
312
313
314
315
316
317
318
319
320
# File 'lib/mysql-pr/protocol.rb', line 310

def field_list_command(table, field)
  synchronize do
    reset
    write [COM_FIELD_LIST, table, 0, field].pack("Ca*Ca*")
    fields = []
    until (data = read).eof?
      fields.push Field.new(FieldPacket.parse(data))
    end
    return fields
  end
end

#gc_stmt(stmt_id) ⇒ Object



445
446
447
# File 'lib/mysql-pr/protocol.rb', line 445

def gc_stmt(stmt_id)
  @gc_stmt_queue.push stmt_id
end

#get_resultObject

get result of query.

Return

integer / nil

number of fields of results. nil if no results.



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/mysql-pr/protocol.rb', line 242

def get_result
  begin
    res_packet = ResultPacket.parse read
    if res_packet.field_count.to_i > 0  # result data exists
      set_state :FIELD
      return res_packet.field_count
    end
    if res_packet.field_count.nil?      # LOAD DATA LOCAL INFILE
      filename = res_packet.message
      File.open(filename){|f| write f}
      write nil  # EOF mark
      read
    end
    @affected_rows, @insert_id, @server_status, @warning_count, @message =
      res_packet.affected_rows, res_packet.insert_id, res_packet.server_status, res_packet.warning_count, res_packet.message
    set_state :READY
    return nil
  rescue
    set_state :READY
    raise
  end
end

#kill_command(pid) ⇒ Object

Kill command



347
348
349
# File 'lib/mysql-pr/protocol.rb', line 347

def kill_command(pid)
  simple_command [COM_PROCESS_KILL, pid].pack("CV")
end

#ping_commandObject

Ping command



342
343
344
# File 'lib/mysql-pr/protocol.rb', line 342

def ping_command
  simple_command [COM_PING].pack("C")
end

#process_info_commandObject

Process info command

Return

Array of Field

field list



325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# File 'lib/mysql-pr/protocol.rb', line 325

def process_info_command
  check_state :READY
  begin
    reset
    write [COM_PROCESS_INFO].pack("C")
    field_count = read.lcb
    fields = field_count.times.map{Field.new FieldPacket.parse(read)}
    read_eof_packet
    set_state :RESULT
    return fields
  rescue
    set_state :READY
    raise
  end
end

#query_command(query) ⇒ Object

Query command

Argument

query
String

query string

Return

Integer / nil

number of fields of results. nil if no results.



227
228
229
230
231
232
233
234
235
236
237
# File 'lib/mysql-pr/protocol.rb', line 227

def query_command(query)
  check_state :READY
  begin
    reset
    write [COM_QUERY, @charset.convert(query)].pack("Ca*")
    get_result
  rescue
    set_state :READY
    raise
  end
end

#quit_commandObject

Quit command



214
215
216
217
218
219
220
# File 'lib/mysql-pr/protocol.rb', line 214

def quit_command
  synchronize do
    reset
    write [COM_QUIT].pack("C")
    close
  end
end

#refresh_command(op) ⇒ Object

Refresh command



352
353
354
# File 'lib/mysql-pr/protocol.rb', line 352

def refresh_command(op)
  simple_command [COM_REFRESH, op].pack("CC")
end

#retr_all_records(nfields) ⇒ Object

Retrieve all records for simple query

Argument

nfields
Integer

number of fields

Return

Array of Array of String

all records



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/mysql-pr/protocol.rb', line 288

def retr_all_records(nfields)
  check_state :RESULT
  enc = charset.encoding
  begin
    all_recs = []
    until (pkt = read).eof?
      all_recs.push RawRecord.new(pkt, nfields, enc)
    end
    pkt.read(3)
    @server_status = pkt.utiny
    all_recs
  ensure
    set_state :READY
  end
end

#retr_fields(n) ⇒ Object

Retrieve n fields

Argument

n
Integer

number of fields

Return

Array of MysqlPR::Field

field list



270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/mysql-pr/protocol.rb', line 270

def retr_fields(n)
  check_state :FIELD
  begin
    fields = n.times.map{Field.new FieldPacket.parse(read)}
    read_eof_packet
    set_state :RESULT
    fields
  rescue
    set_state :READY
    raise
  end
end

#set_option_command(opt) ⇒ Object

Set option command



357
358
359
# File 'lib/mysql-pr/protocol.rb', line 357

def set_option_command(opt)
  simple_command [COM_SET_OPTION, opt].pack("Cv")
end

#shutdown_command(level) ⇒ Object

Shutdown command



362
363
364
# File 'lib/mysql-pr/protocol.rb', line 362

def shutdown_command(level)
  simple_command [COM_SHUTDOWN, level].pack("CC")
end

#statistics_commandObject

Statistics command



367
368
369
# File 'lib/mysql-pr/protocol.rb', line 367

def statistics_command
  simple_command [COM_STATISTICS].pack("C")
end

#stmt_close_command(stmt_id) ⇒ Object

Stmt close command

Argument

stmt_id
Integer

statement id



438
439
440
441
442
443
# File 'lib/mysql-pr/protocol.rb', line 438

def stmt_close_command(stmt_id)
  synchronize do
    reset
    write [COM_STMT_CLOSE, stmt_id].pack("CV")
  end
end

#stmt_execute_command(stmt_id, values) ⇒ Object

Stmt execute command

Argument

stmt_id
Integer

statement id

values
Array

parameters

Return

Integer

number of fields



403
404
405
406
407
408
409
410
411
412
413
# File 'lib/mysql-pr/protocol.rb', line 403

def stmt_execute_command(stmt_id, values)
  check_state :READY
  begin
    reset
    write ExecutePacket.serialize(stmt_id, MysqlPR::Stmt::CURSOR_TYPE_NO_CURSOR, values)
    get_result
  rescue
    set_state :READY
    raise
  end
end

#stmt_prepare_command(stmt) ⇒ Object

Stmt prepare command

Argument

stmt
String

prepared statement

Return

Integer

statement id

Integer

number of parameters

Array of Field

field list



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/mysql-pr/protocol.rb', line 378

def stmt_prepare_command(stmt)
  synchronize do
    reset
    write [COM_STMT_PREPARE, charset.convert(stmt)].pack("Ca*")
    res_packet = PrepareResultPacket.parse read
    if res_packet.param_count > 0
      res_packet.param_count.times{read}    # skip parameter packet
      read_eof_packet
    end
    if res_packet.field_count > 0
      fields = res_packet.field_count.times.map{Field.new FieldPacket.parse(read)}
      read_eof_packet
    else
      fields = []
    end
    return res_packet.statement_id, res_packet.param_count, fields
  end
end

#stmt_retr_all_records(fields, charset) ⇒ Object

Retrieve all records for prepared statement

Argument

fields
Array of MysqlPR::Fields

field list

charset
MysqlPR::Charset

Return

Array of Array of Object

all records



421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/mysql-pr/protocol.rb', line 421

def stmt_retr_all_records(fields, charset)
  check_state :RESULT
  enc = charset.encoding
  begin
    all_recs = []
    until (pkt = read).eof?
      all_recs.push StmtRawRecord.new(pkt, fields, enc)
    end
    all_recs
  ensure
    set_state :READY
  end
end