Class: EMRPC::ReconnectingPid

Inherits:
Object
  • Object
show all
Includes:
Pid
Defined in:
lib/emrpc/evented_api/reconnecting_pid.rb

Overview

ReconnectingPid collects all messages in the backlog buffer and tries to reconnect. Calls self.on_raise() with the following exceptions: *

Direct Known Subclasses

Client::BlockingPid

Defined Under Namespace

Classes: AttemptsError, BacklogError, ReconnectingError, TimeoutError

Constant Summary collapse

DEFAULT_MAX_BACKLOG =
256
DEFAULT_MAX_ATTEMPTS =
5
DEFAULT_TIMEOUT =

sec.

5
DEFAULT_TIMER =
Timers::EVENTED

Constants included from ProtocolMapper

ProtocolMapper::MAP

Instance Attribute Summary

Attributes included from Pid

#_bind_address, #_em_server_signature, #_protocol, #connections, #killed, #options, #uuid

Instance Method Summary collapse

Methods included from Pid

#==, #_send_dirty, #_uid, #bind, #connect, #connection_established, #connection_unbind, #connection_uuids, #disconnect, #encode_b381b571_1ab2_5889_8221_855dbbc76242, #find_pid, #inspect, #kill, #killed?, #marshal_dump, #marshal_load, new, #pid_class_name, #spawn, #tcp_spawn, #thread_spawn

Methods included from DebugPidCallbacks

#_debug, #handshake_failed, #on_raise, #on_return

Methods included from ProtocolMapper

#make_client_connection, #make_server_connection, register_protocol

Methods included from DefaultCallbacks

#handshake_failed, #on_raise, #on_return

Constructor Details

#initialize(address, options = {}) ⇒ ReconnectingPid

Arguments:

address          Address if a pid or the pid itself to connect to.

Options:

:max_backlog     Maximum backlog size. BacklogError is raised when backlog becomes larger than 
                 the specified size. Default is 256 messages.

:max_attempts    Maximum number of connection attempts. AttemptsError is raised when this number is exceeded.
                 Counter is set to zero after each successful connection. Default is 5 attempts.

:timeout         Time interval in seconds. TimeoutError is raised when connection was not established
                 in the specified amount of time. Default is 5 seconds.

:timer           Proc which runs a periodic timer. Default is Timers::EVENTED. See EMRPC::Timers for more info.


29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/emrpc/evented_api/reconnecting_pid.rb', line 29

def initialize(address, options = {})
  super(address, options)
  
  @address = address

  # Options
  
  @max_backlog    = options[:max_backlog]  || DEFAULT_MAX_BACKLOG
  @max_attempts   = options[:max_attempts] || DEFAULT_MAX_ATTEMPTS
  @timeout        = options[:timeout]      || DEFAULT_TIMEOUT
  @timer          = options[:timer]        || DEFAULT_TIMER
  
  # Gentlemen, start your engines!
  
  @attempts = 1
  @backlog  = Array.new
  @timeout_thread = @timer.call([ @timeout, 1 ].max, method(:timer_action))
  connect(address)
end

Instance Method Details

#connected(rpid) ⇒ Object



66
67
68
69
70
# File 'lib/emrpc/evented_api/reconnecting_pid.rb', line 66

def connected(rpid)
  @rpid = rpid
  @attempts = 1
  flush!
end

#connection_failed(conn) ⇒ Object



77
78
79
80
81
82
83
# File 'lib/emrpc/evented_api/reconnecting_pid.rb', line 77

def connection_failed(conn)
  a = (@attempts += 1)
  if a > @max_attempts
    on_raise(self, AttemptsError.new("Maximum number of #{@max_attempts} connecting attempts exceeded"))
  end
  connect(@address)
end

#disconnected(rpid) ⇒ Object



72
73
74
75
# File 'lib/emrpc/evented_api/reconnecting_pid.rb', line 72

def disconnected(rpid)
  @rpid = nil
  connect(@address) unless killed?
end

#flush!Object



60
61
62
63
64
# File 'lib/emrpc/evented_api/reconnecting_pid.rb', line 60

def flush!
  while args = @backlog.shift
    send(*args)
  end
end

#send(*args) ⇒ Object



49
50
51
52
53
54
55
56
57
58
# File 'lib/emrpc/evented_api/reconnecting_pid.rb', line 49

def send(*args)
  if rpid = @rpid
    rpid.send(*args)
  else
    @backlog.push(args)
    if @backlog.size > @max_backlog
      on_raise(self, BacklogError.new("Backlog exceeded maximum size of #{@max_backlog} messages"))
    end
  end
end

#timer_actionObject



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/emrpc/evented_api/reconnecting_pid.rb', line 85

def timer_action
  if @rpid 
    @state = nil
    return
  end
  
  if @state == :timeout
    @state = nil
    on_raise(self, TimeoutError.new("Failed to reconnect with #{@timeout} sec. timeout"))
  else
    @state = :timeout
  end
end