Module: RStyx::Client::StyxClient

Defined in:
lib/rstyx/client.rb

Overview

Message receiving module for the Styx client. The client will assemble all inbound messages.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#keyringauth ⇒ Object

Returns the value of attribute keyringauth.



41
42
43
# File 'lib/rstyx/client.rb', line 41

# Reader for @keyringauth. post_init resets it to nil; while it is
# non-nil, receive_data appends every inbound chunk to it instead of
# decoding messages.
def keyringauth; @keyringauth; end

#sentmessages ⇒ Object

Returns the value of attribute sentmessages.



41
42
43
# File 'lib/rstyx/client.rb', line 41

# Reader for @sentmessages, the hash of requests still awaiting a reply.
# send_message_async stores each entry as tag => [message, callback] and
# receive_data removes it when the matching reply arrives.
def sentmessages; @sentmessages; end

Instance Method Details

#disconnect ⇒ Object

Disconnect from the remote server.



178
179
180
181
182
183
184
185
# File 'lib/rstyx/client.rb', line 178

# Disconnect from the remote server.
#
# Sends a Tflush, through the synchronous send_message, for every tag
# that is outstanding when the method is entered, and then stops the
# EventMachine event loop. Any exception raised by send_message
# propagates to the caller before the loop is stopped.
def disconnect
  # Walk a snapshot of the tags rather than the live hash: each Tflush
  # is itself registered in the table of sent messages, and Ruby refuses
  # to add keys to a hash while it is being iterated.
  sentmessages.keys.each do |tag|
    send_message(Message::Tflush.new(:oldtag => tag))
  end

  EventMachine::stop_event_loop
end

#post_init ⇒ Object



43
44
45
46
47
48
# File 'lib/rstyx/client.rb', line 43

# Resets the per-connection state: keyring authentication is off (nil),
# no requests are outstanding, a fresh mutex guards the table of sent
# messages, and the inbound byte buffer is empty.
def post_init
  @keyringauth = nil
  @sentmessages = Hash.new
  @lock = Mutex.new
  @msgbuffer = ""
end

#receive_data(data) ⇒ Object

Receive data from the network connection, called by EventMachine.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/rstyx/client.rb', line 117

# Receive data from the network connection, called by EventMachine.
#
# While @keyringauth is set, the raw bytes are appended to it and nothing
# else happens. Otherwise the bytes are appended to @msgbuffer and every
# complete message found there is decoded and handed to the callback that
# was registered under its tag. Replies whose tag is not registered are
# dropped.
#
# data:: the chunk of bytes just read from the connection.
def receive_data(data)
  # Keyring authentication mode: divert the bytes and do nothing else.
  if !@keyringauth.nil?
    @keyringauth << data
    return
  end

  @msgbuffer << data
  puts(" << #{data.unpack("H*").inspect}") if DEBUG > 1

  # Each message starts with a 4-byte little-endian length; the
  # "a#{needed}" unpack below treats it as the size of the whole message.
  while @msgbuffer.length > 4
    needed = @msgbuffer.unpack("V").first
    # Stop until more data arrives if the message is still incomplete.
    break if @msgbuffer.length < needed

    # Peel one message off the front; whatever follows stays buffered.
    frame, @msgbuffer = @msgbuffer.unpack("a#{needed}a*")
    reply = Message::StyxMessage.from_bytes(frame)
    puts(" << #{reply.to_s}") if DEBUG > 0

    # Claim the request this reply answers, if there is one.
    request, callback = @lock.synchronize { @sentmessages.delete(reply.tag) }
    # Nobody is waiting for this tag: discard the reply.
    next if request.nil?

    if reply.class == Message::Rflush
      # The flushed request must be removed as well, and its sender is
      # handed the Rflush so that it does not wait forever.
      if request.respond_to?(:oldtag)
        flushed, flushed_cb = @lock.synchronize { @sentmessages.delete(request.oldtag) }
      end
      flushed_cb.call(flushed, reply) unless flushed.nil? || flushed_cb.nil?
    end

    # Finally notify whoever sent the request itself.
    callback.call(request, reply) unless callback.nil?

    # More complete messages may still be buffered, so keep looping.
  end
  # Falling out of the loop means the buffer holds no complete message.
end

#send_message(message, timeout = 0) ⇒ Object

Send a message synchronously. If an error occurs, a StyxException is raised.

message

The Styx message to send.

timeout

optional timeout for receiving the response.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/rstyx/client.rb', line 88

# Send a message synchronously and return the reply.
#
# message:: the Styx message to send.
# timeout:: optional number of seconds to wait for the response. It is
#           passed to Timeout::timeout, for which 0 (the default) means
#           no limit.
#
# Raises StyxException if no reply arrives in time, if the reply is an
# Rerror (the exception message is its ename), or if the reply's ident is
# not the request's ident plus one.
def send_message(message, timeout=0)
  # The queue carries the reply from the callback given to
  # send_message_async (which runs on the receiving side) back to the
  # caller blocked here.
  queue = Queue.new
  send_message_async(message) do |tx,rx|
    queue << rx
  end

  # Only the blocking wait is timed. Timeout::Error is translated here
  # because handing an exception *instance* to Timeout::timeout makes it
  # re-raise that instance with the generic "execution expired" text,
  # losing the message below.
  begin
    resp = Timeout::timeout(timeout) { queue.shift }
  rescue Timeout::Error
    raise StyxException.new("timeout waiting for a reply to #{message.to_s}")
  end

  # Check that the response really answers the transmitted message.
  if resp.class == Message::Rerror
    raise StyxException.new(resp.ename.to_s)
  end

  if resp.ident != message.ident + 1
    raise StyxException.new("Unexpected #{resp.to_s} received in response to #{message.to_s}")
  end
  resp
end

#send_message_async(message, &block) ⇒ Object

Send a message asynchronously.

message
StyxMessage

the message to be sent

block
Proc

the callback to use

return
Fixnum

the tag number used.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/rstyx/client.rb', line 57

# Send a message asynchronously.
#
# message:: the StyxMessage to be sent. If its tag is nil, a free tag is
#           chosen and assigned to it.
# block::   the callback stored alongside the message; receive_data calls
#           it with the original message and the reply.
#
# Returns the tag number used.
#
# Raises StyxException if a tag must be chosen and every tag in
# 0...MAX_TAG is already in use.
def send_message_async(message, &block)
  # store the message and callback indexed by tag
  @lock.synchronize do
    if message.tag.nil?
      # No tag was specified, so pick one. The current thread's object ID
      # is the starting point and collisions are resolved by linear
      # probing. The probe wraps around at MAX_TAG so the chosen tag
      # stays in the same range as the starting point, and it gives up
      # after one full lap rather than looping forever.
      tag = Thread.current.object_id % MAX_TAG
      probes = 0
      while @sentmessages.has_key?(tag)
        probes += 1
        if probes >= MAX_TAG
          raise StyxException.new("no free tag available for #{message.to_s}")
        end
        tag = (tag + 1) % MAX_TAG
      end
      message.tag = tag
    end
    @sentmessages[message.tag] = [message, block]
  end

  bytes = message.to_bytes
  DEBUG > 0 && puts(" >> #{message.to_s}")
  DEBUG > 1 && puts(" >> #{bytes.unpack("H*").inspect}")
  send_data(bytes)
  message.tag
end