Class: RbbtProcessQueue::RbbtProcessSocket
- Inherits:
-
Object
- Object
- RbbtProcessQueue::RbbtProcessSocket
- Defined in:
- lib/rbbt/util/concurrency/processes/socket.rb
Constant Summary collapse
- Serializer =
Marshal
Instance Attribute Summary collapse
-
#read_sem ⇒ Object
Returns the value of attribute read_sem.
-
#sread ⇒ Object
Returns the value of attribute sread.
-
#swrite ⇒ Object
Returns the value of attribute swrite.
-
#write_sem ⇒ Object
Returns the value of attribute write_sem.
Instance Method Summary collapse
- #clean ⇒ Object
- #close_read ⇒ Object
- #close_write ⇒ Object
- #closed_read? ⇒ Boolean
- #closed_write? ⇒ Boolean
- #dump(obj, stream) ⇒ Object
-
#initialize ⇒ RbbtProcessSocket
constructor
A new instance of RbbtProcessSocket.
- #load(stream) ⇒ Object
- #pop ⇒ Object
- #push(obj) ⇒ Object
Constructor Details
#initialize ⇒ RbbtProcessSocket
Returns a new instance of RbbtProcessSocket.
9 10 11 12 13 14 15 16 17 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 9
#
# Sets up the pipe and the two named semaphores this socket works with.
#
# The pair of values returned by Misc.pipe is stored in @sread and @swrite.
# Both semaphore names share one random numeric prefix and differ only in
# their suffix: '.in' for @write_sem and '.out' for @read_sem. Each semaphore
# is created through RbbtSemaphore.create_semaphore with 1 as the second
# argument (presumably the initial count -- confirm against RbbtSemaphore).
def initialize
  @sread, @swrite = Misc.pipe

  prefix = rand(100000000).to_s
  @write_sem = prefix + '.in'
  @read_sem = prefix + '.out'

  RbbtSemaphore.create_semaphore(@write_sem, 1)
  RbbtSemaphore.create_semaphore(@read_sem, 1)
end
Instance Attribute Details
#read_sem ⇒ Object
Returns the value of attribute read_sem.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8
#
# Reader for the read_sem attribute.
#
# @return [Object] whatever is currently stored in @read_sem
def read_sem
  @read_sem
end
#sread ⇒ Object
Returns the value of attribute sread.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8
#
# Reader for the sread attribute.
#
# @return [Object] whatever is currently stored in @sread
def sread
  @sread
end
#swrite ⇒ Object
Returns the value of attribute swrite.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8
#
# Reader for the swrite attribute.
#
# @return [Object] whatever is currently stored in @swrite
def swrite
  @swrite
end
#write_sem ⇒ Object
Returns the value of attribute write_sem.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8
#
# Reader for the write_sem attribute.
#
# @return [Object] whatever is currently stored in @write_sem
def write_sem
  @write_sem
end
Instance Method Details
#clean ⇒ Object
19 20 21 22 23 24 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 19
#
# Releases everything this socket holds: closes @sread and @swrite (each only
# if it is not already closed) and then deletes the write and read semaphores
# through RbbtSemaphore.delete_semaphore, in that order.
#
# @return [Object] the result of deleting the read semaphore
def clean
  [@sread, @swrite].each do |stream|
    stream.close unless stream.closed?
  end

  RbbtSemaphore.delete_semaphore(@write_sem)
  RbbtSemaphore.delete_semaphore(@read_sem)
end
#close_read ⇒ Object
77 78 79 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 77
#
# Closes the stream held in @sread.
#
# The call is skipped when the stream already reports itself closed, matching
# the guard used by #clean, so calling this twice (or after #clean) never
# raises on streams that reject a second close.
#
# @return [nil]
def close_read
  @sread.close unless @sread.closed?
end
#close_write ⇒ Object
73 74 75 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 73
#
# Closes the stream held in @swrite.
#
# The call is skipped when the stream already reports itself closed, matching
# the guard used by #clean, so calling this twice (or after #clean) never
# raises on streams that reject a second close.
#
# @return [nil]
def close_write
  @swrite.close unless @swrite.closed?
end
#closed_read? ⇒ Boolean
65 66 67 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 65
#
# Tells whether the stream held in @sread has been closed.
#
# @return [Boolean] the answer of @sread.closed?
def closed_read?
  @sread.closed?
end
#closed_write? ⇒ Boolean
69 70 71 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 69
#
# Tells whether the stream held in @swrite has been closed.
#
# @return [Boolean] the answer of @swrite.closed?
def closed_write?
  @swrite.closed?
end
#dump(obj, stream) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 27
#
# Writes +obj+ to +stream+ as a single length-prefixed frame.
#
# Frame layout: a 5-byte header packed with 'La' (the payload size in bytes as
# a native-endian unsigned 32-bit integer, followed by a one-character type
# tag) and then the payload. A String is sent as-is with tag "S"; any other
# object is serialized with Serializer and sent with tag "M".
#
# The frame is assembled, measured and sliced strictly as raw bytes. Appending
# a non-ASCII String straight onto the binary header raises
# Encoding::CompatibilityError as soon as the header contains a byte >= 0x80,
# and character-based length/slicing disagrees with the byte counts returned
# by +write+ when a short write has to be resumed.
#
# @param obj [Object] the value to send
# @param stream [#write] destination; +write+ must return the number of bytes
#   it accepted
# @return [nil]
def dump(obj, stream)
  tag, payload =
    case obj
    when String then ['S', obj]
    else ['M', Serializer.dump(obj)]
    end

  # pack returns a fresh binary string; String#b gives a binary copy of the
  # payload so the append can never hit an encoding mismatch.
  frame = [payload.bytesize, tag].pack('La')
  frame << payload.b

  total = frame.bytesize
  #IO.select(nil, [stream])
  written = stream.write(frame)

  # Resume after a short write, continuing from the exact byte offset.
  while written < total
    written += stream.write(frame.byteslice(written, total - written))
  end

  nil
end
#load(stream) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 47
#
# Reads one frame from +stream+ and returns the value it carries.
#
# A 5-byte header is fetched with Misc.read_stream and unpacked with 'La' into
# the payload size and a one-character type tag. The payload is then fetched
# with a second Misc.read_stream call: tag "M" is decoded with Serializer,
# tag "S" is returned as read, and any other tag yields nil. When TryAgain is
# raised while fetching or decoding the payload, that step is retried (the
# header is not read again).
#
# NOTE(review): the constant summary lists Serializer as Marshal, so "M"
# payloads should only ever come from trusted writers.
#
# @param stream [Object] source handed to Misc.read_stream
# @return [Object, nil] the decoded value, or nil for an unrecognised tag
def load(stream)
  header = Misc.read_stream(stream, 5)
  size, type = header.unpack('La')

  begin
    body = Misc.read_stream(stream, size)

    case type
    when "M" then Serializer.load(body)
    when "S" then body
    end
  rescue TryAgain
    retry
  end
end
#pop ⇒ Object
89 90 91 92 93 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 89
#
# Reads the next value from @sread via #load, running the read inside
# RbbtSemaphore.synchronize on the read semaphore.
#
# @return [Object] whatever RbbtSemaphore.synchronize returns for the block
#   (presumably the loaded value -- confirm against RbbtSemaphore)
def pop
  RbbtSemaphore.synchronize(@read_sem) { self.load(@sread) }
end
#push(obj) ⇒ Object
83 84 85 86 87 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 83
#
# Sends +obj+ through @swrite via #dump, running the write inside
# RbbtSemaphore.synchronize on the write semaphore.
#
# @param obj [Object] the value to send
# @return [Object] whatever RbbtSemaphore.synchronize returns for the block
def push(obj)
  RbbtSemaphore.synchronize(@write_sem) { self.dump(obj, @swrite) }
end