Class: NSQ::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/nsq/connection.rb

Overview

Represents a single subscribed connection to an nsqd server.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reader, subscriber, host, port) ⇒ Connection

Returns a new instance of Connection.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/nsq/connection.rb', line 9

# Builds a connection for +subscriber+ to the nsqd at +host+:+port+ and
# starts connecting right away.
#
# reader     - provides the selector the socket is registered with; also
#              used by #reset to schedule delayed reconnects.
# subscriber - provides the name prefix and the backoff timers.
# host, port - address of the nsqd server.
def initialize(reader, subscriber, host, port)
  # Collaborators and addressing.
  @reader, @subscriber = reader, subscriber
  @selector            = reader.selector
  @host, @port         = host, port
  @name                = "#{subscriber.name}:#{host}:#{port}"

  # Locks, plus the flag that #send_ready clears.
  @write_monitor = Monitor.new
  @ready_mutex   = Mutex.new
  @sending_ready = false

  # Connect states :init, :interval, :connecting, :connected, :closed
  @connect_state = :init

  # Scheduling / backoff bookkeeping. The connection backoff timer is
  # created lazily by #reset on the first failure.
  @next_connection_time = @next_ready_time = @connection_backoff_timer = nil
  @ready_backoff_timer  = @subscriber.create_ready_backoff_timer

  connect
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



7
8
9
# File 'lib/nsq/connection.rb', line 7

# Returns the connection's name as stored in @name (the constructor builds
# it as "<subscriber name>:<host>:<port>").
def name
  @name
end

Instance Method Details

#close(permanent = true) ⇒ Object

:nodoc:



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/nsq/connection.rb', line 86

# Tears down the socket and updates the connect state.
#
# permanent - when true (the default) the state becomes :closed; when false
#             it becomes :init, which is a state #connect accepts, so the
#             connection can be re-established later.
#
# Teardown is best-effort: ordinary errors (socket never opened, already
# closed, peer gone, ...) are logged at debug level and otherwise ignored.
# Only StandardError is rescued, so signals and SystemExit still propagate.
# The socket is closed and the state is updated even when deregistering or
# sending CLS fails, so a failed goodbye does not leak the descriptor.
#
# Returns nil.
def close(permanent=true) #:nodoc:
  NSQ.logger.debug {"#{@name}: Closing..."}
  @write_monitor.synchronize do
    begin
      @selector.deregister(@socket)
      # Write straight to the socket instead of going through #write.
      @socket.write "CLS\n"
    rescue StandardError => e
      NSQ.logger.debug {"#{@name}: Ignoring error while closing: #{e.class}: #{e.message}"}
    ensure
      begin
        @socket.close if @socket
      rescue StandardError
        # Already closed or otherwise unusable; nothing left to release.
      end
      @connect_state = permanent ? :closed : :init
      @socket        = nil
    end
  end
  nil
end

#connect ⇒ Object

:nodoc:



102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/nsq/connection.rb', line 102

# Begins a connection attempt. Does nothing (and returns nil) unless the
# current state is :init or :interval.
#
# Resets the per-connection buffer and ready count, creates a fresh TCP
# socket, registers it with the selector for writability with #do_connect
# as the callback, and then calls #do_connect once directly.
def connect #:nodoc:
  if verify_connect_state?(:init, :interval)
    NSQ.logger.debug {"#{self}: Beginning connect"}
    @connect_state = :connecting

    # Fresh per-connection state.
    @buffer, @ready_count = '', 0

    @socket   = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
    @sockaddr = Socket.pack_sockaddr_in(@port, @host)

    # The selector invokes the monitor's value when the socket is writable.
    @monitor       = @selector.register(@socket, :w)
    @monitor.value = proc { do_connect }

    do_connect
  end
end

#reset ⇒ Object

:nodoc:



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nsq/connection.rb', line 65

# Drops the current connection and arranges a reconnect, applying the
# connection backoff timer. Only acts while :connecting or :connected.
#
# After the non-permanent close, the failure is recorded on the backoff
# timer (created on first use). A positive interval moves the state to
# :interval and schedules #connect through the reader; otherwise #connect
# is called immediately.
def reset #:nodoc:
  return unless verify_connect_state?(:connecting, :connected)

  # Close with the hopes of re-establishing
  close(false)

  @write_monitor.synchronize do
    # Skip rescheduling unless the close left us in :init.
    if verify_connect_state?(:init)
      @connection_backoff_timer ||= @subscriber.create_connection_backoff_timer
      backoff = @connection_backoff_timer
      backoff.failure
      delay = backoff.interval

      if delay > 0
        @connect_state = :interval
        NSQ.logger.debug {"#{self}: Reattempting connection in #{delay} seconds"}
        @reader.add_timeout(delay) { connect }
      else
        connect
      end
    end
  end
end

#send_finish(id, success) ⇒ Object

:nodoc:



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/nsq/connection.rb', line 43

# Sends FIN for message +id+, then, under the ready mutex, decrements the
# ready count, records the outcome on the ready backoff timer (success when
# +success+ is truthy, failure otherwise) and runs #check_ready.
def send_finish(id, success) #:nodoc:
  write("FIN #{id}\n")

  @ready_mutex.synchronize do
    @ready_count -= 1
    success ? @ready_backoff_timer.success : @ready_backoff_timer.failure
    check_ready
  end
end

#send_init(topic, channel, short_id, long_id) ⇒ Object

:nodoc:



31
32
33
34
35
# File 'lib/nsq/connection.rb', line 31

# Opens the protocol exchange: writes the V2 magic bytes, then a SUB command
# for +topic+/+channel+ carrying +short_id+ and +long_id+, and finally
# announces readiness via #send_ready.
def send_init(topic, channel, short_id, long_id) #:nodoc:
  write(NSQ::MAGIC_V2)

  subscribe_command = "SUB #{topic} #{channel} #{short_id} #{long_id}\n"
  write(subscribe_command)

  send_ready
end

#send_ready ⇒ Object

:nodoc:



37
38
39
40
41
# File 'lib/nsq/connection.rb', line 37

# Refreshes @ready_count from the subscriber and, unless the subscriber is
# stopped, sends it to the server as an RDY command. Always clears the
# @sending_ready flag afterwards.
def send_ready #:nodoc:
  count        = @subscriber.ready_count
  @ready_count = count

  unless @subscriber.stopped?
    write("RDY #{count}\n")
  end

  @sending_ready = false
end

#send_requeue(id, time_ms) ⇒ Object

:nodoc:



56
57
58
59
60
61
62
63
# File 'lib/nsq/connection.rb', line 56

# Sends REQ for message +id+ with the requeue delay +time_ms+, then, under
# the ready mutex, decrements the ready count, records a failure on the
# ready backoff timer and runs #check_ready.
def send_requeue(id, time_ms) #:nodoc:
  requeue_command = "REQ #{id} #{time_ms}\n"
  write(requeue_command)

  @ready_mutex.synchronize do
    @ready_count -= 1
    @ready_backoff_timer.failure
    check_ready
  end
end

#to_s ⇒ Object

:nodoc:



115
116
117
# File 'lib/nsq/connection.rb', line 115

# String form of the connection: the value of @name. This is what appears
# when the connection is interpolated into log messages such as
# "#{self}: Beginning connect".
def to_s #:nodoc:
  @name
end