Class: RactorPool::Channel
- Inherits: Object
  - Object
    - RactorPool::Channel
- Defined in: lib/ractor_pool/channel.rb
Instance Method Summary
- #<<(data) ⇒ Object
- #close! ⇒ Object
- #initialize ⇒ Channel (constructor)
- #subscribe ⇒ Object
Constructor Details
#initialize ⇒ Channel
# File 'lib/ractor_pool/channel.rb', line 4

    def initialize # rubocop:disable Metrics/MethodLength
      # The pipe Ractor relays messages from its inbox to whoever takes from it.
      @pipe = Ractor.new do
        listeners_count = 0
        loop do
          msg = Ractor.recv
          case msg
          in type: :close
            # Emit one close message per registered subscriber, then stop.
            listeners_count.times { Ractor.yield(msg) }
            break
          in type: :subscription
            listeners_count += 1
            # Placeholder message; subscribers ignore it.
            Ractor.yield({ type: :skip })
          in type: :data
            Ractor.yield({ type: :data, data: msg[:data] })
          else
          end
        end
      end
    end
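The constructor routes messages by hash pattern matching on the `:type` key. As a minimal sketch of that dispatch in isolation (the helper name `describe_message` and the symbols it returns are hypothetical, chosen only for illustration, and are not part of RactorPool):

```ruby
# Hypothetical helper: mirrors the case/in dispatch used by the pipe,
# but returns a label instead of yielding to subscribers.
def describe_message(msg)
  case msg
  in type: :close
    :stop
  in type: :subscription
    :register
  in type: :data, data:
    # `data:` binds the value stored under the :data key.
    [:forward, data]
  else
    # Anything else (including non-hash values) is silently ignored.
    :ignore
  end
end

describe_message({ type: :data, data: 42 }) # => [:forward, 42]
describe_message({ type: :skip })           # => :ignore
```

Hash patterns match on a subset of keys, so extra keys in a message do not prevent a match; the empty `else` branch is what keeps unknown messages from raising `NoMatchingPatternError`.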
Instance Method Details
#<<(data) ⇒ Object
# File 'lib/ractor_pool/channel.rb', line 24

    def <<(data)
      @pipe.send({ type: :data, data: data })
    end
#close! ⇒ Object
# File 'lib/ractor_pool/channel.rb', line 28

    def close!
      @pipe.send({ type: :close }, move: true)
    end
#subscribe ⇒ Object
# File 'lib/ractor_pool/channel.rb', line 32

    def subscribe
      # Register with the pipe so that close! emits a close message for this subscriber.
      @pipe.send({ type: :subscription }, move: true)
      loop do
        msg = @pipe.take
        case msg
        in type: :close
          return
        in type: :data
          yield(msg[:data])
        else
        end
      end
    rescue Ractor::ClosedError # rubocop:disable Lint/SuppressedException
    end
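To see how the three message types interact without depending on a particular Ractor API version, here is a rough analogue built on `Thread` and `Queue`. The class `QueueChannel` and its `@inbox`/`@outbox` queues are hypothetical stand-ins, not part of RactorPool; closing the outbox is an assumption that plays the role of the pipe Ractor terminating (which `subscribe` handles by rescuing `Ractor::ClosedError`).

```ruby
# Hypothetical Thread/Queue analogue of the channel protocol (not RactorPool code).
class QueueChannel
  def initialize
    @inbox = Queue.new   # messages sent to the pipe
    @outbox = Queue.new  # messages the pipe hands to subscribers
    @pipe = Thread.new do
      listeners_count = 0
      loop do
        msg = @inbox.pop
        case msg
        in type: :close
          listeners_count.times { @outbox << msg }
          break
        in type: :subscription
          listeners_count += 1
          @outbox << { type: :skip }
        in type: :data
          @outbox << msg
        else
          # Unknown messages are dropped.
        end
      end
      @outbox.close # pop returns nil once the remaining messages are drained
    end
  end

  def <<(data)
    @inbox << { type: :data, data: data }
  end

  def close!
    @inbox << { type: :close }
  end

  def subscribe
    @inbox << { type: :subscription }
    loop do
      msg = @outbox.pop
      case msg
      in nil
        return # outbox closed and empty: the pipe has finished
      in type: :close
        return
      in type: :data
        yield(msg[:data])
      else
        # :skip placeholders are ignored.
      end
    end
  end
end

channel = QueueChannel.new
results = Queue.new
workers = Array.new(2) do
  Thread.new { channel.subscribe { |n| results << n * 2 } }
end

(1..5).each { |n| channel << n }
channel.close!
workers.each(&:join)

doubled = Array.new(results.size) { results.pop }.sort
```

In this sketch each data message is consumed by exactly one subscriber, so the two workers split the five items between them rather than each seeing all five. All data messages are queued ahead of the close messages, so every item is processed before the subscribers return.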