Class: ConcurrentSHM::Channel

Inherits:
Object
Defined in:
lib/concurrent-shm/channel.rb

Overview

A channel for asynchronously passing data between processes, safely, via shared memory spaces.

Direct Known Subclasses

Buffered, SingleBuffered, Unbuffered

Defined Under Namespace

Classes: Buffered, ClosedWriteError, SingleBuffered, Unbuffered

Class Method Details

.new(shm, depth:, width:, offset: 0, autosize: true) ⇒ Channel

Allocates a channel in a shared memory space at the specified offset, optionally initializing the space to the required size. The class of the returned channel depends on the width and depth.

| Depth | Width    | Behavior                                  |
|-------|----------|-------------------------------------------|
| 0     | 0        | Unbuffered, zero-width                    |
| 0     | Positive | Unbuffered, fixed-width                   |
| 0     | Negative | Unbuffered, variable-width                |
| 1     | 0        | Single-buffered, zero-width               |
| 1     | Positive | Single-buffered, fixed-width              |
| 1     | Negative | Single-buffered, variable-width           |
| 2+    | 0        | Buffered, zero-width                      |
| 2+    | Positive | Buffered, fixed-width                     |
| 2+    | Negative | Not supported (buffered, variable-width)  |

When allocating a variable-width channel, `width` is negated and passed as `capacity` (for example, `width: -64` becomes `capacity: 64`).
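The depth and width rules above can be sketched as a small stand-alone function. This is only an illustration: `channel_kind` is a hypothetical helper, not part of ConcurrentSHM, and it returns a descriptive string instead of allocating anything in shared memory.

```ruby
# Hypothetical helper (not part of ConcurrentSHM): mirrors the depth/width
# table and names the class that Channel.new would be expected to pick.
def channel_kind(depth, width)
  raise ArgumentError, "Depth is not an integer" unless depth.is_a?(Integer)
  raise ArgumentError, "Width is not an integer" unless width.is_a?(Integer)
  raise ArgumentError, "Depth must not be negative" if depth < 0

  # Depth selects the buffering strategy.
  buffering =
    case depth
    when 0 then "Unbuffered"
    when 1 then "SingleBuffered"
    else "Buffered"
    end

  # The sign of width selects variable, fixed, or zero-width data.
  if width < 0
    raise "Buffered variable-width mode is not supported" if depth > 1
    "#{buffering}::Variable (capacity: #{-width})"
  elsif width > 0
    "#{buffering}::Fixed (width: #{width})"
  else
    "#{buffering}::Empty"
  end
end

puts channel_kind(0, -64) # Unbuffered::Variable (capacity: 64)
puts channel_kind(1, 8)   # SingleBuffered::Fixed (width: 8)
puts channel_kind(4, 0)   # Buffered::Empty
```

Encoding variable-width as a negative `width` keeps the constructor to two integers, at the cost of a sign convention that callers need to remember.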

Parameters:

  • shm (SharedMemory)

    the shared memory space

  • offset (Integer) (defaults to: 0)

    the offset to place the channel at in the shared memory space

  • autosize (Boolean) (defaults to: true)

    whether to initialize the space to the required size

  • depth (Integer)

    the channel buffer depth

  • width (Integer)

    the channel data width

Returns:

  • (Channel)

    the allocated channel

Raises:

  • (ArgumentError)

    if depth is not an integer, width is not an integer, or depth is negative

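  • (RangeError)

    if the offset is not a multiple of 16, or the shared memory space is not large enough

  • (RuntimeError)

    if depth is 2 or greater and width is negative (buffered variable-width channels are not supported)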
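Because the offset must be a multiple of 16, callers that pack several structures into one shared memory space may want to round offsets up before passing them in. A minimal sketch follows; `align16` is a hypothetical helper, not something the library is known to provide.

```ruby
# Hypothetical helper: round a byte offset up to the next multiple of 16.
def align16(offset)
  (offset + 15) & ~15
end

puts align16(0)  # 0
puts align16(1)  # 16
puts align16(16) # 16
puts align16(17) # 32
```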



# File 'lib/concurrent-shm/channel.rb', line 396

def new(shm, depth:, width:, offset: 0, autosize: true)
  raise ArgumentError, "Depth is not an integer" unless depth.is_a?(Integer)
  raise ArgumentError, "Width is not an integer" unless width.is_a?(Integer)
  raise ArgumentError, "Depth must be positive" if depth < 0

  args = { offset: offset, autosize: autosize }

  if depth == 0
    if width < 0
      Unbuffered::Variable.new(shm, capacity: -width, **args)
    elsif width > 0
      Unbuffered::Fixed.new(shm, width: width, **args)
    else
      Unbuffered::Empty.new(shm, **args)
    end

  elsif depth == 1
    if width < 0
      SingleBuffered::Variable.new(shm, capacity: -width, **args)
    elsif width > 0
      SingleBuffered::Fixed.new(shm, width: width, **args)
    else
      SingleBuffered::Empty.new(shm, **args)
    end

  else
    if width < 0
      raise "Buffered variable-width mode is not supported"
    elsif width > 0
      Buffered::Fixed.new(shm, depth: depth, width: width, **args)
    else
      Buffered::Empty.new(shm, depth: depth, **args)
    end
  end
end

Instance Method Details

#close ⇒ nil

Close the channel.

Returns:

  • (nil)

Raises:

  • (RuntimeError)

    if the channel is already closed



# File 'lib/concurrent-shm/channel.rb', line 487

def close
  locked do
    raise "Already closed" if closed?

    @cond.broadcast
    @closed[] = 1
  end
end
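The close pattern in the listing above (take the lock, refuse a second close, mark the channel closed, wake every waiter) can be illustrated in a single process with Ruby's built-in `Mutex` and `ConditionVariable`. This is a toy stand-in under that assumption: `ToyChannel` and `wait_closed` are hypothetical names, and the real channel uses shared-memory primitives rather than these in-process objects.

```ruby
# Toy in-process stand-in for the close logic; not the library's implementation.
class ToyChannel
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @closed = false
  end

  def closed?
    @closed
  end

  # Mark the channel closed and wake all waiters; a second close raises.
  def close
    @mutex.synchronize do
      raise "Already closed" if @closed

      @closed = true
      @cond.broadcast
    end
    nil
  end

  # Hypothetical helper: block the calling thread until the channel is closed.
  def wait_closed
    @mutex.synchronize do
      @cond.wait(@mutex) until @closed
    end
  end
end

ch = ToyChannel.new
waiter = Thread.new { ch.wait_closed; :woken }
ch.close
puts waiter.value # woken
```

Broadcasting rather than signalling matters here: every blocked reader or writer needs to observe the closed state, not just one of them.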