Class: Concurrent::Promises::Channel

Inherits:
Synchronization::Object
  • Object
show all
Defined in:
lib/concurrent/edge/promises.rb

Constant Summary collapse

UNLIMITED =

Default size of the Channel, makes it accept unlimited number of messages.

Object.new

Instance Method Summary collapse

Constructor Details

#initialize(size = UNLIMITED) ⇒ Channel

A channel to pass messages between promises. The size is limited to support back pressure.

Parameters:

  • size (Integer, UNLIMITED) (defaults to: UNLIMITED)

    the maximum number of messages stored in the channel.



1974
1975
1976
1977
1978
1979
1980
1981
1982
# File 'lib/concurrent/edge/promises.rb', line 1974

def initialize(size = UNLIMITED)
  super()
  @Size        = size
  # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
  @Mutex       = Mutex.new
  @Probes      = []
  @Messages    = []
  @PendingPush = []
end

Instance Method Details

#pop(probe = Concurrent::Promises.resolvable_future) ⇒ Future

Returns a future witch will become fulfilled with a value from the channel when one is available.

Parameters:

  • probe (ResolvableFuture) (defaults to: Concurrent::Promises.resolvable_future)

    the future which will be fulfilled with a channel value

Returns:

  • (Future)

    the probe, its value will be the message when available.



2013
2014
2015
2016
# File 'lib/concurrent/edge/promises.rb', line 2013

def pop(probe = Concurrent::Promises.resolvable_future)
  # TODO (pitr-ch 26-Dec-2016): improve performance
  pop_for_select(probe).then(&:last)
end

#push(message) ⇒ Future

Returns future which will fulfill when the message is added to the channel. Its value is the message.

Parameters:

  • message (Object)

Returns:



1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
# File 'lib/concurrent/edge/promises.rb', line 1988

def push(message)
  @Mutex.synchronize do
    while true
      if @Probes.empty?
        if @Size > @Messages.size
          @Messages.push message
          return Promises.fulfilled_future message
        else
          pushed = Promises.resolvable_future
          @PendingPush.push [message, pushed]
          return pushed.with_hidden_resolvable
        end
      else
        probe = @Probes.shift
        if probe.fulfill [self, message], false
          return Promises.fulfilled_future(message)
        end
      end
    end
  end
end

#to_sString Also known as: inspect

Returns Short string representation.

Returns:

  • (String)

    Short string representation.



2038
2039
2040
# File 'lib/concurrent/edge/promises.rb', line 2038

def to_s
  format '<#%s:0x%x size:%s>', self.class, object_id << 1, @Size
end