Class: ConcurrentSHM::Condition

Inherits:
Object
  • Object
show all
Defined in:
ext/concurrent-shm/posix.c,
ext/concurrent-shm/posix.c

Overview

A POSIX-threads condition variable, with the `pshared` attribute set to `PTHREAD_PROCESS_SHARED`, allocated in a shared memory space.

Instance Method Summary collapse

Constructor Details

#initialize(region) ⇒ Object

Initializes a shared condition variable in the specified memory region.

Parameters:

  • region (Region)

    the memory region

Raises:

  • (RangeError)

    if the region is too small or not aligned to a 16-byte boundary



650
651
652
653
654
655
656
657
658
659
660
661
662
# File 'ext/concurrent-shm/posix.c', line 650

/*
 * Initializes a shared condition variable in the specified memory region.
 *
 * self   - the Condition object being initialized
 * region - the Region that backs the shared state; set_region() is asked
 *          for 16-byte alignment
 *
 * Returns self.
 */
static VALUE condition_initialize(VALUE self, VALUE region)
{
    condition_t * wrapper = value_as_condition(self);

    set_region(wrapper, 16, region);
    __set_finalizer(self, wrapper);

    /* The pthread objects are only set up when the retain call yields 1
     * (presumably meaning this is the first holder of the shared state;
     * confirm against __shared_retain). */
    if (__shared_retain(&wrapper->shared->refcount) != 1) {
        return self;
    }

    chk_err(pthread_condattr_init, (&wrapper->shared->attr), "");
    chk_err(pthread_condattr_setpshared, (&wrapper->shared->attr, PTHREAD_PROCESS_SHARED), "");
    chk_err(pthread_cond_init, (&wrapper->shared->cond, &wrapper->shared->attr), "");

    return self;
}

Instance Method Details

#broadcast ⇒ Object

Wakes all threads waiting on this condition.



675
676
677
678
679
680
# File 'ext/concurrent-shm/posix.c', line 675

/*
 * Wakes all threads waiting on this condition by calling
 * pthread_cond_broadcast() on the shared condition variable.
 *
 * Returns nil.
 */
static VALUE condition_broadcast(VALUE self)
{
    condition_t * wrapper = value_as_condition(self);

    chk_err(pthread_cond_broadcast, (&wrapper->shared->cond), "");

    return Qnil;
}

#signal ⇒ Object

Wakes at least one thread waiting on this condition.



666
667
668
669
670
671
# File 'ext/concurrent-shm/posix.c', line 666

/*
 * Wakes at least one thread waiting on this condition by calling
 * pthread_cond_signal() on the shared condition variable.
 *
 * Returns nil.
 */
static VALUE condition_signal(VALUE self)
{
    condition_t * wrapper = value_as_condition(self);

    chk_err(pthread_cond_signal, (&wrapper->shared->cond), "");

    return Qnil;
}

#wait(mutex, timeout = nil) ⇒ Boolean

Releases the mutex lock and waits, reacquiring the mutex lock on wakeup. Returns after the specified timeout passes, if a timeout is specified.

Parameters:

  • mutex (Mutex)

    the mutex

  • timeout (Numeric) (defaults to: nil)

    the timeout

Returns:

  • (Boolean)

    `false` if the wait timed out, `true` otherwise



707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
# File 'ext/concurrent-shm/posix.c', line 707

/*
 * Releases the mutex lock and waits on the condition, reacquiring the mutex
 * lock on wakeup. Returns after the specified timeout passes, if a timeout
 * is specified.
 *
 * Ruby signature: wait(mutex, timeout = nil)
 *
 * mutex   - the Mutex to wait with
 * timeout - nil (no timeout), or an Integer / Float number of seconds
 *
 * Returns false if the wait reported ETIMEDOUT, true otherwise.
 * Raises ArgumentError ("Invalid timeout") if timeout is neither nil,
 * an Integer, nor a Float; any other non-zero error code from the wait is
 * handed to rb_check_syserr_fail_strf().
 */
static VALUE condition_wait(int argc, VALUE * argv, VALUE self)
{
    VALUE mutex, timeout;
    rb_scan_args(argc, argv, "11", &mutex, &timeout);

    condition_t * cond = value_as_condition(self);
    mutex_t * mu = value_as_mutex(mutex);

    /* tv must live at function scope: args.tv points at it and is read by
     * the wait call below. Declaring it inside the `if` block would leave
     * args.tv dangling once that block ends. */
    struct timespec tv = {0};

    struct cond_wait_args args = { .cond = &cond->shared->cond, .mu = &mu->shared->mu };
    if (!NIL_P(timeout)) {
        if (RB_TYPE_P(timeout, T_FIXNUM)) {
            tv.tv_sec = FIX2ULONG(timeout);
        } else if (RB_TYPE_P(timeout, T_BIGNUM)) {
            tv.tv_sec = rb_big2ull(timeout);
        } else if (RB_TYPE_P(timeout, T_FLOAT)) {
            /* Add half a nanosecond so the truncation below rounds to the
             * nearest nanosecond. */
            double v = RFLOAT_VALUE(timeout) + 0.5e-9;
            tv.tv_sec = (time_t)v;
            tv.tv_nsec = (v - tv.tv_sec) * 1000000000L;
        } else {
            rb_raise(rb_eArgError, "Invalid timeout");
        }
        args.tv = &tv;
    }

    /* Run the blocking wait without the GVL; cond_wait_cancel is registered
     * as the unblock function. The error code comes back through the void*
     * return value. */
    int err = (int)(uintptr_t)rb_thread_call_without_gvl(
        (void * (*)(void *))cond_wait_without_gvl, &args,
        (void (*)(void *))cond_wait_cancel, &args);

    if (err == ETIMEDOUT) {
        return Qfalse;
    }

    rb_check_syserr_fail_strf(err, err, "pthread_cond_wait()");
    return Qtrue;
}