Class: Rumai::IXP::Agent

Inherits:
Object show all
Defined in:
lib/rumai/ixp/transport.rb

Overview

A thread-safe channel that multiplexes many threads onto a single 9P2000 connection.

The send/recv implementation is based on the XCB cookie approach: www.x.org/releases/X11R7.5/doc/libxcb/tutorial/#requestsreplies

Defined Under Namespace

Classes: FidStream, RangedPool

Constant Summary collapse

# Maps single-character mode flags to the Topen constants they stand for.
# #create and #open pass their mode argument to MODES.parse, which is
# defined elsewhere (not shown here) and presumably looks flags up in
# this table -- confirm against its definition.
MODES =
{
  'r' => Topen::OREAD,
  'w' => Topen::OWRITE,
  't' => Topen::ORCLOSE,
  '+' => Topen::ORDWR,
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(stream) ⇒ Agent

Returns a new instance of Agent.

Parameters:

  • stream

    I/O stream on which a 9P2000 server is listening.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rumai/ixp/transport.rb', line 22

# Sets up the agent on the given stream: negotiates the protocol version
# with the server, remembers the message size from the server's reply and
# attaches a newly obtained FID to the filesystem root.
#
# @param stream I/O stream on which a 9P2000 server is listening.
# @raise [Error] if the version in the reply differs from the one requested.
def initialize stream
  @stream = stream

  # replies that were read while looking for a different tag (tag => message)
  @recv_lock = Mutex.new
  @recv_buf = {}

  # pools that hand out request tags and FIDs
  @tag_pool = RangedPool.new(0...BYTE2_MASK)
  @fid_pool = RangedPool.new(0...BYTE4_MASK)

  # establish connection with 9P2000 server
  version_request = Tversion.new(
    :tag     => Fcall::NOTAG,
    :msize   => Tversion::MSIZE,
    :version => Tversion::VERSION
  )
  version_reply = talk(version_request)

  raise Error, "protocol mismatch: self=#{version_request.version.inspect} server=#{version_reply.version.inspect}" unless version_request.version == version_reply.version

  @msize = version_reply.msize

  # authenticate the connection (not necessary for wmii)
  @auth_fid = Fcall::NOFID

  # attach to filesystem root
  @root_fid = @fid_pool.obtain
  attach @root_fid, @auth_fid
end

Instance Attribute Details

#msize ⇒ Object (readonly)

Returns the value of attribute msize.



16
17
18
# File 'lib/rumai/ixp/transport.rb', line 16

# Returns the message size taken from the server's reply to the version
# request that #initialize sends when the agent is created.
def msize
  @msize
end

Instance Method Details

#attach(root_fid, auth_fid = Fcall::NOFID, auth_name = ENV['USER']) ⇒ Object

Associates the given FID with the FS root.



458
459
460
461
462
463
464
465
# File 'lib/rumai/ixp/transport.rb', line 458

# Associates the given FID with the FS root by sending a Tattach request.
#
# @param root_fid  FID that is sent as the request's fid.
# @param auth_fid  FID that is sent as the request's afid.
# @param auth_name value that is sent as the request's aname.
# @return whatever #talk returns for the Tattach request.
def attach root_fid, auth_fid = Fcall::NOFID, auth_name = ENV['USER']
  request = Tattach.new(
    :fid   => root_fid,
    :afid  => auth_fid,
    :uname => ENV['USER'],
    :aname => auth_name
  )

  talk request
end

#clunk(fid) ⇒ Object

Retires the given FID from use.



470
471
472
473
# File 'lib/rumai/ixp/transport.rb', line 470

# Retires the given FID from use.
#
# The FID is handed back to the FID pool even when the server answers the
# clunk with an error, because a FID is no longer valid after a clunk
# request regardless of its outcome; previously such a FID was never
# released.
#
# @param fid FID to retire.
# @return whatever the FID pool's release returns.
# @raise [Error] if the server replies with an error (re-raised after the
#   FID has been released).
def clunk fid
  begin
    talk Tclunk.new(:fid => fid)
  rescue Error
    # the server rejected the clunk, but the FID is retired all the same
    @fid_pool.release fid
    raise
  end

  @fid_pool.release fid
end

#create(path, mode = 'rw', perm = 0644) ⇒ Object

Creates a new file at the given path that is accessible using the given modes for a user having the given permission bits.



383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/rumai/ixp/transport.rb', line 383

# Creates a new file at the given path that is accessible using the given
# modes for a user having the given permission bits.
#
# @param path path of the file to create; its directory part is walked
#   to and its last component becomes the new file's name.
# @param mode mode value handed to MODES.parse.
# @param perm permission bits sent in the Tcreate request.
# @return whatever with_fid returns for the block.
def create path, mode = 'rw', perm = 0644
  parent_dir, file_name = File.split(path)

  with_fid do |dir_fid|
    # point the yielded FID at the directory that will hold the file
    walk_fid dir_fid, parent_dir

    # create the file
    request = Tcreate.new(
      :fid  => dir_fid,
      :name => file_name,
      :perm => perm,
      :mode => MODES.parse(mode)
    )
    talk request
  end
end

#entries(path) ⇒ Object

Returns the basenames of all files inside the directory at the given path.

See Also:

  • Dir::entries


363
364
365
366
367
368
369
# File 'lib/rumai/ixp/transport.rb', line 363

# Returns the basenames of all files inside the directory at the given path.
#
# @param path path of the directory to list.
# @return [Array] the +name+ of every item that #read yields for the path.
# @raise [ArgumentError] if #stat reports that the path is not a directory.
def entries path
  raise ArgumentError, "#{path.inspect} is not a directory" unless stat(path).directory?

  listing = read(path)
  listing.map! { |entry| entry.name }
end

#open(path, mode = 'r') ⇒ Object

Opens the given path for I/O access through a FidStream object. If a block is given, it is invoked with a FidStream object and the stream is closed afterwards.

See Also:

  • File::open


208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/rumai/ixp/transport.rb', line 208

# Opens the given path for I/O access through a FidStream object.
#
# If a block is given, it is invoked with the FidStream object and the
# stream is closed afterwards, even if the block raises.
#
# If the server rejects the open request, the FID that was walked to the
# path is clunked before the error is re-raised, so that a failed open
# does not leave a FID allocated.
#
# @param path path of the file to open.
# @param mode mode value handed to MODES.parse.
# @return the block's value when a block is given, otherwise the FidStream.
# @raise [Error] if walking to the path or opening it fails.
def open path, mode = 'r'
  # open the file
  path_fid = walk(path)

  begin
    talk Topen.new(
      :fid  => path_fid,
      :mode => MODES.parse(mode)
    )
  rescue Error => open_error
    begin
      clunk path_fid
    rescue Error
      # ignored on purpose: the open failure is the error worth reporting
    end

    raise open_error
  end

  stream = FidStream.new(self, path_fid, @msize)

  # return the file stream
  return stream unless block_given?

  begin
    yield stream
  ensure
    stream.close
  end
end

#read(path, *args) ⇒ Object

Returns the content of the file/directory at the given path.



353
354
355
# File 'lib/rumai/ixp/transport.rb', line 353

# Returns the content of the file/directory at the given path.
#
# @param path path to open (with #open's default mode).
# @param args arguments forwarded to the opened stream's +read+.
# @return whatever the stream's +read+ returns.
def read path, *args
  open(path) do |stream|
    stream.read(*args)
  end
end

#recv(tag) ⇒ Object

Returns the reply for the given ticket, which was previously given to you when you sent the corresponding request Fcall.



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/rumai/ixp/transport.rb', line 120

# Returns the reply for the given ticket (tag), which was previously
# returned by #send for the corresponding request.
#
# Loops until a reply carrying the given tag has been obtained, either
# from the buffer of replies that other callers read off the stream or
# from the stream itself. Once obtained, the tag is released back to the
# tag pool.
#
# @param tag ticket returned by #send.
# @return the reply message whose tag equals the given tag.
# @raise [Error] with the reply's +ename+ if the reply is an Rerror.
def recv tag
  loop do
    # everything that touches the buffer or reads the stream happens while
    # holding @recv_lock; the block's value is our reply, or nil/false
    reply = @recv_lock.synchronize do
      if @recv_buf.key? tag
        # somebody else already read our reply and buffered it
        @recv_buf.delete tag
      else
        # reply was not in receive buffer, so read
        # the next reply... hoping that it is ours

        # with an empty buffer the probe below is skipped and the stream
        # is read directly (NOTE(review): presumably so that the read may
        # wait for a reply to arrive -- confirm)
        next_reply_available =
          @recv_buf.empty? || begin
            # check (in a non-blocking fashion) if
            # the stream has reply for us right now
            # (the byte that was read is pushed back with ungetc)
            @stream.ungetc @stream.read_nonblock(1).unpack('C').first
            true
          rescue Errno::EAGAIN
            # the stream is empty
            # (the empty rescue body makes this expression nil)
          end

        if next_reply_available
          msg = Fcall.from_9p(@stream)

          if msg.tag == tag
            msg
          else
            # we got someone else's reply, so buffer
            # it (for them to receive) and try again
            @recv_buf[msg.tag] = msg
            nil
          end
        end
      end
    end

    if reply
      # the exchange is complete, so the tag is handed back to the pool
      @tag_pool.release tag

      # error replies are surfaced as exceptions
      if reply.is_a? Rerror
        raise Error, reply.ename
      end

      return reply
    else
      # give other threads a chance to receive
      Thread.pass
    end
  end
end

#remove(path) ⇒ Object

Deletes the file at the given path.



403
404
405
406
# File 'lib/rumai/ixp/transport.rb', line 403

# Deletes the file at the given path.
#
# @param path path of the file to delete.
# @return whatever #remove_fid returns.
def remove path
  # no separate clunk here: remove also does clunk
  remove_fid walk(path)
end

#remove_fid(path_fid) ⇒ Object

Deletes the file corresponding to the given FID and clunks the given FID.



412
413
414
# File 'lib/rumai/ixp/transport.rb', line 412

# Deletes the file corresponding to the given FID and clunks the given FID.
#
# Because a remove request also clunks the FID on the server -- even when
# the removal itself fails -- the FID is handed back to the FID pool both
# on success and when the server replies with an error; previously it was
# never released.
#
# @param path_fid FID of the file to delete.
# @return the reply that #talk returns for the Tremove request.
# @raise [Error] if the server replies with an error (re-raised after the
#   FID has been released).
def remove_fid path_fid
  begin
    reply = talk Tremove.new(:fid => path_fid)
  rescue Error
    # the removal failed, but the FID has been clunked all the same
    @fid_pool.release path_fid
    raise
  end

  @fid_pool.release path_fid
  reply
end

#send(request) ⇒ Object

Sends the given request Fcall and returns a ticket that you can use later to receive the reply.



107
108
109
110
111
112
113
114
# File 'lib/rumai/ixp/transport.rb', line 107

# Sends the given request Fcall and returns a ticket that you can use
# later to receive the reply.
#
# The ticket is a tag obtained from the tag pool; it is stored in the
# request before the request is written to the stream.
#
# @param request request Fcall to send.
# @return the tag that was assigned to the request.
def send request
  @tag_pool.obtain.tap do |ticket|
    request.tag = ticket
    @stream << request.to_9p
  end
end

#stat(path) ⇒ Object

Returns information about the file at the given path.



419
420
421
422
423
424
# File 'lib/rumai/ixp/transport.rb', line 419

# Returns information about the file at the given path.
#
# @param path path of the file to inspect.
# @return whatever with_fid returns for the block, whose value is the
#   result of #stat_fid.
def stat path
  with_fid do |fid|
    walk_fid(fid, path)
    stat_fid(fid)
  end
end

#stat_fid(path_fid) ⇒ Object

Returns information about the file referenced by the given FID.



429
430
431
432
433
# File 'lib/rumai/ixp/transport.rb', line 429

# Returns information about the file referenced by the given FID.
#
# @param path_fid FID of the file to inspect.
# @return the +stat+ field of the reply to the Tstat request.
def stat_fid path_fid
  talk(Tstat.new(:fid => path_fid)).stat
end

#talk(request) ⇒ Object

Sends the given request Fcall and returns its reply.



172
173
174
175
176
177
178
179
180
181
# File 'lib/rumai/ixp/transport.rb', line 172

# Sends the given request Fcall and returns its reply.
#
# When the reply is an error, the raised Error's message is extended with
# a description of the request that caused it. The extended message is
# placed on a copy of the exception rather than appended to the original
# message string in place, so a frozen message no longer masks the real
# error with a FrozenError and the original message string is left
# untouched.
#
# @param request request Fcall to send.
# @return the reply returned by #recv.
# @raise [Error] if #recv raises an Error; the message ends with
#   " -- in reply to <request.inspect>".
def talk request
  tag = send(request)

  begin
    recv tag
  rescue Error => e
    # Exception#exception(message) yields a copy of e with the new message
    raise e.exception("#{e.message} -- in reply to #{request.inspect}")
  end
end

#walk(path) ⇒ Object

Returns an FID corresponding to the given path.



438
439
440
441
442
# File 'lib/rumai/ixp/transport.rb', line 438

# Returns an FID corresponding to the given path.
#
# A FID is obtained from the FID pool and walked to the path. If the walk
# is rejected by the server, the FID is handed back to the pool before the
# error is re-raised; previously such a FID was never released.
#
# @param path path to associate the new FID with.
# @return the FID that now refers to the path.
# @raise [Error] if the walk fails.
def walk path
  fid = @fid_pool.obtain

  begin
    walk_fid fid, path
  rescue Error
    # nobody will ever receive this FID, so it must not stay reserved
    @fid_pool.release fid
    raise
  end

  fid
end

#walk_fid(path_fid, path) ⇒ Object

Associates the given FID to the given path.



447
448
449
450
451
452
453
# File 'lib/rumai/ixp/transport.rb', line 447

# Associates the given FID to the given path.
#
# The path is converted to a string and split on runs of slashes; empty
# components are dropped, and the walk starts from the root FID.
#
# @param path_fid FID sent as the request's newfid.
# @param path     path to walk to.
# @return whatever #talk returns for the Twalk request.
def walk_fid path_fid, path
  components = path.to_s.split(%r{/+}).reject(&:empty?)

  talk Twalk.new(:fid => @root_fid, :newfid => path_fid, :wname => components)
end

#write(path, content) ⇒ Object

Writes the given content to the file at the given path.



375
376
377
# File 'lib/rumai/ixp/transport.rb', line 375

# Writes the given content to the file at the given path.
#
# @param path    path of the file, opened with mode 'w'.
# @param content content handed to the opened stream's +write+.
# @return whatever the stream's +write+ returns.
def write path, content
  open(path, 'w') do |stream|
    stream.write content
  end
end