Class: RbbtProcessQueue::RbbtProcessSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes/socket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(serializer = nil) ⇒ RbbtProcessSocket

Returns a new instance of RbbtProcessSocket.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 7

def initialize(serializer = nil)
  @sread, @swrite = Misc.pipe

  @serializer = serializer || Marshal
               

  @key = "/" << rand(1000000000).to_s << '.' << Process.pid.to_s;
  @write_sem = @key + '.in'
  @read_sem = @key + '.out'
  Log.medium "Creating socket semaphores: #{@key}"
  RbbtSemaphore.create_semaphore(@write_sem,1)
  RbbtSemaphore.create_semaphore(@read_sem,1)
end

Instance Attribute Details

#read_semObject

Returns the value of attribute read_sem.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def read_sem
  @read_sem
end

#sreadObject

Returns the value of attribute sread.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def sread
  @sread
end

#swriteObject

Returns the value of attribute swrite.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def swrite
  @swrite
end

#write_semObject

Returns the value of attribute write_sem.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def write_sem
  @write_sem
end

Instance Method Details

#cleanObject



21
22
23
24
25
26
27
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 21

def clean
  @sread.close unless @sread.closed?
  @swrite.close unless @swrite.closed?
  Log.medium "Destroying socket semaphores: #{[@key] * ", "}"
  RbbtSemaphore.delete_semaphore(@write_sem)
  RbbtSemaphore.delete_semaphore(@read_sem)
end

#close_readObject



79
80
81
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 79

def close_read
  @sread.close 
end

#close_writeObject



75
76
77
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 75

def close_write
  @swrite.close
end

#closed_read?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 67

def closed_read?
  @sread.closed?
end

#closed_write?Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 71

def closed_write?
  @swrite.closed?
end

#dump(obj, stream) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 30

def dump(obj, stream)
  case obj
  when String
    payload = obj
    size_head = [payload.bytesize,"C"].pack 'La'
    str = size_head << payload
  else
    payload = @serializer.dump(obj)
    size_head = [payload.bytesize,"S"].pack 'La'
    str = size_head << payload
  end

  write_length = str.length
  wrote = stream.write(str) 
  while wrote < write_length
    wrote += stream.write(str[wrote..-1]) 
  end
end

#load(stream) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 49

def load(stream)
  size_head = Misc.read_stream stream, 5

  size, type = size_head.unpack('La')

  begin
    payload = Misc.read_stream stream, size
    case type
    when "S"
      @serializer.load(payload)
    when "C"
      payload
    end
  rescue TryAgain
    retry
  end
end

#popObject



91
92
93
94
95
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 91

def pop
  RbbtSemaphore.synchronize(@read_sem) do
    self.load(@sread)
  end
end

#push(obj) ⇒ Object



85
86
87
88
89
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 85

def push(obj)
  RbbtSemaphore.synchronize(@write_sem) do
    self.dump(obj, @swrite)
  end
end