Class: Qpid::Proton::Reactor::Connector

Inherits:
BaseHandler show all
Defined in:
lib/reactor/connector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from BaseHandler

#on_unhandled

Constructor Details

#initialize(connection) ⇒ Connector

Returns a new instance of Connector.



28
29
30
31
32
33
34
# File 'lib/reactor/connector.rb', line 28

# Creates a connector for the given connection.
#
# The connection is remembered in @connection; the address, heartbeat,
# reconnect and ssl_domain settings all start out as nil.
#
# @param connection [Object] the connection this connector manages
def initialize(connection)
  @connection = connection
  # Every optional setting starts unset.
  @address = @heartbeat = @reconnect = @ssl_domain = nil
end

Instance Attribute Details

#address ⇒ Object

Returns the value of attribute address.



24
25
26
# File 'lib/reactor/connector.rb', line 24

# Reader for the address attribute.
#
# #connect calls +next+ on this value to obtain the URL to connect to.
#
# @return [Object] the current value of @address (nil until assigned)
def address
  @address
end

#reconnect ⇒ Object

Returns the value of attribute reconnect.



25
26
27
# File 'lib/reactor/connector.rb', line 25

# Reader for the reconnect attribute.
#
# When set, #on_transport_closed calls +next+ on it to obtain a delay and
# #on_connection_remote_open calls +reset+ on it.
#
# @return [Object] the current value of @reconnect (nil until assigned)
def reconnect
  @reconnect
end

#ssl_domain ⇒ Object

Returns the value of attribute ssl_domain.



26
27
28
# File 'lib/reactor/connector.rb', line 26

# Reader for the ssl_domain attribute.
#
# #connect passes this value to Qpid::Proton::SSL.new when the URL scheme
# is "amqps".
#
# @return [Object] the current value of @ssl_domain (nil until assigned)
def ssl_domain
  @ssl_domain
end

Instance Method Details

#connect(connection) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/reactor/connector.rb', line 75

# Binds a new transport to +connection+ and configures it from the next URL
# supplied by @address.
#
# The heartbeat, SSL and SASL settings are applied independently of one
# another: configuring a heartbeat does not suppress SSL or credentials, and
# using SSL does not suppress credentials.
#
# @param connection [Object] the connection to bind; its hostname is set to
#   "host:port" taken from the selected URL
# @return [Object] the result of the SASL configuration call, or nil when
#   the URL carries no username
def connect(connection)
  url = @address.next
  connection.hostname = "#{url.host}:#{url.port}"

  transport = Qpid::Proton::Transport.new
  transport.bind(connection)

  # Idle timeout is only applied when a heartbeat has been configured.
  transport.idle_timeout = @heartbeat unless @heartbeat.nil?

  # SSL is layered on only for amqps URLs and only when a domain is available.
  if url.scheme == "amqps" && !@ssl_domain.nil?
    @ssl = Qpid::Proton::SSL.new(transport, @ssl_domain)
    @ssl.peer_hostname = url.host
  end

  # Credentials in the URL select the SASL mechanism.
  unless url.username.nil?
    sasl = transport.sasl
    if url.username == "anonymous"
      sasl.mechanisms("ANONYMOUS")
    else
      sasl.plain(url.username, url.password)
    end
  end
end

#on_connection_local_open(event) ⇒ Object



36
37
38
# File 'lib/reactor/connector.rb', line 36

# Starts connecting the connection carried by the event.
#
# @param event [Object] event whose +connection+ is handed to #connect
# @return [Object] whatever #connect returns
def on_connection_local_open(event)
  opened = event.connection
  connect(opened)
end

#on_connection_remote_close(event) ⇒ Object



71
72
73
# File 'lib/reactor/connector.rb', line 71

# Drops this connector's reference to the connection.
#
# Sets @connection to nil; #on_transport_closed checks that reference for
# nil before attempting any reconnect work.
#
# @param event [Object] the triggering event (not read by this method)
def on_connection_remote_close(event)
  @connection = nil
end

#on_connection_remote_open(event) ⇒ Object



40
41
42
43
44
45
# File 'lib/reactor/connector.rb', line 40

# Resets the reconnect helper and clears @transport when a reconnect helper
# is configured; does nothing otherwise.
#
# @param event [Object] the triggering event (not read by this method)
# @return [nil]
def on_connection_remote_open(event)
  return if @reconnect.nil?

  @reconnect.reset
  @transport = nil
end

#on_timer_task(event) ⇒ Object



67
68
69
# File 'lib/reactor/connector.rb', line 67

# Connects the stored connection again when invoked.
#
# #on_transport_closed passes this handler to +event.reactor.schedule+ when
# the reconnect delay is non-zero.
#
# @param event [Object] the triggering event (not read by this method)
# @return [Object] whatever #connect returns
def on_timer_task(event)
  connect(@connection)
end

#on_transport_closed(event) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/reactor/connector.rb', line 51

# Reacts to a closed transport while the connection is still locally active.
#
# Nothing happens when there is no stored connection or its state lacks the
# LOCAL_ACTIVE bit. Without a reconnect helper the stored connection is
# dropped. Otherwise the transport is unbound and the helper's next delay
# decides between reconnecting immediately (delay equal to 0) and scheduling
# this handler on the reactor.
#
# @param event [Object] event providing +transport+ and +reactor+
# @return [Object, nil] result of #connect or of the reactor's +schedule+,
#   nil when no reconnect is attempted
def on_transport_closed(event)
  return if @connection.nil?
  return if (@connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero?

  if @reconnect.nil?
    @connection = nil
    return
  end

  event.transport.unbind
  wait = @reconnect.next
  if wait == 0
    connect(@connection)
  else
    event.reactor.schedule(wait, self)
  end
end

#on_transport_tail_closed(event) ⇒ Object



47
48
49
# File 'lib/reactor/connector.rb', line 47

# Delegates to #on_transport_closed with the same event.
#
# @param event [Object] the triggering event, forwarded unchanged
# @return [Object, nil] whatever #on_transport_closed returns
def on_transport_tail_closed(event)
  on_transport_closed(event)
end