Class: Celluloid::IO::Mailbox

Inherits:
Mailbox
  • Object
show all
Defined in:
lib/celluloid/io/mailbox.rb

Overview

An alternative implementation of Celluloid::Mailbox using Reactor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reactor = nil) ⇒ Mailbox

Returns a new instance of Mailbox.



7
8
9
10
11
# File 'lib/celluloid/io/mailbox.rb', line 7

# Set up an empty message list, a mutex guarding it, and the reactor
# this mailbox will use.
#
# @param reactor [Object, nil] reactor to use; when nil (or false) a new
#   Reactor is created instead
def initialize(reactor = nil)
  @mutex = Mutex.new
  @messages = []

  # Fall back to a fresh Reactor only when the caller did not supply one.
  reactor ||= Reactor.new
  @reactor = reactor
end

Instance Attribute Details

#reactor ⇒ Object (readonly)

Returns the value of attribute reactor.



5
6
7
# File 'lib/celluloid/io/mailbox.rb', line 5

# Reader for the @reactor instance variable.
#
# @return the object currently stored in @reactor
def reactor
  @reactor
end

Instance Method Details

#<<(message) ⇒ Object

Add a message to the Mailbox



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/celluloid/io/mailbox.rb', line 14

# Append a message to the end of the message list and wake the reactor.
# Both steps happen while holding @mutex.
#
# @param message the object to enqueue
# @return [nil]
# @raise [MailboxError] when an IOError is raised while enqueuing or
#   waking the reactor
def <<(message)
  @mutex.synchronize do
    begin
      @messages.push(message)
      @reactor.wakeup
    rescue IOError
      raise MailboxError, "dead recipient"
    end
  end

  nil
end

#receive(timeout = nil, &block) ⇒ Object

Receive a message from the Mailbox



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/celluloid/io/mailbox.rb', line 40

# Receive a message from the Mailbox, running the reactor until
# next_message yields one or the timeout elapses.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait;
#   nil waits indefinitely
# @param block forwarded unchanged to every next_message call
# @return the message returned by next_message, or nil when the timeout
#   elapses first
# @raise [MailboxShutdown] when an IOError is raised while receiving; the
#   mailbox is shut down before the error is raised
def receive(timeout = nil, &block)
  message = next_message(&block)

  until message
    if timeout
      # Measure the deadline on the monotonic clock so that wall-clock
      # adjustments while waiting cannot stretch or shorten the timeout.
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      deadline ||= now + timeout
      wait_interval = deadline - now
      return if wait_interval < 0
    else
      wait_interval = nil
    end

    @reactor.run_once(wait_interval)
    message = next_message(&block)
  end

  message
rescue IOError
  shutdown # force shutdown of the mailbox
  raise MailboxShutdown, "mailbox shutdown called during receive"
end

#shutdown ⇒ Object

Clean up any IO objects this Mailbox may be using



64
65
66
67
# File 'lib/celluloid/io/mailbox.rb', line 64

# Shut down the reactor held in @reactor, then continue with the
# inherited shutdown.
#
# @return whatever the superclass implementation of shutdown returns
def shutdown
  @reactor.shutdown
  super # zero-argument super: hands off to the parent class's shutdown
end

#system_event(event) ⇒ Object

Add a high-priority system event to the Mailbox



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/celluloid/io/mailbox.rb', line 27

# Put a high-priority system event at the front of the message list and
# wake the reactor. Both steps happen while holding @mutex.
#
# @param event the object to place ahead of all queued messages
# @return [nil] always, including when an IOError was swallowed
def system_event(event)
  @mutex.synchronize do
    begin
      @messages.unshift(event)
      @reactor.wakeup
    rescue IOError
      # Silently fail if messages are sent to dead actors
    end
  end

  nil
end