Class: EventMachine::Protocols::Zmq2::SocketConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
ConnectionMixin, PackString
Defined in:
lib/em/protocols/zmq2/socket_connection.rb

Overview

Heavy-duty worker class. Implements ZMTP 1.0, the transport protocol of ZMQ 2.x.

It calls the following callbacks

It is not intended for use by end users.

Constant Summary collapse

FF =
"\xff".freeze
BIG_UNPACK =
'CNNC'.freeze
SMALL_UNPACK =
'CC'.freeze

Constants included from PackString

PackString::BIG_PACK, PackString::SMALL_PACK

Instance Method Summary collapse

Methods included from PackString

#pack_string, #prepare_message

Methods included from ConnectionMixin

#not_too_busy?, #post_init, #receive_strings, #sent_data, #unbind

Constructor Details

#initialize(socket) ⇒ SocketConnection

Returns a new instance of SocketConnection.



47
48
49
50
51
# File 'lib/em/protocols/zmq2/socket_connection.rb', line 47

# Prepares the receive-side state of a fresh connection.
#
# @param socket [Object] kept as-is in @socket; nothing else is done with it here
def initialize(socket)
  # Holds received bytes that do not yet make up a complete frame.
  @recv_buffer = ''
  @socket = socket
  # List of messages, each a list of frames. The last entry is the message
  # still being assembled, so the list starts with one empty message.
  @recv_frames = [[]]
end

Instance Method Details

#_not_too_busy? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/em/protocols/zmq2/socket_connection.rb', line 54

# Tells whether the connection can take more outgoing data.
#
# @return [Boolean] true while +outbound_data_count+ is below 32
def _not_too_busy?
  pending = outbound_data_count
  pending < 32
end

#parse_frames(data) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/em/protocols/zmq2/socket_connection.rb', line 96

# Splits incoming bytes into frames and groups the frames into messages.
#
# Bytes left over from the previous call (@recv_buffer) are put in front of
# +data+. Every complete frame is appended to the last (still open) message
# in @recv_frames; a frame whose flags byte has bit 0 cleared ends that
# message and opens a new empty one. Bytes that do not yet make up a whole
# frame are kept in @recv_buffer for the next call.
#
# Header layouts read here:
# * short: 1 length byte, 1 flags byte
# * long:  0xFF marker, 8-byte big-endian length, 1 flags byte
# The length counts the flags byte, so the body is length - 1 bytes.
#
# @param data [String] bytes just received; it is neither kept nor modified
# @return [String, nil] the unparsed remainder, or nil when the connection
#   was closed because of an invalid frame
def parse_frames(data)
  data = @recv_buffer << data unless @recv_buffer.empty?

  # Walk the input with a cursor instead of re-slicing the remainder after
  # every frame, which would copy the tail once per frame.
  pos = 0
  size = data.bytesize
  while size - pos >= 2
    if data.getbyte(pos) == 0xFF
      break if size - pos < 10
      # Only the low 32 bits of the 64-bit length are used.
      _, _, length, more = data.byteslice(pos, 10).unpack(BIG_UNPACK)
      body_at = pos + 10
    else
      length = data.getbyte(pos)
      more = data.getbyte(pos + 1)
      body_at = pos + 2
    end

    if length.zero?
      # The length includes the flags byte, so 0 cannot describe a frame and
      # would yield a nil body. Drop the peer instead of passing nil on.
      # NOTE(review): close_connection is assumed to come from
      # EM::Connection - confirm this is the desired reaction.
      @recv_buffer = ''
      close_connection
      return
    end

    length -= 1
    finish = body_at + length
    break if size < finish

    @recv_frames.last << data.byteslice(body_at, length)
    @recv_frames << [] if more & 1 == 0
    pos = finish
  end

  # Always store a fresh string: the caller's +data+ must not become the
  # buffer, because the buffer is appended to on the next call.
  @recv_buffer = data.byteslice(pos, size - pos)
end

#pop_message ⇒ Object



87
88
89
90
91
# File 'lib/em/protocols/zmq2/socket_connection.rb', line 87

# Removes and returns the oldest completed message, if there is one.
#
# The last entry of @recv_frames is the message still being assembled, so a
# message is only handed out while more than one entry is present.
#
# @return [Array, nil] the frames of one message, or nil when none is complete
def pop_message
  return unless @recv_frames.size > 1

  @recv_frames.shift
end

#receive_data(data) ⇒ Object



80
81
82
83
84
85
# File 'lib/em/protocols/zmq2/socket_connection.rb', line 80

# Feeds received bytes to the frame parser and hands every message that
# became complete to +receive_strings+, oldest first.
#
# @param data [String] bytes received on the connection
# @return [nil]
def receive_data(data)
  parse_frames(data)
  message = pop_message
  while message
    receive_strings(message)
    message = pop_message
  end
end

#send_strings(strings) ⇒ Object



123
124
125
# File 'lib/em/protocols/zmq2/socket_connection.rb', line 123

# Packs +strings+ with +prepare_message+ and writes the result with
# +send_data+.
#
# @param strings [Object] passed unchanged to +prepare_message+
# @return [Object] whatever +send_data+ returns
def send_strings(strings)
  packed = prepare_message(strings)
  send_data(packed)
end

#send_strings_or_prepared(strings, prepared) ⇒ Object



127
128
129
# File 'lib/em/protocols/zmq2/socket_connection.rb', line 127

# Writes an already packed message with +send_data+.
#
# @param strings [Object] not used by this implementation
# @param prepared [Object] passed unchanged to +send_data+
# @return [Object] whatever +send_data+ returns
def send_strings_or_prepared(strings, prepared)
  send_data(prepared)
end