Class: FiniteMachine::MessageQueue Private

Inherits:
Object
  • Object
show all
Defined in:
lib/finite_machine/message_queue.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Responsible for storage of asynchronous messages such as events and callbacks.

Used internally by Observer

API:

  • private

Instance Method Summary collapse

Constructor Details

#initializeMessageQueue

Initialize a MessageQueue

Examples:

message_queue = FiniteMachine::MessageQueue.new

API:

  • public



20
21
22
23
24
25
26
27
# File 'lib/finite_machine/message_queue.rb', line 20

def initialize
  @not_empty = ConditionVariable.new
  @mutex     = Mutex.new
  @queue     = Queue.new
  @dead      = false
  @listeners = []
  @thread    = nil
end

Instance Method Details

#<<(event) ⇒ void

This method returns an undefined value.

Add an asynchronous event to the message queue to process

Examples:

message_queue << AsyncCall.build(...)

Parameters:

  • the event to add

API:

  • public



78
79
80
81
82
83
84
85
86
87
# File 'lib/finite_machine/message_queue.rb', line 78

def <<(event)
  @mutex.synchronize do
    if @dead
      discard_message(event)
    else
      @queue << event
      @not_empty.signal
    end
  end
end

#alive?Boolean

Check whether or not the message queue is alive

Examples:

message_queue.alive?

Returns:

API:

  • public



123
124
125
# File 'lib/finite_machine/message_queue.rb', line 123

def alive?
  @mutex.synchronize { !@dead }
end

#empty?Boolean

Check whether or not there are any messages to handle

Examples:

message_queue.empty?

Returns:

API:

  • public



111
112
113
# File 'lib/finite_machine/message_queue.rb', line 111

def empty?
  @mutex.synchronize { @queue.empty? }
end

#inspectString

Inspect this message queue

Examples:

message_queue.inspect

Returns:

API:

  • public



191
192
193
194
195
# File 'lib/finite_machine/message_queue.rb', line 191

def inspect
  @mutex.synchronize do
    "#<#{self.class}:#{object_id.to_s(16)} @size=#{size}, @dead=#{@dead}>"
  end
end

#join(timeout = nil) ⇒ Thread?

Join the message queue from the current thread

Examples:

message_queue.join

Parameters:

  • (defaults to: nil)

    the time limit

Returns:

API:

  • public



138
139
140
141
142
# File 'lib/finite_machine/message_queue.rb', line 138

def join(timeout = nil)
  return unless @thread

  timeout.nil? ? @thread.join : @thread.join(timeout)
end

#running?Boolean

Check whether or not the message queue is running

Examples:

message_queue.running?

Returns:

API:

  • public



63
64
65
# File 'lib/finite_machine/message_queue.rb', line 63

def running?
  !@thread.nil? && alive?
end

#shutdownBoolean

Shut down this message queue and clean it up

Examples:

message_queue.shutdown

Returns:

Raises:

API:

  • public



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/finite_machine/message_queue.rb', line 154

def shutdown
  raise MessageQueueDeadError, "message queue already dead" if @dead

  queue = []
  @mutex.synchronize do
    @dead = true
    @not_empty.broadcast

    queue = @queue
    @queue.clear
  end
  while !queue.empty?
    discard_message(queue.pop)
  end
  true
end

#sizeInteger

The number of messages waiting for processing

Examples:

message_queue.size

Returns:

API:

  • public



179
180
181
# File 'lib/finite_machine/message_queue.rb', line 179

def size
  @mutex.synchronize { @queue.size }
end

#spawn_threadThread

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Spawn a new background thread

Returns:

API:

  • private



48
49
50
51
52
53
# File 'lib/finite_machine/message_queue.rb', line 48

def spawn_thread
  @thread = Thread.new do
    Thread.current.abort_on_exception = true
    process_events
  end
end

#startThread?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Start a new thread with a queue of callback events to run

Examples:

message_queue.start

Returns:

API:

  • private



37
38
39
40
41
# File 'lib/finite_machine/message_queue.rb', line 37

def start
  return if running?

  @mutex.synchronize { spawn_thread }
end

#subscribe(*args, &block) ⇒ void

This method returns an undefined value.

Add a listener for the message queue to receive notifications

Examples:

message_queue.subscribe { |event| ... }

API:

  • public



97
98
99
100
101
102
103
# File 'lib/finite_machine/message_queue.rb', line 97

def subscribe(*args, &block)
  @mutex.synchronize do
    listener = Listener.new(*args)
    listener.on_delivery(&block)
    @listeners << listener
  end
end