Class: Concurrent::Promises::Channel
- Inherits: Synchronization::Object
  - Object
  - Synchronization::Object
  - Concurrent::Promises::Channel
- Defined in: lib/concurrent/edge/promises.rb
Constant Summary
- UNLIMITED = Object.new
  Default size of the Channel; makes it accept an unlimited number of messages.
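A plain `Object.new` does not respond to `>`, yet `#push` compares the size against the number of buffered messages. The following is a minimal sketch of how such a sentinel could be made to compare greater than any count. The constant name `UNLIMITED_SKETCH` and the `<=>` and `to_s` definitions are assumptions for illustration, not taken from this page.

```ruby
# Hypothetical sentinel, for illustration only: it compares greater than
# anything, so a capacity check such as `size > messages.size` never fails.
UNLIMITED_SKETCH = Object.new
UNLIMITED_SKETCH.singleton_class.class_eval do
  include Comparable

  # Always report "greater than", whatever the other operand is.
  def <=>(_other)
    1
  end

  # Assumed label, convenient when the size is interpolated with %s.
  def to_s
    'unlimited'
  end
end

messages = Array.new(1_000) { |i| i }
puts UNLIMITED_SKETCH > messages.size # capacity check against the sentinel
puts 2 > messages.size                # same check against a small fixed size
```

With a sentinel like this, the size check needs no special case for the unlimited channel.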
Instance Method Summary
- #initialize(size = UNLIMITED) ⇒ Channel (constructor)
  A channel to pass messages between promises.
- #pop(probe = Concurrent::Promises.resolvable_future) ⇒ Future
  Returns a future which will become fulfilled with a value from the channel when one is available.
- #push(message) ⇒ Future
  Returns a future which will fulfill when the message is added to the channel.
- #to_s ⇒ String (also: #inspect)
  Short string representation.
Constructor Details
#initialize(size = UNLIMITED) ⇒ Channel
A channel to pass messages between promises. The size can be limited to support back pressure; by default it is UNLIMITED.
# File 'lib/concurrent/edge/promises.rb', line 1974
def initialize(size = UNLIMITED)
  super()
  @Size = size
  # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
  @Mutex = Mutex.new
  @Probes = []
  @Messages = []
  @PendingPush = []
end
Instance Method Details
#pop(probe = Concurrent::Promises.resolvable_future) ⇒ Future
Returns a future which will become fulfilled with a value from the channel when one is available.
# File 'lib/concurrent/edge/promises.rb', line 2013
def pop(probe = Concurrent::Promises.resolvable_future)
  # TODO (pitr-ch 26-Dec-2016): improve performance
  pop_for_select(probe).then(&:last)
end
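The `then(&:last)` step suggests that the probe resolves to a pair whose last element is the message; judging from `#push`, the first element appears to be the channel itself. A small stand-alone sketch of that extraction, using stand-in values rather than real futures:

```ruby
# Stand-in values; in the real channel the pair would come from a fulfilled probe.
channel = Object.new
pair = [channel, 'hello'] # assumed shape of the probe's value: [channel, message]

extract = :last.to_proc   # the block that `&:last` hands to `then`
message = extract.call(pair)

puts message              # the message alone, with the channel dropped
```

Keeping the channel in the pair would let a select-style caller tell which channel produced the value, while `#pop` only needs the message.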
#push(message) ⇒ Future
Returns a future which will fulfill when the message is added to the channel. Its value is the message.
# File 'lib/concurrent/edge/promises.rb', line 1988
def push(message)
  @Mutex.synchronize do
    while true
      if @Probes.empty?
        if @Size > @Messages.size
          @Messages.push message
          return Promises.fulfilled_future message
        else
          pushed = Promises.resolvable_future
          @PendingPush.push [message, pushed]
          return pushed.with_hidden_resolvable
        end
      else
        probe = @Probes.shift
        if probe.fulfill [self, message], false
          return Promises.fulfilled_future(message)
        end
      end
    end
  end
end
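The three outcomes of `#push` (hand the message to a waiting probe, buffer it, or park it until space frees up) can be modelled with the standard library alone. This is a simplified sketch, not the library's implementation: `TinyFuture`, `ModelChannel`, and `add_probe` are hypothetical names, and the stand-in future has none of the real `Future` behaviour beyond being resolved once.

```ruby
# Hypothetical stand-in for a resolvable future: it can be fulfilled once.
class TinyFuture
  attr_reader :value

  def initialize
    @resolved = false
    @value = nil
  end

  def resolved?
    @resolved
  end

  # Returns true for the first fulfillment and false for any later attempt.
  def fulfill(value)
    return false if @resolved
    @value = value
    @resolved = true
  end

  def self.fulfilled(value)
    new.tap { |future| future.fulfill(value) }
  end
end

# Hypothetical model of the channel, covering only the push side.
class ModelChannel
  def initialize(size)
    @size = size
    @mutex = Mutex.new
    @probes = []       # futures of consumers waiting for a message
    @messages = []     # buffered messages
    @pending_push = [] # [message, future] pairs waiting for free space
  end

  def push(message)
    @mutex.synchronize do
      loop do
        if @probes.empty?
          if @size > @messages.size
            @messages.push message
            return TinyFuture.fulfilled(message) # buffered immediately
          else
            pushed = TinyFuture.new
            @pending_push.push [message, pushed]
            return pushed                        # back pressure: left pending
          end
        else
          probe = @probes.shift
          # A probe may already be resolved elsewhere; if so, try the next one.
          return TinyFuture.fulfilled(message) if probe.fulfill([self, message])
        end
      end
    end
  end

  # Helper for this sketch only: register a waiting consumer.
  def add_probe(probe)
    @mutex.synchronize { @probes.push probe }
  end
end

channel = ModelChannel.new(1)
first  = channel.push(:a) # fits into the buffer
second = channel.push(:b) # buffer is full, so this future stays pending
puts first.resolved?
puts second.resolved?
```

A caller that waits on the returned future before pushing again is slowed down to the consumer's pace, which is the back pressure the size limit is meant to provide.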
#to_s ⇒ String (also: #inspect)
Returns a short string representation.
# File 'lib/concurrent/edge/promises.rb', line 2038
def to_s
  format '<#%s:0x%x size:%s>', self.class, object_id << 1, @Size
end
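To see what that format string produces, here is a small sketch that applies the same call to a stand-in class. `SizedThing` is a hypothetical name; the hexadecimal part depends on the object's id and differs between runs.

```ruby
# Hypothetical class standing in for the channel.
class SizedThing
  def initialize(size)
    @size = size
  end

  # Same format string as above: class name, shifted object id in hex, size.
  def to_s
    format '<#%s:0x%x size:%s>', self.class, object_id << 1, @size
  end
  alias_method :inspect, :to_s
end

puts SizedThing.new(2)
```

Because the size is interpolated with `%s`, whatever the size object returns from `to_s` is what appears in the string.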