Class: Rinda::TupleSpace

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped, MonitorMixin
Defined in:
lib/rinda/tuplespace.rb

Overview

The Tuplespace manages access to the tuples it contains, ensuring mutual exclusion requirements are met.

The sec option for the write, take, move, read and notify methods may either be a number of seconds or a Renewer object.

Instance Method Summary collapse

Constructor Details

#initialize(period = 60) ⇒ TupleSpace

Creates a new TupleSpace. period is used to control how often to look for dead tuples after modifications to the TupleSpace.

If no dead tuples are found period seconds after the last modification, the TupleSpace will stop looking for dead tuples.



392
393
394
395
396
397
398
399
400
# File 'lib/rinda/tuplespace.rb', line 392

def initialize(period=60)
  super()
  @bag = TupleBag.new
  @read_waiter = TupleBag.new
  @take_waiter = TupleBag.new
  @notify_waiter = TupleBag.new
  @period = period
  @keeper = nil
end

Instance Method Details

#move(port, tuple, sec = nil) {|template| ... } ⇒ Object

Moves tuple to port.

Yields:

  • (template)


439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# File 'lib/rinda/tuplespace.rb', line 439

def move(port, tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    if entry
      port.push(entry.value) if port
      @bag.delete(entry)
      notify_event('take', entry.value)
      return entry.value
    end
    raise RequestExpiredError if template.expired?

    begin
      @take_waiter.push(template)
      start_keeper if template.expires
      while true
        raise RequestCanceledError if template.canceled?
        raise RequestExpiredError if template.expired?
        entry = @bag.find(template)
        if entry
          port.push(entry.value) if port
          @bag.delete(entry)
          notify_event('take', entry.value)
          return entry.value
        end
        template.wait
      end
    ensure
      @take_waiter.delete(template)
    end
  end
end

#notify(event, tuple, sec = nil) ⇒ Object

Registers for notifications of event. Returns a NotifyTemplateEntry. See NotifyTemplateEntry for examples of how to listen for notifications.

event can be:

'write'

A tuple was added

'take'

A tuple was taken or moved

'delete'

A tuple was lost after being overwritten or expiring

The TupleSpace will also notify you of the 'close' event when the NotifyTemplateEntry has expired.



522
523
524
525
526
527
528
# File 'lib/rinda/tuplespace.rb', line 522

def notify(event, tuple, sec=nil)
  template = NotifyTemplateEntry.new(self, event, tuple, sec)
  synchronize do
    @notify_waiter.push(template)
  end
  template
end

#read(tuple, sec = nil) {|template| ... } ⇒ Object

Reads tuple, but does not remove it.

Yields:

  • (template)


476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/rinda/tuplespace.rb', line 476

def read(tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    return entry.value if entry
    raise RequestExpiredError if template.expired?

    begin
      @read_waiter.push(template)
      start_keeper if template.expires
      template.wait
      raise RequestCanceledError if template.canceled?
      raise RequestExpiredError if template.expired?
      return template.found
    ensure
      @read_waiter.delete(template)
    end
  end
end

#read_all(tuple) ⇒ Object

Returns all tuples matching tuple. Does not remove the found tuples.



500
501
502
503
504
505
506
507
508
# File 'lib/rinda/tuplespace.rb', line 500

def read_all(tuple)
  template = WaitTemplateEntry.new(self, tuple, nil)
  synchronize do
    entry = @bag.find_all(template)
    entry.collect do |e|
      e.value
    end
  end
end

#take(tuple, sec = nil, &block) ⇒ Object

Removes tuple



432
433
434
# File 'lib/rinda/tuplespace.rb', line 432

def take(tuple, sec=nil, &block)
  move(nil, tuple, sec, &block)
end

#write(tuple, sec = nil) ⇒ Object

Adds tuple



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/rinda/tuplespace.rb', line 405

def write(tuple, sec=nil)
  entry = TupleEntry.new(tuple, sec)
  synchronize do
    if entry.expired?
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      notify_event('write', entry.value)
      notify_event('delete', entry.value)
    else
      @bag.push(entry)
      start_keeper if entry.expires
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      @take_waiter.find_all_template(entry).each do |template|
        template.signal
      end
      notify_event('write', entry.value)
    end
  end
  entry
end