Class: Rinda::TupleSpace

Inherits:
Object
  • Object
show all
Includes:
DRbUndumped, MonitorMixin
Defined in:
lib/rinda/tuplespace.rb

Overview

The Tuplespace manages access to the tuples it contains, ensuring mutual exclusion requirements are met.

The sec option for the write, take, move, read and notify methods may either be a number of seconds or a Renewer object.

Instance Method Summary collapse

Constructor Details

#initialize(period = 60) ⇒ TupleSpace

Creates a new TupleSpace. period is used to control how often to look for dead tuples after modifications to the TupleSpace.

If no dead tuples are found period seconds after the last modification, the TupleSpace will stop looking for dead tuples.



438
439
440
441
442
443
444
445
446
# File 'lib/rinda/tuplespace.rb', line 438

def initialize(period=60)
  super()
  @bag = TupleBag.new
  @read_waiter = TupleBag.new
  @take_waiter = TupleBag.new
  @notify_waiter = TupleBag.new
  @period = period
  @keeper = nil
end

Instance Method Details

#move(port, tuple, sec = nil) {|template| ... } ⇒ Object

Moves tuple to port.

Yields:

  • (template)


485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
# File 'lib/rinda/tuplespace.rb', line 485

def move(port, tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    if entry
      port.push(entry.value) if port
      @bag.delete(entry)
      notify_event('take', entry.value)
      return port ? nil : entry.value
    end
    raise RequestExpiredError if template.expired?

    begin
      @take_waiter.push(template)
      start_keeper if template.expires
      while true
        raise RequestCanceledError if template.canceled?
        raise RequestExpiredError if template.expired?
        entry = @bag.find(template)
        if entry
          port.push(entry.value) if port
          @bag.delete(entry)
          notify_event('take', entry.value)
          return port ? nil : entry.value
        end
        template.wait
      end
    ensure
      @take_waiter.delete(template)
    end
  end
end

#notify(event, tuple, sec = nil) ⇒ Object

Registers for notifications of event. Returns a NotifyTemplateEntry. See NotifyTemplateEntry for examples of how to listen for notifications.

event can be:

‘write’

A tuple was added

‘take’

A tuple was taken or moved

‘delete’

A tuple was lost after being overwritten or expiring

The TupleSpace will also notify you of the ‘close’ event when the NotifyTemplateEntry has expired.



568
569
570
571
572
573
574
# File 'lib/rinda/tuplespace.rb', line 568

def notify(event, tuple, sec=nil)
  template = NotifyTemplateEntry.new(self, event, tuple, sec)
  synchronize do
    @notify_waiter.push(template)
  end
  template
end

#read(tuple, sec = nil) {|template| ... } ⇒ Object

Reads tuple, but does not remove it.

Yields:

  • (template)


522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
# File 'lib/rinda/tuplespace.rb', line 522

def read(tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    return entry.value if entry
    raise RequestExpiredError if template.expired?

    begin
      @read_waiter.push(template)
      start_keeper if template.expires
      template.wait
      raise RequestCanceledError if template.canceled?
      raise RequestExpiredError if template.expired?
      return template.found
    ensure
      @read_waiter.delete(template)
    end
  end
end

#read_all(tuple) ⇒ Object

Returns all tuples matching tuple. Does not remove the found tuples.



546
547
548
549
550
551
552
553
554
# File 'lib/rinda/tuplespace.rb', line 546

def read_all(tuple)
  template = WaitTemplateEntry.new(self, tuple, nil)
  synchronize do
    entry = @bag.find_all(template)
    entry.collect do |e|
      e.value
    end
  end
end

#take(tuple, sec = nil, &block) ⇒ Object

Removes tuple



478
479
480
# File 'lib/rinda/tuplespace.rb', line 478

def take(tuple, sec=nil, &block)
  move(nil, tuple, sec, &block)
end

#write(tuple, sec = nil) ⇒ Object

Adds tuple



451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/rinda/tuplespace.rb', line 451

def write(tuple, sec=nil)
  entry = create_entry(tuple, sec)
  synchronize do
    if entry.expired?
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      notify_event('write', entry.value)
      notify_event('delete', entry.value)
    else
      @bag.push(entry)
      start_keeper if entry.expires
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      @take_waiter.find_all_template(entry).each do |template|
        template.signal
      end
      notify_event('write', entry.value)
    end
  end
  entry
end