Class: Concurrent::MVar

Inherits:
Object
  • Object
show all
Includes:
Dereferenceable
Defined in:
lib/concurrent/mvar.rb

Overview

An ‘MVar` is a synchronized single element container. They are empty or contain one item. Taking a value from an empty `MVar` blocks, as does putting a value into a full one. You can either think of them as blocking queue of length one, or a special kind of mutable variable.

On top of the fundamental ‘#put` and `#take` operations, we also provide a `#mutate` that is atomic with respect to operations on the same instance. These operations all support timeouts.

We also support non-blocking operations ‘#try_put!` and `#try_take!`, a `#set!` that ignores existing values, a `#value` that returns the value without removing it or returns `MVar::EMPTY`, and a `#modify!` that yields `MVar::EMPTY` if the `MVar` is empty and can be used to set `MVar::EMPTY`. You shouldn’t use these operations in the first instance.

‘MVar` is a [Dereferenceable](Dereferenceable).

‘MVar` is related to M-structures in Id, `MVar` in Haskell and `SyncVar` in Scala.

Note that unlike the original Haskell paper, our ‘#take` is blocking. This is how Haskell and Scala do it today.

**See Also:**

    1. Barth, R. Nikhil, and Arvind. [M-Structures: Extending a parallel, non-

strict, functional language with state](dl.acm.org/citation.cfm?id=652538). In Proceedings of the 5th ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991.

    1. Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](dl.acm.org/citation.cfm?id=237794). In Proceedings of the 23rd Symposium on Principles of Programming Languages (PoPL), 1996.

Constant Summary collapse

EMPTY =

Unique value that represents that an ‘MVar` was empty

Object.new
TIMEOUT =

Unique value that represents that an ‘MVar` timed out before it was able to produce a value.

Object.new

Instance Method Summary collapse

Methods included from Dereferenceable

#value

Constructor Details

#initialize(value = EMPTY, opts = {}) ⇒ MVar

Create a new ‘MVar`, either empty or with an initial value.

Parameters:

  • opts (Hash) (defaults to: {})

    the options controlling how the future will be processed

Options Hash (opts):

  • :operation (Boolean) — default: false

    when ‘true` will execute the future on the global operation pool (for long-running operations), when `false` will execute the future on the global task pool (for short-running tasks)

  • :executor (object)

    when provided will run all operations on this executor rather than the global thread pool (overrides :operation)

  • :dup_on_deref (String) — default: false

    call ‘#dup` before returning the data

  • :freeze_on_deref (String) — default: false

    call ‘#freeze` before returning the data

  • :copy_on_deref (String) — default: nil

    call the given ‘Proc` passing the internal value and returning the value returned from the proc



57
58
59
60
61
62
63
# File 'lib/concurrent/mvar.rb', line 57

def initialize(value = EMPTY, opts = {})
  @value = value
  @mutex = Mutex.new
  @empty_condition = Condition.new
  @full_condition = Condition.new
  set_deref_options(opts)
end

Instance Method Details

#empty?Boolean

Returns if the ‘MVar` is currently empty.

Returns:

  • (Boolean)


181
182
183
# File 'lib/concurrent/mvar.rb', line 181

def empty?
  @mutex.synchronize { @value == EMPTY }
end

#full?Boolean

Returns if the ‘MVar` currently contains a value.

Returns:

  • (Boolean)


186
187
188
# File 'lib/concurrent/mvar.rb', line 186

def full?
  not empty?
end

#modify(timeout = nil) ⇒ Object

Atomically ‘take`, yield the value to a block for transformation, and then `put` the transformed value. Returns the transformed value. A timeout can be set to limit the time spent blocked, in which case it returns `TIMEOUT` if the time is exceeded.

Returns:

  • (Object)

    the transformed value, or ‘TIMEOUT`

Raises:

  • (ArgumentError)


109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/concurrent/mvar.rb', line 109

def modify(timeout = nil)
  raise ArgumentError.new('no block given') unless block_given?

  @mutex.synchronize do
    wait_for_full(timeout)

    # If we timed out we'll still be empty
    if unlocked_full?
      value = @value
      @value = yield value
      @full_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end

#modify!Object

Non-blocking version of ‘modify` that will yield with `EMPTY` if there is no value yet.

Raises:

  • (ArgumentError)


165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/concurrent/mvar.rb', line 165

def modify!
  raise ArgumentError.new('no block given') unless block_given?

  @mutex.synchronize do
    value = @value
    @value = yield value
    if unlocked_empty?
      @empty_condition.signal
    else
      @full_condition.signal
    end
    apply_deref_options(value)
  end
end

#put(value, timeout = nil) ⇒ Object

Put a value into an ‘MVar`, blocking if there is already a value until it is empty. A timeout can be set to limit the time spent blocked, in which case it returns `TIMEOUT` if the time is exceeded.

Returns:

  • (Object)

    the value that was put, or ‘TIMEOUT`



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/concurrent/mvar.rb', line 89

def put(value, timeout = nil)
  @mutex.synchronize do
    wait_for_empty(timeout)

    # If we timed out we won't be empty
    if unlocked_empty?
      @value = value
      @full_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end

#set!(value) ⇒ Object

Non-blocking version of ‘put` that will overwrite an existing value.



155
156
157
158
159
160
161
162
# File 'lib/concurrent/mvar.rb', line 155

def set!(value)
  @mutex.synchronize do
    old_value = @value
    @value = value
    @full_condition.signal
    apply_deref_options(old_value)
  end
end

#take(timeout = nil) ⇒ Object

Remove the value from an ‘MVar`, leaving it empty, and blocking if there isn’t a value. A timeout can be set to limit the time spent blocked, in which case it returns ‘TIMEOUT` if the time is exceeded.

Returns:

  • (Object)

    the value that was taken, or ‘TIMEOUT`



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/concurrent/mvar.rb', line 69

def take(timeout = nil)
  @mutex.synchronize do
    wait_for_full(timeout)

    # If we timed out we'll still be empty
    if unlocked_full?
      value = @value
      @value = EMPTY
      @empty_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end

#try_put!(value) ⇒ Object

Non-blocking version of ‘put`, that returns whether or not it was successful.



142
143
144
145
146
147
148
149
150
151
152
# File 'lib/concurrent/mvar.rb', line 142

def try_put!(value)
  @mutex.synchronize do
    if unlocked_empty?
      @value = value
      @full_condition.signal
      true
    else
      false
    end
  end
end

#try_take!Object

Non-blocking version of ‘take`, that returns `EMPTY` instead of blocking.



128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/concurrent/mvar.rb', line 128

def try_take!
  @mutex.synchronize do
    if unlocked_full?
      value = @value
      @value = EMPTY
      @empty_condition.signal
      apply_deref_options(value)
    else
      EMPTY
    end
  end
end