Class: Hx::Interop::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/interop/channel.rb

Overview

:nodoc:

Defined Under Namespace

Classes: Blocker, Get, Put

Constant Summary collapse

Deadlock =
Class.new(Error)
Closed =
Class.new(Error)

Instance Method Summary collapse

Constructor Details

#initialize(butter_limit = 0) ⇒ Channel

Returns a new instance of Channel.



55
56
57
58
59
60
61
# File 'lib/interop/channel.rb', line 55

def initialize(butter_limit = 0)
  @buffer_limit = butter_limit
  @buffer       = []
  @gets         = []
  @puts         = []
  @mutex        = Mutex.new
end

Instance Method Details

#<<(obj) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/interop/channel.rb', line 68

def <<(obj)
  put = nil
  @mutex.synchronize do
    raise Closed if @closed

    if (get = @gets.shift)
      get.resolve obj
    elsif @buffer.length < @buffer_limit
      @buffer << obj
    else
      raise Deadlock if Thread.list.one?

      put = Put.new(obj)
      @puts << put
    end
  end
  put&.wait

  self
end

#closeObject

Raises:



106
107
108
109
110
111
112
113
114
115
# File 'lib/interop/channel.rb', line 106

def close
  raise Closed if @closed

  @mutex.synchronize do
    @closed = true
    @puts.each &:resolve
    @gets.each { |g| g.resolve nil }
  end
  @buffer
end

#getObject



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/interop/channel.rb', line 89

def get
  get = nil
  @mutex.synchronize do
    return if @closed

    put = @puts.shift
    return put.resolve if put
    return @buffer.shift if @buffer.any?

    raise Deadlock if Thread.list.one?

    get = Get.new
    @gets << get
  end
  get.wait
end

#put(*objects) ⇒ Object



63
64
65
66
# File 'lib/interop/channel.rb', line 63

def put(*objects)
  objects.each &method(:<<)
  self
end