Class: RbbtProcessQueue::RbbtProcessSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes/socket.rb

Constant Summary collapse

Serializer =
Marshal

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeRbbtProcessSocket

Returns a new instance of RbbtProcessSocket.



9
10
11
12
13
14
15
16
17
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 9

def initialize
  @sread, @swrite = Misc.pipe

  key = rand(100000000).to_s;
  @write_sem = key + '.in'
  @read_sem = key + '.out'
  RbbtSemaphore.create_semaphore(@write_sem,1)
  RbbtSemaphore.create_semaphore(@read_sem,1)
end

Instance Attribute Details

#read_semObject

Returns the value of attribute read_sem.



8
9
10
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8

def read_sem
  @read_sem
end

#sreadObject

Returns the value of attribute sread.



8
9
10
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8

def sread
  @sread
end

#swriteObject

Returns the value of attribute swrite.



8
9
10
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8

def swrite
  @swrite
end

#write_semObject

Returns the value of attribute write_sem.



8
9
10
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 8

def write_sem
  @write_sem
end

Instance Method Details

#cleanObject



19
20
21
22
23
24
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 19

def clean
  @sread.close unless @sread.closed?
  @swrite.close unless @swrite.closed?
  RbbtSemaphore.delete_semaphore(@write_sem)
  RbbtSemaphore.delete_semaphore(@read_sem)
end

#close_readObject



77
78
79
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 77

def close_read
  @sread.close 
end

#close_writeObject



73
74
75
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 73

def close_write
  @swrite.close
end

#closed_read?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 65

def closed_read?
  @sread.closed?
end

#closed_write?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 69

def closed_write?
  @swrite.closed?
end

#dump(obj, stream) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 27

def dump(obj, stream)
  case obj
  when String
    payload = obj
    size_head = [payload.bytesize,"S"].pack 'La'
    str = size_head << payload
  else
    payload = Serializer.dump(obj)
    size_head = [payload.bytesize,"M"].pack 'La'
    str = size_head << payload
  end

  write_length = str.length
  #IO.select(nil, [stream])
  wrote = stream.write(str) 
  while wrote < write_length
    wrote += stream.write(str[wrote..-1]) 
  end
end

#load(stream) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 47

def load(stream)
  size_head = Misc.read_stream stream, 5

  size, type = size_head.unpack('La')

  begin
    payload = Misc.read_stream stream, size
    case type
    when "M"
      Serializer.load(payload)
    when "S"
      payload
    end
  rescue TryAgain
    retry
  end
end

#popObject



89
90
91
92
93
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 89

def pop
  RbbtSemaphore.synchronize(@read_sem) do
    self.load(@sread)
  end
end

#push(obj) ⇒ Object



83
84
85
86
87
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 83

def push(obj)
  RbbtSemaphore.synchronize(@write_sem) do
    self.dump(obj, @swrite)
  end
end