Class: ProcessShared::Posix::Semaphore

Inherits:
Object
Extended by:
OpenWithSelf
Includes:
Foreign, SynchronizableSemaphore
Defined in:
lib/process_shared/posix/semaphore.rb

Defined Under Namespace

Modules: Foreign

Constant Summary

Constants included from Foreign

Foreign::BLOCKING_SEM_TIMEDWAIT

Class Method Summary

Instance Method Summary

Methods included from OpenWithSelf

open

Methods included from SynchronizableSemaphore

#synchronize, #to_mtx

Methods included from Errno

#error_check

Constructor Details

#initialize(value = 1) ⇒ Semaphore

Create a new semaphore whose initial value is value. After Kernel#fork, the semaphore is shared across two (or more) processes. Each process must call #close once it no longer needs the semaphore.

(An object finalizer is registered that closes the semaphore to avoid memory leaks, but it should be considered a last resort.)

Parameters:

  • value (Integer) (defaults to: 1)

    the initial semaphore value



# File 'lib/process_shared/posix/semaphore.rb', line 63

def initialize(value = 1)
  @sem = SharedMemory.new(LibC.type_size(:sem_t))
  sem_init(@sem, 1, value)
  ObjectSpace.define_finalizer(self, self.class.make_finalizer(@sem))
end

Class Method Details

.make_finalizer(sem) ⇒ Proc

Make a Proc suitable for use as a finalizer that will call shm_unlink on sem.

Returns:

  • (Proc)

    a finalizer



# File 'lib/process_shared/posix/semaphore.rb', line 49

def self.make_finalizer(sem)
  proc { LibC.shm_unlink(sem) }
end
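
The finalizer is built in a class method rather than inline in #initialize. A likely reason is that a finalizer Proc must not capture the object it finalizes, or that object can never be collected. The sketch below shows the pattern with a hypothetical DemoResource class and a RELEASED log; neither is part of process_shared, and the "release" is only simulated.

```ruby
# Hypothetical stand-in resource; not part of process_shared.
class DemoResource
  RELEASED = [] # records released handles, for illustration only

  # Build the finalizer in a class method so the proc closes over
  # +handle+ only, never over the instance being finalized.
  def self.make_finalizer(handle)
    proc { |_object_id| RELEASED << handle }
  end

  def initialize(handle)
    @handle = handle
    ObjectSpace.define_finalizer(self, self.class.make_finalizer(@handle))
  end

  # Explicit release: free the handle now and drop the finalizer so
  # the handle is not released a second time at garbage collection.
  def close
    RELEASED << @handle
    @handle = nil
    ObjectSpace.undefine_finalizer(self)
  end
end

res = DemoResource.new(:sem_block)
res.close
```

Calling close explicitly keeps release deterministic; the finalizer only covers objects that were dropped without being closed.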

Instance Method Details

#close ⇒ Object

Close the shared memory block holding the semaphore.

FIXME: May leak the semaphore memory on some platforms, according to the Linux man page for sem_destroy(3). (Should not be destroyed as it may be in use by other processes.)



# File 'lib/process_shared/posix/semaphore.rb', line 118

def close
  # sem_destroy(@sem)

  # Not entirely sure what to do here.  sem_destroy() goes with
  # sem_init() (unnamed semaphore), but other processes cannot use
  # a destroyed semaphore.
  @sem.close
  @sem = nil
  ObjectSpace.undefine_finalizer(self)
end

#post ⇒ Object

Increment the value of the semaphore. If other processes are waiting on this semaphore, one will be woken.



# File 'lib/process_shared/posix/semaphore.rb', line 81

def post
  sem_post(@sem)
end

#try_wait(timeout = nil) ⇒ Object

Decrement the value of the semaphore if it can be done immediately (i.e. if it is non-zero). Otherwise, wait up to timeout seconds for another process to increment it via #post.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum seconds to wait, or nil to not wait

Returns:

  • If timeout is nil and the semaphore cannot be decremented immediately, raises Errno::EAGAIN. If timeout elapses before the semaphore can be decremented, raises Errno::ETIMEDOUT.



# File 'lib/process_shared/posix/semaphore.rb', line 101

def try_wait(timeout = nil)
  if timeout
    now = TimeVal.new
    LibC.gettimeofday(now, nil)
    abs_timeout = now.to_time_spec
    abs_timeout.add_seconds!(timeout)
    sem_timedwait(@sem, abs_timeout)
  else
    sem_trywait(@sem)
  end
end
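
The timed branch above converts a relative timeout into an absolute deadline, since sem_timedwait(3) expects an absolute time. The arithmetic can be sketched in plain Ruby as follows. The helper absolute_deadline and the constant NSEC_PER_SEC are hypothetical names used for illustration; the library's own TimeVal#to_time_spec and add_seconds! may handle rounding differently.

```ruby
NSEC_PER_SEC = 1_000_000_000

# Hypothetical helper: given "now" as seconds and microseconds (as
# gettimeofday reports it) and a relative timeout in seconds, return
# [tv_sec, tv_nsec] for the absolute deadline.
def absolute_deadline(now_sec, now_usec, timeout)
  whole = timeout.floor
  frac_nsec = ((timeout - whole) * NSEC_PER_SEC).round
  sec = now_sec + whole
  nsec = now_usec * 1000 + frac_nsec
  # Carry any overflow so tv_nsec stays within 0...NSEC_PER_SEC.
  sec += nsec / NSEC_PER_SEC
  nsec %= NSEC_PER_SEC
  [sec, nsec]
end

absolute_deadline(100, 900_000, 0.25) # => [101, 150_000_000]
```

The carry step matters because a timespec whose nanosecond field is one billion or more is invalid for sem_timedwait.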

#value ⇒ Integer

Get the current value of the semaphore. Raises Errno::ENOTSUP on platforms that don’t support this (e.g. Mac OS X).

Returns:

  • (Integer)

    the current value of the semaphore.



# File 'lib/process_shared/posix/semaphore.rb', line 73

def value
  int = FFI::MemoryPointer.new(:int)
  sem_getvalue(@sem, int)
  int.read_int
end

#wait ⇒ Object

Decrement the value of the semaphore. If the value is zero, wait until another process increments via #post.



# File 'lib/process_shared/posix/semaphore.rb', line 87

def wait
  sem_wait(@sem)
end
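
The counting behaviour of #wait and #post can be illustrated with an in-process stand-in. The DemoSemaphore class below is hypothetical: it uses threads, a Mutex and a ConditionVariable instead of a POSIX semaphore in shared memory, so it only mirrors the semantics described above and is not how this class is implemented.

```ruby
# In-process stand-in (hypothetical); mimics counting semantics only.
class DemoSemaphore
  def initialize(value = 1)
    @value = value
    @mutex = Mutex.new
    @nonzero = ConditionVariable.new
  end

  # Increment the value and wake one waiter, if any.
  def post
    @mutex.synchronize do
      @value += 1
      @nonzero.signal
    end
  end

  # Block while the value is zero, then decrement it.
  def wait
    @mutex.synchronize do
      @nonzero.wait(@mutex) while @value.zero?
      @value -= 1
    end
  end
end

sem = DemoSemaphore.new(0)
events = Queue.new
waiter = Thread.new do
  sem.wait          # blocks until the main thread posts
  events << :woken
end
events << :posting
sem.post
waiter.join
```

Starting the semaphore at zero makes the waiter block until the first post, which is the usual way to signal an event from one party to another.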