Class: ZK::ThreadedCallback

Inherits:
Object
  • Object
show all
Includes:
Exceptions, Logger
Defined in:
lib/zk/threaded_callback.rb

Overview

A class that encapsulates the queue + thread that calls a callback. Repsonds to call but places call on a queue to be delivered by a thread. You will not have a useful return value from call so this is only useful for background processing.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#logger, wrapped_logger, wrapped_logger=

Constructor Details

#initialize(callback = nil, &blk) ⇒ ThreadedCallback

Returns a new instance of ThreadedCallback.



12
13
14
15
16
17
# File 'lib/zk/threaded_callback.rb', line 12

def initialize(callback=nil, &blk)
  @callback = callback || blk

  @state  = :paused
  reopen_after_fork!
end

Instance Attribute Details

#callbackObject (readonly)

Returns the value of attribute callback.



10
11
12
# File 'lib/zk/threaded_callback.rb', line 10

def callback
  @callback
end

Instance Method Details

#call(*args) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/zk/threaded_callback.rb', line 52

def call(*args)
  @mutex.lock
  begin
    @array << args
    @cond.broadcast
  ensure
    @mutex.unlock rescue nil
  end
end

#pause_before_fork_in_parentObject

shuts down the event delivery thread, but keeps the queue so we can continue delivering queued events when #resume_after_fork_in_parent is called



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/zk/threaded_callback.rb', line 87

def pause_before_fork_in_parent
  @mutex.lock
  begin
    raise InvalidStateError, "@state was not :running, @state: #{@state.inspect}" if @state != :running
    return if @state == :paused 

    @state = :paused
    @cond.broadcast
  ensure
    @mutex.unlock rescue nil
  end

  return unless @thread

  @thread.join
  @thread = nil
end

#resume_after_fork_in_parentObject



105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/zk/threaded_callback.rb', line 105

def resume_after_fork_in_parent
  @mutex.lock
  begin
    raise InvalidStateError, "@state was not :paused, @state: #{@state.inspect}" if @state != :paused
    raise InvalidStateError, "@thread was not nil! #{@thread.inspect}" if @thread 

    @state = :running
#         logger.debug { "#{self.class}##{__method__} spawning dispatch thread" }
    spawn_dispatch_thread
  ensure
    @mutex.unlock rescue nil
  end
end

#running?Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/zk/threaded_callback.rb', line 19

def running?
  @mutex.synchronize { @state == :running }
end

#shutdown(timeout = 5) ⇒ Object

how long to wait on thread shutdown before we return



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/zk/threaded_callback.rb', line 29

def shutdown(timeout=5)
#       logger.debug { "#{self.class}##{__method__}" }

  @mutex.lock
  begin
    return true if @state == :shutdown

    @state = :shutdown
    @cond.broadcast
  ensure
    @mutex.unlock rescue nil
  end

  return true unless @thread 

  unless @thread.join(timeout) == @thread
    logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
    return false
  end

  true
end