Class: MultiProcessing::Mutex

Inherits:
Object
  • Object
show all
Defined in:
lib/multiprocessing/mutex.rb

Overview

Process version of Mutex. This can be used like ::Mutex in Ruby standard library.

Do not fork inside a #synchronize block or between #lock and #unlock: the forking (parent) process and the forked (child) process run in parallel.

Note that Mutex uses 1 pipe.

Examples:

require 'multiprocessing'

# The mutex is created before forking, so every child uses the same lock.
mutex = MultiProcessing::Mutex.new
3.times do
  fork do
    mutex.synchronize do
      # critical section: runs while this child holds the mutex
      puts Process.pid
      sleep 1
    end
  end
end
# Wait for all forked children to exit.
Process.waitall
# => prints the 3 child PIDs, roughly one per second

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Mutex

Returns a new instance of Mutex.



34
35
36
37
# File 'lib/multiprocessing/mutex.rb', line 34

# Creates the pipe that backs this mutex and writes a single token into it.
# @pout is the read end and @pin is the write end of that pipe.
def initialize
  pipe_ends = IO.pipe
  @pout = pipe_ends.first
  @pin = pipe_ends.last
  # Seed the pipe with one token (the Integer is written as the text "1").
  @pin.syswrite(1)
end

Instance Method Details

#lock ⇒ Mutex

Attempts to grab the lock and waits if it isn’t available. Raises ProcessError if mutex was locked by the current thread.

Returns:

Raises:



47
48
49
50
51
52
53
54
55
# File 'lib/multiprocessing/mutex.rb', line 47

# Acquires the mutex, blocking until the token can be read from the pipe.
#
# Interrupt handling is delegated to MultiProcessing.try_handle_interrupt
# (defined elsewhere) with RuntimeError => :on_blocking.
#
# @return [Mutex] self
# @raise [ProcessError] if the calling thread of this process already
#   owns the mutex (see #owned?)
def lock
  MultiProcessing.try_handle_interrupt(RuntimeError => :on_blocking) do
    if owned?
      raise ProcessError.new("mutex was tried locking twice")
    end

    # Taking the single byte out of the pipe is what acquires the lock.
    @pout.readpartial(1)

    # Record the owner so #owned? can recognise this process/thread later.
    @locking_pid, @locking_thread = Process.pid, Thread.current
    self
  end
end

#locked? ⇒ Boolean

Returns true if this lock is currently held by some thread.

Returns:

  • (Boolean)


63
64
65
66
67
68
69
70
71
72
73
# File 'lib/multiprocessing/mutex.rb', line 63

# Reports whether the mutex is currently held, i.e. whether the pipe
# holds no token.
#
# The pipe is polled with a zero-timeout IO.select instead of reading the
# token and writing it back. Reading it would briefly take the lock away
# from other processes (making a concurrent #try_lock or #locked? report a
# held mutex) and would lose the token if this process died in between.
#
# Interrupt handling is delegated to MultiProcessing.try_handle_interrupt
# (defined elsewhere) with RuntimeError => :never.
#
# @return [Boolean] true if no token is available (mutex held),
#   false if the token is readable (mutex free)
def locked?
  MultiProcessing.try_handle_interrupt(RuntimeError => :never) do
    # IO.select returns nil when @pout is not readable within the timeout.
    IO.select([@pout], nil, nil, 0).nil?
  end
end

#owned? ⇒ Boolean

Returns true if the lock is locked by current thread on current process

Returns:

  • (Boolean)


101
102
103
# File 'lib/multiprocessing/mutex.rb', line 101

# Tells whether the recorded owner is the calling thread of the calling
# process. Both the recorded pid and the recorded thread must match.
#
# @return [Boolean]
def owned?
  return false unless @locking_pid == Process.pid

  @locking_thread == Thread.current
end

#sleep(timeout = nil) ⇒ Object

Releases the lock and sleeps for timeout seconds if it is given and non-nil, or forever otherwise; the lock is re-acquired when the sleep ends. Raises ProcessError if mutex wasn’t locked by the current thread.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

Raises:



155
156
157
158
159
160
161
162
163
164
# File 'lib/multiprocessing/mutex.rb', line 155

# Releases the mutex, sleeps, and takes the mutex again afterwards.
#
# Interrupt handling is delegated to MultiProcessing.try_handle_interrupt
# (defined elsewhere) with RuntimeError => :on_blocking.
#
# @param timeout [Numeric, nil] seconds to sleep; when nil (or otherwise
#   falsy) Kernel.sleep is called without an argument
# @return [Object] whatever Kernel.sleep returns
# @raise [ProcessError] propagated from #unlock when the caller does not
#   hold the mutex
def sleep(timeout = nil)
  MultiProcessing.try_handle_interrupt(RuntimeError => :on_blocking) do
    unlock
    begin
      if timeout
        Kernel.sleep(timeout)
      else
        Kernel.sleep
      end
    ensure
      # Re-acquire even if the sleep was cut short by an exception.
      lock
    end
  end
end

#synchronize ⇒ Object

Obtains a lock, runs the block, and releases the lock when the block completes.

Returns:

  • (Object)

    returned value of block



132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/multiprocessing/mutex.rb', line 132

# Runs the given block while holding the mutex and always releases the
# mutex afterwards, even when the block raises.
#
# Locking and unlocking run under RuntimeError => :on_blocking; the block
# itself runs under RuntimeError => :immediate. Both modes are passed to
# MultiProcessing.try_handle_interrupt (defined elsewhere).
#
# @yield the critical section
# @return [Object] the value returned by the block
def synchronize
  MultiProcessing.try_handle_interrupt(RuntimeError => :on_blocking) do
    lock
    begin
      result = nil
      MultiProcessing.try_handle_interrupt(RuntimeError => :immediate) { result = yield }
      result
    ensure
      unlock
    end
  end
end

#try_lock ⇒ Boolean

Attempts to obtain the lock and returns immediately. Returns true if the lock was granted.

Returns:

  • (Boolean)


82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/multiprocessing/mutex.rb', line 82

# Tries to take the mutex without blocking.
#
# The token is read with read_nonblock. A "would block" result is rescued
# through IO::WaitReadable, which marks both Errno::EAGAIN and
# Errno::EWOULDBLOCK, so the check works on platforms where the two differ.
#
# Interrupt handling is delegated to MultiProcessing.try_handle_interrupt
# (defined elsewhere) with RuntimeError => :never.
#
# @return [Boolean] true if the lock was acquired, false if no token was
#   available
def try_lock
  MultiProcessing.try_handle_interrupt(RuntimeError => :never) do
    begin
      @pout.read_nonblock 1
    rescue IO::WaitReadable
      # Pipe is empty: the token is currently held elsewhere.
      return false
    end

    # Token taken: record the owner for #owned?.
    @locking_thread = Thread.current
    @locking_pid = Process.pid
    return true
  end
end

#unlock ⇒ Mutex

Note:

The order in which waiting threads are resumed is unspecified.

Releases the lock. Raises ProcessError if mutex wasn’t locked by the current thread.

Returns:

Raises:



115
116
117
118
119
120
121
122
123
124
# File 'lib/multiprocessing/mutex.rb', line 115

# Releases the mutex by clearing the recorded owner and writing the token
# back into the pipe.
#
# Interrupt handling is delegated to MultiProcessing.try_handle_interrupt
# (defined elsewhere) with RuntimeError => :never.
#
# @return [Mutex] self
# @raise [ProcessError] if the mutex is not locked, or if it is locked but
#   not owned by the calling thread of the calling process
def unlock
  MultiProcessing.try_handle_interrupt(RuntimeError => :never) do
    unless locked?
      raise ProcessError.new("Attempt to unlock a mutex which is not locked")
    end

    unless owned?
      raise ProcessError.new("Mutex was tried being unlocked in process/thread which didn't lock this mutex: locking[pid:#{(@locking_pid||'nil')}, thread:#{@locking_thread.inspect}] current[pid:#{Process.pid}, thread:#{Thread.current.inspect}]")
    end

    # Forget the owner before the token becomes available to others.
    @locking_pid = @locking_thread = nil
    @pin.syswrite 1
    self
  end
end