Class: DispatchQueue::ConcurrentQueue

Inherits:
Object
  • Object
show all
Includes:
DispatchAfterImpl, DispatchSyncImpl
Defined in:
lib/dispatch_queue_rb/concurrent_queue.rb

Instance Method Summary collapse

Methods included from DispatchAfterImpl

#dispatch_after

Methods included from DispatchSyncImpl

#dispatch_barrier_sync, #dispatch_sync

Constructor Details

#initialize(parent_queue: nil) ⇒ ConcurrentQueue

Returns a new instance of ConcurrentQueue.



13
14
15
16
17
18
19
20
# File 'lib/dispatch_queue_rb/concurrent_queue.rb', line 13

def initialize( parent_queue: nil )
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @task_list = []
  @parent_queue = parent_queue || Dispatch.default_queue
  @scheduled_count = 0
  @barrier_count = 0
end

Instance Method Details

#dispatch_async(group: nil, &task) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/dispatch_queue_rb/concurrent_queue.rb', line 22

def dispatch_async( group:nil, &task )
  group.enter() if group
  continuation = Continuation.new( target_queue:@parent_queue, group:group ) do
    _run_task( task, false )
  end

  schedule_immediately = @mutex.synchronize do
    if ( @barrier_count > 0)
      @task_list << continuation
      false
    else
      @scheduled_count += 1
      true
    end
  end

  continuation.run() if schedule_immediately
  self
end

#dispatch_barrier_async(group: nil, &task) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/dispatch_queue_rb/concurrent_queue.rb', line 42

def dispatch_barrier_async( group:nil, &task )
  group.enter() if group
  continuation = Continuation.new( target_queue:@parent_queue, group:group, barrier:true ) do
    _run_task( task, true )
  end

  barrier_task, tasks = @mutex.synchronize do
    @barrier_count += 1
    @task_list << continuation
    resume_pending = ( @scheduled_count == 0 && @barrier_count == 1)
    _sync_get_next_batch() if resume_pending
  end

  _schedule_next_batch( barrier_task, tasks )
end