Class: NATSD::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/server/varz.rb,
lib/nats/server/connz.rb,
lib/nats/server/server.rb,
lib/nats/server/cluster.rb,
lib/nats/server/options.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.auth_requiredObject (readonly) Also known as: auth_required?

Returns the value of attribute auth_required.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @auth_required; finalize_options sets it to true when a user is configured.
def auth_required; @auth_required end

.auth_timeoutObject (readonly)

Returns the value of attribute auth_timeout.



12
13
14
# File 'lib/nats/server/server.rb', line 12

# Returns @auth_timeout; finalize_options copies it from @options[:auth_timeout]
# (which defaults to AUTH_TIMEOUT).
def auth_timeout; @auth_timeout end

.connectionsObject

Returns the value of attribute connections.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @connections; setup initializes it to an empty Hash and rsid_qsub
# looks connections up in it by cid.
def connections; @connections end

.debug_flagObject (readonly) Also known as: debug_flag?

Returns the value of attribute debug_flag.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @debug_flag; finalize_options copies it from @options[:debug].
def debug_flag; @debug_flag end

.healthzObject

Returns the value of attribute healthz.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @healthz; start_http_server sets it to "ok\n" and serves it at /healthz.
def healthz; @healthz end

.idObject (readonly)

Returns the value of attribute id.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @id; setup assigns it from fast_uuid.
def id; @id end

.in_bytesObject

Returns the value of attribute in_bytes.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @in_bytes; setup starts it at 0 and route_to_subscribers adds each
# non-nil message's bytesize.
def in_bytes; @in_bytes end

.in_msgsObject

Returns the value of attribute in_msgs.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @in_msgs; setup starts it at 0 and route_to_subscribers increments it
# once per call.
def in_msgs; @in_msgs end

.infoObject (readonly)

Returns the value of attribute info.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @info, the Hash built in setup (server_id, host, port, version,
# auth_required, ssl_required, max_payload).
def info; @info end

.log_timeObject (readonly)

Returns the value of attribute log_time.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @log_time; finalize_options copies it from @options[:log_time].
def log_time; @log_time end

.max_connectionsObject

Returns the value of attribute max_connections.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @max_connections; finalize_options copies it from
# @options[:max_connections] (which defaults to DEFAULT_MAX_CONNECTIONS).
def max_connections; @max_connections end

.max_control_lineObject (readonly)

Returns the value of attribute max_control_line.



12
13
14
# File 'lib/nats/server/server.rb', line 12

# Returns @max_control_line; finalize_options copies it from
# @options[:max_control_line] (which defaults to MAX_CONTROL_LINE_SIZE).
def max_control_line; @max_control_line end

.max_payloadObject (readonly)

Returns the value of attribute max_payload.



12
13
14
# File 'lib/nats/server/server.rb', line 12

# Returns @max_payload; finalize_options copies it from @options[:max_payload]
# (which defaults to MAX_PAYLOAD_SIZE).
def max_payload; @max_payload end

.max_pendingObject (readonly)

Returns the value of attribute max_pending.



12
13
14
# File 'lib/nats/server/server.rb', line 12

# Returns @max_pending; finalize_options copies it from @options[:max_pending]
# (which defaults to MAX_PENDING_SIZE). deliver_to_subscriber compares a
# connection's outbound size against it to detect slow consumers.
def max_pending; @max_pending end

.num_connectionsObject

Returns the value of attribute num_connections.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @num_connections; setup starts it at 0 and update_varz reports it.
def num_connections; @num_connections end

.num_routesObject

Returns the value of attribute num_routes.



8
9
10
# File 'lib/nats/server/cluster.rb', line 8

# Returns @num_routes; setup starts it at 0 and update_varz reports it.
def num_routes; @num_routes end

.opt_routesObject (readonly)

Returns the value of attribute opt_routes.



7
8
9
# File 'lib/nats/server/cluster.rb', line 7

# Returns @opt_routes; solicit_routes fills it with one Hash per configured
# route (:route, :uri, :key).
def opt_routes; @opt_routes end

.optionsObject (readonly)

Returns the value of attribute options.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @options, the Hash that process_options resets and then populates.
def options; @options end

.out_bytesObject

Returns the value of attribute out_bytes.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @out_bytes; setup starts it at 0 and deliver_to_subscriber adds the
# bytesize of each delivered message.
def out_bytes; @out_bytes end

.out_msgsObject

Returns the value of attribute out_msgs.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @out_msgs; setup starts it at 0 and deliver_to_subscriber increments
# it once per delivery.
def out_msgs; @out_msgs end

.ping_intervalObject (readonly)

Returns the value of attribute ping_interval.



12
13
14
# File 'lib/nats/server/server.rb', line 12

# Returns @ping_interval; finalize_options copies it from
# @options[:ping_interval] (which defaults to DEFAULT_PING_INTERVAL).
def ping_interval; @ping_interval end

.ping_maxObject (readonly)

Returns the value of attribute ping_max.



12
13
14
# File 'lib/nats/server/server.rb', line 12

# Returns @ping_max; finalize_options copies it from @options[:ping_max]
# (which defaults to DEFAULT_PING_MAX).
def ping_max; @ping_max end

.reconnect_intervalObject (readonly)

Returns the value of attribute reconnect_interval.



7
8
9
# File 'lib/nats/server/cluster.rb', line 7

# Returns the current value of @reconnect_interval.
def reconnect_interval; @reconnect_interval end

.route_auth_requiredObject (readonly) Also known as: route_auth_required?

Returns the value of attribute route_auth_required.



7
8
9
# File 'lib/nats/server/cluster.rb', line 7

# Returns @route_auth_required; read_config_file sets it to true when the
# cluster section of the config carries an authorization block.
def route_auth_required; @route_auth_required end

.route_ssl_requiredObject (readonly) Also known as: route_ssl_required?

Returns the value of attribute route_ssl_required.



7
8
9
# File 'lib/nats/server/cluster.rb', line 7

# Returns the current value of @route_ssl_required.
def route_ssl_required; @route_ssl_required end

.ssl_requiredObject (readonly) Also known as: ssl_required?

Returns the value of attribute ssl_required.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @ssl_required; finalize_options copies it from @options[:ssl].
def ssl_required; @ssl_required end

.ssl_timeoutObject (readonly)

Returns the value of attribute ssl_timeout.



12
13
14
# File 'lib/nats/server/server.rb', line 12

# Returns @ssl_timeout; finalize_options copies it from @options[:ssl_timeout]
# (which defaults to SSL_TIMEOUT).
def ssl_timeout; @ssl_timeout end

.syslogObject (readonly)

Returns the value of attribute syslog.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @syslog; finalize_options copies it from @options[:syslog] (the
# ident given with --syslog).
def syslog; @syslog end

.trace_flagObject (readonly) Also known as: trace_flag?

Returns the value of attribute trace_flag.



11
12
13
# File 'lib/nats/server/server.rb', line 11

# Returns @trace_flag; finalize_options copies it from @options[:trace].
def trace_flag; @trace_flag end

.varzObject

Returns the value of attribute varz.



13
14
15
# File 'lib/nats/server/server.rb', line 13

# Returns @varz, the monitoring Hash created by start_http_server and
# refreshed by update_varz.
def varz; @varz end

Class Method Details

.add_route(route) ⇒ Object



17
18
19
# File 'lib/nats/server/cluster.rb', line 17

# Appends +route+ to the list returned by connected_routes.
# A nil route is ignored (nil is returned); otherwise the route list is returned.
def add_route(route)
  return if route.nil?
  connected_routes << route
end

.auth_ok?(user, pass) ⇒ Boolean

Returns:

  • (Boolean)


182
183
184
185
# File 'lib/nats/server/server.rb', line 182

# Checks a user/password pair against the configured user list.
#
# user - user name presented by the client
# pass - password presented by the client
#
# Returns true when some entry of @options[:users] has an equal :user and
# :pass, false otherwise. A missing (nil) user list is treated as empty
# rather than raising NoMethodError.
def auth_ok?(user, pass)
  Array(@options[:users]).any? { |u| user == u[:user] && pass == u[:pass] }
end

.broadcast_proto_to_routes(proto) ⇒ Object



63
64
65
# File 'lib/nats/server/cluster.rb', line 63

# Queues the protocol string +proto+ on every route in connected_routes.
# Returns the route list (the value of #each).
def broadcast_proto_to_routes(proto)
  connected_routes.each do |route|
    route.queue_data(proto)
  end
end

.broadcast_sub_to_routes(sub) ⇒ Object



90
91
92
# File 'lib/nats/server/cluster.rb', line 90

# Builds the SUB protocol line for +sub+ via route_sub_proto and broadcasts it
# to all connected routes.
def broadcast_sub_to_routes(sub)
  proto = route_sub_proto(sub)
  broadcast_proto_to_routes(proto)
end

.broadcast_unsub_to_routes(sub) ⇒ Object



94
95
96
97
# File 'lib/nats/server/cluster.rb', line 94

# Broadcasts an UNSUB for +sub+ to all connected routes. When the subscription
# has a max_responses limit, it is appended after the routed sid.
def broadcast_unsub_to_routes(sub)
  max_suffix = sub.max_responses.nil? ? nil : " #{sub.max_responses}"
  proto = "UNSUB #{routed_sid(sub)}#{max_suffix}#{CR_LF}"
  broadcast_proto_to_routes(proto)
end

.cidObject



187
188
189
# File 'lib/nats/server/server.rb', line 187

# Advances the @cid counter by one and returns the new value.
# setup seeds the counter with 1.
def cid
  @cid = @cid + 1
end

.close_syslogObject



143
144
145
# File 'lib/nats/server/options.rb', line 143

# Closes the Syslog connection when syslog output is configured.
#
# Syslog.close raises if syslog is not currently open, so the call is also
# guarded by Syslog.opened? (mirroring the guard in open_syslog). This makes
# it safe to call before open_syslog or more than once.
def close_syslog
  Syslog.close if @options[:syslog] && Syslog.opened?
end

.connected_routesObject



13
14
15
# File 'lib/nats/server/cluster.rb', line 13

# Returns the list of connected routes held in @routes, creating an empty
# Array on first use.
def connected_routes
  @routes = [] unless @routes
  @routes
end

.deliver_to_subscriber(sub, subject, reply, msg) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/nats/server/server.rb', line 104

# Delivers one message to a single subscription.
#
# sub     - the subscription; must respond to conn, sid, num_responses and
#           max_responses
# subject - subject the message was published on
# reply   - reply subject already followed by a trailing space, or nil
#           (route_to_subscribers prepares it this way)
# msg     - payload String, or nil for an empty payload
#
# Updates the server-wide and per-connection out counters, queues the MSG
# protocol line on the subscriber's connection, handles auto-unsubscribe and
# drops the connection as a slow consumer when its pending outbound data
# exceeds NATSD::Server.max_pending.
def deliver_to_subscriber(sub, subject, reply, msg)
  conn = sub.conn

  # A nil msg is sent as a zero-length payload. Computing the size once keeps
  # the accounting and the protocol line consistent and avoids calling
  # bytesize on nil.
  mbs = msg.nil? ? 0 : msg.bytesize

  # Accounting
  @out_msgs += 1
  conn.out_msgs += 1
  @out_bytes += mbs
  conn.out_bytes += mbs

  conn.queue_data("MSG #{subject} #{sub.sid} #{reply}#{mbs}#{CR_LF}#{msg}#{CR_LF}")

  # Account for this response and check for auto-unsubscribe (pruning interest graph)
  sub.num_responses += 1
  conn.delete_subscriber(sub) if (sub.max_responses && sub.num_responses >= sub.max_responses)

  # Check the outbound queue here and react if need be..
  if (conn.get_outbound_data_size + conn.writev_size) > NATSD::Server.max_pending
    conn.error_close SLOW_CONSUMER
    maxp = pretty_size(NATSD::Server.max_pending)
    log "Slow consumer dropped, exceeded #{maxp} pending", conn.client_info
  end
end

.dump_connectionsObject



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/nats/server/connz.rb', line 36

# Builds a summary of all live NATSD::Connection objects found through
# ObjectSpace, skipping those that report closing?.
#
# Returns a Hash with:
#   :pending_size    - sum of each connection's info[:pending_size]
#   :num_connections - number of connections listed
#   :connections     - Array of the per-connection info values
def dump_connections
  conns, total = [], 0
  ObjectSpace.each_object(NATSD::Connection) do |c|
    next if c.closing?
    # Take one snapshot per connection so the total and the listed entry are
    # derived from the same data, and info is not built twice.
    snapshot = c.info
    total += snapshot[:pending_size]
    conns << snapshot
  end
  {
    :pending_size => total,
    :num_connections => conns.size,
    :connections => conns
  }
end

.finalize_optionsObject



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/nats/server/options.rb', line 156

# Applies defaults to @options and mirrors the resulting settings into the
# class-level instance variables exposed through the attribute readers
# (@max_connections, @debug_flag, @trace_flag, @log_time, @syslog,
# @auth_required, @ssl_required, @ping_interval, @ping_max,
# @max_control_line, @max_payload, @max_pending, @auth_timeout, @ssl_timeout).
#
# Expects @options to be a Hash already populated by the command-line parser
# and, optionally, read_config_file.
def finalize_options
  # Addr/Port
  @options[:port] ||= DEFAULT_PORT
  @options[:addr] ||= DEFAULT_HOST

  # Max Connections
  @options[:max_connections] ||= DEFAULT_MAX_CONNECTIONS
  @max_connections = @options[:max_connections]

  # Debug and Tracing
  @debug_flag = @options[:debug]
  @trace_flag = @options[:trace]

  # Log timestamps
  @log_time = @options[:log_time]

  debug @options # Block pass?
  debug "DEBUG is on"
  trace "TRACE is on"

  # Syslog
  @syslog = @options[:syslog]

  # Authorization

  # Multi-user setup for auth
  if @options[:user]
    # A single configured user always becomes the first entry of the user list.
    @options[:users] ||= []
    @options[:users].unshift({:user => @options[:user], :pass => @options[:pass]})
  elsif @options[:users] && !@options[:users].empty?
    # Only a user list was given: promote its first entry to :user/:pass.
    # The emptiness check matters because read_config_file stores [] when the
    # authorization section has no users; indexing a nil first entry would raise.
    first = @options[:users].first
    @options[:user], @options[:pass] = first[:user], first[:pass]
  end

  @auth_required = (not @options[:user].nil?)

  @ssl_required = @options[:ssl]

  # Pings
  @options[:ping_interval] ||= DEFAULT_PING_INTERVAL
  @ping_interval = @options[:ping_interval]

  @options[:ping_max] ||= DEFAULT_PING_MAX
  @ping_max = @options[:ping_max]

  # Thresholds
  @options[:max_control_line] ||= MAX_CONTROL_LINE_SIZE
  @max_control_line = @options[:max_control_line]

  @options[:max_payload] ||= MAX_PAYLOAD_SIZE
  @max_payload = @options[:max_payload]

  @options[:max_pending] ||= MAX_PENDING_SIZE
  @max_pending = @options[:max_pending]

  @options[:auth_timeout] ||= AUTH_TIMEOUT
  @auth_timeout = @options[:auth_timeout]

  @options[:ssl_timeout] ||= SSL_TIMEOUT
  @ssl_timeout = @options[:ssl_timeout]
end

.hostObject



22
# File 'lib/nats/server/server.rb', line 22

# Returns the bind address stored in @options[:addr].
def host
  @options[:addr]
end

.info_stringObject



195
196
197
# File 'lib/nats/server/server.rb', line 195

# Returns the server info Hash (@info, built in setup) serialized with to_json.
def info_string; @info.to_json end

.open_syslogObject



138
139
140
141
# File 'lib/nats/server/options.rb', line 138

# Opens Syslog using @options[:syslog] as the ident, with LOG_PID and the
# LOG_USER facility. Does nothing when syslog is not configured or when Syslog
# is already open.
def open_syslog
  ident = @options[:syslog]
  return unless ident
  return if Syslog.opened?
  Syslog.open("#{ident}", Syslog::LOG_PID, Syslog::LOG_USER)
end

.parse_rsid(rsid) ⇒ Object



76
77
78
79
# File 'lib/nats/server/cluster.rb', line 76

# Splits a routed sid using the RSID pattern.
#
# Returns [first_capture_as_integer, second_capture] when +rsid+ matches,
# nil otherwise.
def parse_rsid(rsid)
  matched = RSID.match(rsid)
  matched ? [matched[1].to_i, matched[2]] : nil
end

.parserObject



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/nats/server/options.rb', line 8

# Lazily builds the OptionParser for the nats-server command line and memoizes
# it in @parser. Every handler stores its value into @options, so @options must
# be a Hash before the returned parser is asked to parse anything.
#
# Returns the OptionParser instance.
def parser
  @parser ||= OptionParser.new do |opts|
    opts.banner = "Usage: nats-server [options]"

    opts.separator ""
    opts.separator "Server options:"

    opts.on("-a", "--addr HOST", "Bind to HOST address " +
                                 "(default: #{DEFAULT_HOST})")           { |host| @options[:addr] = host }
    opts.on("-p", "--port PORT", "Use PORT (default: #{DEFAULT_PORT})")  { |port| @options[:port] = port.to_i }
    opts.on("-d", "--daemonize", "Run daemonized in the background")     { @options[:daemonize] = true }
    opts.on("-P", "--pid FILE",  "File to store PID")                    { |file| @options[:pid_file] = file }

    # Port for the HTTP monitoring endpoints started by start_http_server.
    opts.on("-m", "--http_port PORT", "Use HTTP PORT ")                  { |port| @options[:http_port] = port.to_i }

    opts.on("-r", "--cluster_port PORT", "Use Cluster PORT ")            { |port| @options[:cluster_port] = port.to_i }

    # Only records the path; process_options calls read_config_file afterwards.
    opts.on("-c", "--config FILE", "Configuration File")                 { |file| @options[:config_file] = file }

    opts.separator ""
    opts.separator "Logging options:"

    opts.on("-l", "--log FILE", "File to redirect log output")           { |file| @options[:log_file] = file }
    opts.on("-T", "--logtime", "Timestamp log entries (default: false)") { @options[:log_time] = true }
    opts.on("-S", "--syslog IDENT", "Enable Syslog output")              { |ident| @options[:syslog] = ident }
    opts.on("-D", "--debug", "Enable debugging output")                  { @options[:debug] = true }
    opts.on("-V", "--trace", "Trace the raw protocol")                   { @options[:trace] = true }

    opts.separator ""
    opts.separator "Authorization options:"

    opts.on("--user user", "User required for connections")              { |user| @options[:user] = user }
    opts.on("--pass password", "Password required for connections")      { |pass| @options[:pass] = pass }

    opts.separator ""
    opts.on("--ssl", "Enable SSL")                                       { |ssl| @options[:ssl] = true }

    opts.separator ""
    opts.separator "Advanced IO options:"

    opts.on("--no_epoll", "Disable epoll (Linux)")                       { @options[:noepoll] = true }
    opts.on("--no_kqueue", "Disable kqueue (MacOSX and BSD)")            { @options[:nokqueue] = true }

    opts.separator ""
    opts.separator "Common options:"

    # Both handlers print and then terminate the process via exit.
    opts.on_tail("-h", "--help", "Show this message")                    { puts opts; exit }
    opts.on_tail('-v', '--version', "Show version")                      { puts NATSD::Server.version; exit }
  end
end

.pid_fileObject



24
# File 'lib/nats/server/server.rb', line 24

# Returns the PID file path stored in @options[:pid_file], or nil when unset.
def pid_file
  @options[:pid_file]
end

.portObject



23
# File 'lib/nats/server/server.rb', line 23

# Returns the client port stored in @options[:port].
def port
  @options[:port]
end

.process_options(argv = []) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/nats/server/server.rb', line 26

# Resets @options and fills it from the command line, then from the config
# file (when one was named), and finally applies defaults.
#
# argv - Array of command-line arguments; parsed options are removed from it
#        by OptionParser#parse!.
#
# Any OptionParser::ParseError (unknown option, missing argument, invalid
# argument, ...) is reported through log_error and terminates the process with
# status 1, instead of only unknown options being handled.
def process_options(argv=[])
  @options = {}

  # Allow command line to override config file, so do them first.
  parser.parse!(argv)
  read_config_file if @options[:config_file]
  finalize_options
rescue OptionParser::ParseError => e
  log_error "Error parsing options: #{e}"
  exit(1)
end

.read_config_fileObject



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/nats/server/options.rb', line 59

# Loads the YAML file named by @options[:config_file] and merges its settings
# into @options.
#
# Most settings are only taken from the file when the corresponding option is
# still nil, so values parsed earlier from the command line win. The max_*
# thresholds and the no_epoll/no_kqueue switches are copied whenever the file
# provides a truthy value.
#
# Returns nil immediately when no config file is set. Any StandardError raised
# while reading or merging is logged and the process exits with status 1.
def read_config_file
  return unless config_file = @options[:config_file]
  # NOTE(review): YAML.load may instantiate arbitrary objects depending on the
  # Psych version; consider YAML.safe_load if this file can be untrusted.
  config = File.open(config_file) { |f| YAML.load(f) }

  # Command lines args, parsed first, will override these.
  @options[:port] = config['port'] if @options[:port].nil?
  @options[:addr] = config['net'] if @options[:addr].nil?

  if auth = config['authorization']
    @options[:user] = auth['user'] if @options[:user].nil?
    # 'password' is tried first, 'pass' is the fallback spelling.
    @options[:pass] = auth['password'] if @options[:pass].nil?
    @options[:pass] = auth['pass'] if @options[:pass].nil?
    @options[:token] = auth['token'] if @options[:token].nil?
    @options[:auth_timeout] = auth['timeout'] if @options[:auth_timeout].nil?
    # Multiple Users setup
    # Always replaces @options[:users]; becomes [] when the section lists no users.
    @options[:users] = symbolize_users(auth['users']) || []
  end

  # TLS/SSL
  @options[:ssl] = config['ssl'] if @options[:ssl].nil?

  @options[:pid_file] = config['pid_file'] if @options[:pid_file].nil?
  @options[:log_file] = config['log_file'] if @options[:log_file].nil?
  @options[:log_time] = config['logtime'] if @options[:log_time].nil?
  @options[:syslog] = config['syslog'] if @options[:syslog].nil?
  @options[:debug] = config['debug'] if @options[:debug].nil?
  @options[:trace] = config['trace'] if @options[:trace].nil?

  # these just override if present
  @options[:max_control_line] = config['max_control_line'] if config['max_control_line']
  @options[:max_payload] = config['max_payload'] if config['max_payload']
  @options[:max_pending] = config['max_pending'] if config['max_pending']
  @options[:max_connections] = config['max_connections'] if config['max_connections']

  # just set
  @options[:noepoll]  = config['no_epoll'] if config['no_epoll']
  @options[:nokqueue] = config['no_kqueue'] if config['no_kqueue']

  if http = config['http']
    # The monitor binds to its own 'net' when given, else to the server address.
    if @options[:http_net].nil?
      @options[:http_net] = http['net'] || @options[:addr]
    end
    @options[:http_port] = http['port'] if @options[:http_port].nil?
    @options[:http_user] = http['user'] if @options[:http_user].nil?
    @options[:http_password] = http['password'] if @options[:http_password].nil?
  end

  if ping = config['ping']
    @options[:ping_interval] = ping['interval'] if @options[:ping_interval].nil?
    @options[:ping_max] = ping['max_outstanding'] if @options[:ping_max].nil?
  end

  if cluster = config['cluster']
    @options[:cluster_port] = cluster['port'] if @options[:cluster_port].nil?
    if auth = cluster['authorization']
      @options[:cluster_user] = auth['user'] if @options[:cluster_user].nil?
      @options[:cluster_pass] = auth['password'] if @options[:cluster_pass].nil?
      @options[:cluster_pass] = auth['pass'] if @options[:cluster_pass].nil?
      @options[:cluster_token] = auth['token'] if @options[:cluster_token].nil?
      @options[:cluster_auth_timeout] = auth['timeout'] if @options[:cluster_auth_timeout].nil?
      # The mere presence of a cluster authorization section turns route auth on.
      @route_auth_required = true
    end
    if routes = cluster['routes']
      @options[:cluster_routes] = routes if @options[:cluster_routes].nil?
    end
  end

rescue => e
  log "Could not read configuration file:  #{e}"
  exit 1
end

.remove_route(route) ⇒ Object



21
22
23
# File 'lib/nats/server/cluster.rb', line 21

# Removes +route+ from the list returned by connected_routes.
# A nil route is ignored; otherwise the result of Array#delete is returned.
def remove_route(route)
  return if route.nil?
  connected_routes.delete(route)
end

.ridObject



191
192
193
# File 'lib/nats/server/server.rb', line 191

# Advances the @rid counter by one and returns the new value.
# setup seeds the counter with 1.
def rid
  @rid = @rid + 1
end

.route_auth_ok?(user, pass) ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/nats/server/cluster.rb', line 43

# Returns true when +user+ and +pass+ equal the configured
# @options[:cluster_user] and @options[:cluster_pass], false otherwise.
def route_auth_ok?(user, pass)
  return false unless user == @options[:cluster_user]
  pass == @options[:cluster_pass]
end

.route_info_stringObject



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/nats/server/cluster.rb', line 25

# Builds the info Hash advertised to routes, stores it in @route_info and
# returns it serialized with to_json. The host falls back to the server host
# when @options[:cluster_net] is not set.
def route_info_string
  details = {}
  details[:server_id] = Server.id
  details[:host] = @options[:cluster_net] || host
  details[:port] = @options[:cluster_port]
  details[:version] = VERSION
  details[:auth_required] = route_auth_required?
  details[:ssl_required] = false                 # FIXME!
  details[:max_payload] = @max_payload
  @route_info = details
  @route_info.to_json
end

.route_key(route_url) ⇒ Object



38
39
40
41
# File 'lib/nats/server/cluster.rb', line 38

# Returns the "host:port" key for a route URL, as parsed by URI.parse.
def route_key(route_url)
  uri = URI.parse(route_url)
  [uri.host, uri.port].join(':')
end

.route_sub_proto(sub) ⇒ Object



85
86
87
88
# File 'lib/nats/server/cluster.rb', line 85

# Returns the SUB protocol line sent to routes for +sub+. The queue group is
# included between the subject and the routed sid when the subscription has one.
def route_sub_proto(sub)
  if sub.qgroup.nil?
    "SUB #{sub.subject} #{routed_sid(sub)}#{CR_LF}"
  else
    "SUB #{sub.subject} #{sub.qgroup} #{routed_sid(sub)}#{CR_LF}"
  end
end

.route_to_subscribers(subject, reply, msg, is_route = false) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/nats/server/server.rb', line 130

# Fans a published message out to every matching subscription in @sublist.
#
# subject  - subject the message was published on
# reply    - optional reply subject (nil when absent)
# msg      - message payload; may be nil for accounting purposes
# is_route - true when the message arrived over a route
#
# Plain subscribers are delivered to directly, with at most one delivery per
# remote route. Queue-group subscribers are collected per group and, after the
# scan, one randomly chosen member of each group receives the message. Queue
# groups are not considered at all for messages that arrived over a route.
def route_to_subscribers(subject, reply, msg, is_route=false)
  qsubs = nil

  # Allows nil reply to not have extra space
  reply = reply + ' ' if reply

  # Accounting
  @in_msgs += 1
  @in_bytes += msg.bytesize unless msg.nil?

  # Routes
  # Lazily created Set of remote_rid values already delivered to in this call.
  routes = nil

  @sublist.match(subject).each do |sub|
    # Skip anyone in the closing state
    next if sub.conn.closing

    # Skip all routes if sourced from another route (1-hop semantics)
    next if (is_route && sub.conn.is_route?)

    if sub[:qgroup].nil?
      if sub.conn.is_route?
        # Only send messages once over a given route
        routes ||= Set.new
        deliver_to_subscriber(sub, subject, reply, msg) unless routes.include?(sub.conn.remote_rid)
        routes << sub.conn.remote_rid
      else
        deliver_to_subscriber(sub, subject, reply, msg)
      end
    elsif !is_route
      if NATSD::Server.trace_flag?
        trace('Matched queue subscriber', sub[:subject], sub[:qgroup], sub[:sid], sub.conn.client_info)
      end
      # Queue this for post processing
      qsubs ||= Hash.new
      qsubs[sub[:qgroup]] ||= []
      qsubs[sub[:qgroup]] << sub
    end
  end

  return unless qsubs

  qsubs.each_value do |subs|
    # Randomly pick a subscriber from the group
    # (rand yields a Float in [0, 1); Array#[] truncates the Float index.)
    sub = subs[rand*subs.size]
    if NATSD::Server.trace_flag?
      trace('Selected queue subscriber', sub[:subject], sub[:qgroup], sub[:sid], sub.conn.client_info)
    end
    deliver_to_subscriber(sub, subject, reply, msg)
  end
end

.routed_sid(sub) ⇒ Object



81
82
83
# File 'lib/nats/server/cluster.rb', line 81

# Returns the sid used for +sub+ on routes: "RSID:<connection cid>:<sid>".
def routed_sid(sub)
  conn_id = sub.conn.cid
  sub_id = sub.sid
  "RSID:#{conn_id}:#{sub_id}"
end

.rsid_qsub(rsid) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/nats/server/cluster.rb', line 67

# Resolves a routed sid to the local subscription it refers to, but only when
# that subscription belongs to a queue group.
#
# Returns the subscription, or nil when it has no qgroup. Any StandardError
# during the lookup (for example an unparsable rsid or an unknown connection)
# is swallowed and also yields nil.
def rsid_qsub(rsid)
  cid, sid = parse_rsid(rsid)
  sub = Server.connections[cid].subscriptions[sid]
  sub.qgroup ? sub : nil
rescue
  nil
end

.setup(argv) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/nats/server/server.rb', line 38

# One-time server initialization.
#
# argv - command-line arguments, handed to process_options.
#
# Processes options, resets the server state (ids, sublist, connection table
# and counters), builds the @info Hash, optionally daemonizes, redirects logs,
# opens syslog, selects the EventMachine polling backend and writes the PID
# file when one was requested.
def setup(argv)
  process_options(argv)

  # cid and rid counters start at 1; the cid/rid methods increment before returning.
  @id, @cid, @rid = fast_uuid, 1, 1
  @sublist = Sublist.new

  @connections = {}
  @num_connections = 0
  @in_msgs = @out_msgs = 0
  @in_bytes = @out_bytes = 0

  @num_routes = 0

  @info = {
    :server_id => Server.id,
    :host => host,
    :port => port,
    :version => VERSION,
    :auth_required => auth_required?,
    :ssl_required => ssl_required?,
    :max_payload => @max_payload
  }

  # Check for daemon flag
  if @options[:daemonize]
    # Loaded lazily so these libraries are only needed when daemonizing.
    require 'rubygems'
    require 'daemons'
    require 'tmpdir'
    unless @options[:log_file]
      # These log messages visible to controlling TTY
      log "Starting #{NATSD::APP_NAME} version #{NATSD::VERSION} on port #{NATSD::Server.port}"
      log "Starting http monitor on port #{@options[:http_port]}" if @options[:http_port]
      log "Starting routing on port #{@options[:cluster_port]}" if @options[:cluster_port]
      log "Switching to daemon mode"
    end
    opts = {
      :app_name => APP_NAME,
      :mode => :exec,
      :dir_mode => :normal,
      :dir => Dir.tmpdir
    }
    Daemons.daemonize(opts)
    # Remove the pid file from the tmpdir used by Daemons; the server's own
    # pid file, if requested, is written at the end of this method.
    FileUtils.rm_f("#{Dir.tmpdir}/#{APP_NAME}.pid")
  end

  setup_logs
  open_syslog

  # Setup optimized select versions
  EM.epoll unless @options[:noepoll]
  EM.kqueue unless @options[:nokqueue]

  # Write pid file if requested.
  File.open(@options[:pid_file], 'w') { |f| f.puts "#{Process.pid}" } if @options[:pid_file]
end

.setup_logsObject



131
132
133
134
135
136
# File 'lib/nats/server/options.rb', line 131

# Redirects $stdout (append mode, synced) and $stderr to the file named by
# @options[:log_file]. Does nothing when no log file is configured.
def setup_logs
  log_path = @options[:log_file]
  return unless log_path
  $stdout.reopen(log_path, 'a')
  $stdout.sync = true
  $stderr.reopen($stdout)
end

.solicit_routesObject

:nodoc:



47
48
49
50
51
52
53
# File 'lib/nats/server/cluster.rb', line 47

# Rebuilds @opt_routes from the configured :cluster_routes and then tries to
# connect to each of them. Every entry is a Hash holding the original URL
# (:route), its parsed URI (:uri) and its "host:port" key (:key).
#
# A missing :cluster_routes option is treated as an empty list instead of
# raising NoMethodError.
#
# Returns the result of try_to_connect_routes.
def solicit_routes #:nodoc:
  @opt_routes = Array(NATSD::Server.options[:cluster_routes]).map do |r_url|
    { :route => r_url, :uri => URI.parse(r_url), :key => route_key(r_url) }
  end
  try_to_connect_routes
end

.start_http_serverObject

Monitoring



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/nats/server/server.rb', line 200

# Starts the HTTP monitoring server when @options[:http_port] is set.
#
# Initializes @healthz and @varz, then mounts three endpoints on a Thin
# server: /healthz (plain "ok" text), /varz and /connz. When
# @options[:http_user] is set, HTTP basic auth is required for all of them.
#
# Returns nil without doing anything when no HTTP port is configured.
def start_http_server
  return unless port = @options[:http_port]

  # Loaded lazily so thin is only needed when monitoring is enabled.
  require 'thin'

  log "Starting http monitor on port #{port}"

  @healthz = "ok\n"

  @varz = {
    :start => Time.now,
    :options => @options,
    :cores => num_cpu_cores
  }

  # The block below does not use @options directly; settings are read through
  # NATSD::Server.options instead.
  http_server = Thin::Server.new(@options[:http_net], port, :signals => false) do
    Thin::Logging.silent = true
    if NATSD::Server.options[:http_user]
      auth = [NATSD::Server.options[:http_user], NATSD::Server.options[:http_password]]
      use Rack::Auth::Basic do |username, password|
        [username, password] == auth
      end
    end
    map '/healthz' do
      run lambda { |env| [200, RACK_TEXT_HDR, NATSD::Server.healthz] }
    end
    map '/varz' do
      run Varz.new
    end
    map '/connz' do
      run Connz.new
    end
  end
  http_server.start!
end

.subscribe(sub, is_route = false) ⇒ Object



94
95
96
97
# File 'lib/nats/server/server.rb', line 94

# Inserts +sub+ into @sublist under its subject. Unless the subscription came
# from a route (is_route), it is also broadcast to all connected routes.
def subscribe(sub, is_route=false)
  @sublist.insert(sub.subject, sub)
  return if is_route
  broadcast_sub_to_routes(sub)
end

.symbolize_users(users) ⇒ Object



147
148
149
150
151
152
153
154
# File 'lib/nats/server/options.rb', line 147

# Converts the string-keyed user entries of the config file into Hashes with
# :user and :pass keys. The password is taken from 'pass', falling back to
# 'password'.
#
# Returns nil when +users+ is nil/false, otherwise a new Array.
def symbolize_users(users)
  return nil unless users
  users.map do |entry|
    { :user => entry['user'], :pass => entry['pass'] || entry['password'] }
  end
end

.try_to_connect_routesObject

:nodoc:



55
56
57
58
59
60
61
# File 'lib/nats/server/cluster.rb', line 55

# Opens an EventMachine connection (handled by NATSD::Route) to the host and
# port of every entry in opt_routes, passing the entry itself to the handler.
# Returns the opt_routes list (the value of #each).
def try_to_connect_routes #:nodoc:
  opt_routes.each do |entry|
    # FIXME, Strip auth
    debug "Trying to connect to route: #{entry[:route]}"
    target = entry[:uri]
    EM.connect(target.host, target.port, NATSD::Route, entry)
  end
end

.unsubscribe(sub, is_route = false) ⇒ Object



99
100
101
102
# File 'lib/nats/server/server.rb', line 99

# Removes +sub+ from @sublist. Unless the subscription came from a route
# (is_route), the removal is also broadcast to all connected routes.
def unsubscribe(sub, is_route=false)
  @sublist.remove(sub.subject, sub)
  return if is_route
  broadcast_unsub_to_routes(sub)
end

.update_varzObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/nats/server/varz.rb', line 15

# Refreshes the monitoring Hash in @varz: uptime since @varz[:start], the
# process rss and pcpu columns reported by ps, and the current connection,
# message, byte and route counters. Records the refresh time in
# @last_varz_update and returns varz.
def update_varz
  stats = @varz

  # Snapshot uptime
  stats[:uptime] = uptime_string(Time.now - stats[:start])

  # Grab current cpu and memory usage.
  ps_fields = `ps -o rss=,pcpu= -p #{Process.pid}`.split
  stats.update(
    :mem => ps_fields[0].to_i,
    :cpu => ps_fields[1].to_f,
    :connections => num_connections,
    :in_msgs => in_msgs,
    :out_msgs => out_msgs,
    :in_bytes => in_bytes,
    :out_bytes => out_bytes,
    :routes => num_routes
  )
  @last_varz_update = Time.now.to_f
  varz
end

.versionObject



20
# File 'lib/nats/server/server.rb', line 20

# Returns the version banner printed by the -v/--version flag.
def version
  "nats-server version #{NATSD::VERSION}"
end