Class: RactorPool::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/ractor_pool/channel.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Channel

rubocop:disable Metrics/MethodLength



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/ractor_pool/channel.rb', line 4

# Starts the background Ractor that relays messages for this channel and
# stores it in @pipe.
#
# The relay reads its inbox in a loop and reacts to each message's :type:
# * :subscription - bumps the listener count and yields a { type: :skip }
#   placeholder;
# * :data         - yields a fresh { type: :data, data: ... } envelope that
#   carries only the payload of the incoming message;
# * :close        - yields the close message once per counted listener and
#   then leaves the loop;
# * anything else - ignored.
def initialize # rubocop:disable Metrics/MethodLength
  @pipe = Ractor.new do
    subscribers = 0
    loop do
      message = Ractor.receive
      case message
      in { type: :data }
        Ractor.yield({ type: :data, data: message[:data] })
      in { type: :subscription }
        subscribers += 1
        Ractor.yield({ type: :skip })
      in { type: :close }
        # One close notification per counted listener, then stop relaying.
        subscribers.times { Ractor.yield(message) }
        break
      else
        # Unrecognised messages are dropped on purpose.
      end
    end
  end
end

Instance Method Details

#<<(data) ⇒ Object



24
25
26
# File 'lib/ractor_pool/channel.rb', line 24

# Publishes +data+ on the channel by posting a { type: :data } envelope to
# the pipe.
#
# NOTE(review): the result is whatever @pipe.send returns, not the channel
# itself, so a chained `channel << a << b` would not wrap `b` in an
# envelope - confirm whether chaining is meant to be supported.
#
# @param data [Object] payload to publish
# @return [Object] the return value of @pipe.send
def <<(data)
  envelope = { type: :data, data: data }
  @pipe.send(envelope)
end

#close! ⇒ Object



28
29
30
# File 'lib/ractor_pool/channel.rb', line 28

# Posts a { type: :close } message to the pipe, handing the message over
# with move: true.
#
# @return [Object] the return value of @pipe.send
def close!
  shutdown = { type: :close }
  @pipe.send(shutdown, move: true)
end

#subscribe ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/ractor_pool/channel.rb', line 32

# Registers the caller as a listener and yields the payload of every data
# message taken from the pipe, until a close message is taken or the pipe
# reports Ractor::ClosedError.
#
# When called without a block, an Enumerator over the same payloads is
# returned instead. Nothing is sent to the pipe until that enumerator is
# iterated.
#
# @yieldparam data [Object] the :data value of a taken data message
# @return [Enumerator] when no block is given
# @return [nil] once a close message is taken or the pipe is closed
def subscribe
  # Bail out before announcing a subscription: with no block there is nothing
  # to yield to, and the pipe would count a listener that never takes its
  # close message.
  return enum_for(:subscribe) unless block_given?

  @pipe.send({ type: :subscription }, move: true)
  loop do
    message = @pipe.take
    case message
    in { type: :close } then return
    in { type: :data } then yield(message[:data])
    else
      # Other messages (such as the :skip placeholder) carry no payload.
    end
  end
rescue Ractor::ClosedError # rubocop:disable Lint/SuppressedException
  # A closed pipe simply ends the subscription.
end