Class: Concurrent::Actors::Actor::Mailbox

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent/actors/mailbox.rb

Defined Under Namespace

Classes: Filter, Timeout

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Mailbox

Returns a new instance of Mailbox.



54
55
56
57
# File 'lib/concurrent/actors/mailbox.rb', line 54

# Builds an empty mailbox.
def initialize
  # Backlog of values that #receive pulled off the channel but that the
  # filter in use at the time did not accept.
  @skipped = []
  # Incoming values are delivered through this channel.
  @channel = Channel.new
end

Instance Method Details

#<<(value) ⇒ Object

safe for multiple writers



60
61
62
63
# File 'lib/concurrent/actors/mailbox.rb', line 60

# Delivers +value+ to the mailbox by appending it to the underlying channel.
#
# Returns the mailbox itself, so sends can be chained: box << a << b.
def <<(value)
  tap { @channel << value }
end

#receive(timeout = nil) ⇒ Object

safe only for a single reader



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/concurrent/actors/mailbox.rb', line 66

# Takes the next message accepted by a filter and returns the result of
# running the matching action on it.
#
# When a block is given it is yielded a new Filter to configure; leaving the
# filter empty raises ArgumentError ("Empty filter"). Without a block, any
# Object is accepted and the message itself is returned.
#
# Messages skipped by earlier calls are tried first, in the order they were
# skipped. If none of them matches, values are read from the channel until one
# is accepted; rejected values are appended to the backlog for later calls.
#
# timeout - nil to wait without a limit. Otherwise a Timeout marker is put on
#           the channel: via Scheduler.after_delay!(timeout) when timeout is
#           positive, immediately when it is not. Reading this call's own
#           marker raises TimeoutError ("Receive timed out").
#
# Originally documented as safe only for a single reader.
def receive(timeout = nil)
  filter = Filter.new
  if block_given?
    yield filter
    raise ArgumentError, "Empty filter" if filter.empty?
  else
    filter.when(Object) { |message| message }
  end

  handler = nil

  # Backlog first: the block records the action of the first skipped message
  # the filter accepts, and index tells us where that message sits.
  position = @skipped.index { |candidate| handler = filter.action_for(candidate) }
  return handler.call(@skipped.delete_at(position)) if position

  marker = nil
  timer = nil
  if timeout
    marker = Timeout.new
    if timeout > 0
      timer = Scheduler.after_delay!(timeout) { @channel << marker }
    else
      @channel << marker
    end
  end

  message = nil
  until handler
    message = @channel.receive
    if Timeout === message
      # A marker that is not ours was left behind by an earlier call; drop it.
      next unless message == marker
      raise TimeoutError, "Receive timed out"
    end
    handler = filter.action_for(message)
    @skipped << message unless handler
  end

  # A match arrived before the scheduled marker was needed.
  timer.cancel if timer

  handler.call(message)
end