Class: RelpServer
- Inherits:
-
Relp
show all
- Defined in:
- lib/logstash/util/relp.rb
Constant Summary
Constants inherited
from Relp
Relp::RelpSoftware, Relp::RelpVersion
Instance Method Summary
collapse
Methods inherited from Relp
#frame_read, #frame_write, #server?, #valid_command?
Constructor Details
#initialize(host, port, required_commands = [], ssl_context = nil) ⇒ RelpServer
Returns a new instance of RelpServer.
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/logstash/util/relp.rb', line 104
# Opens the listening socket for a RELP server.
#
# @param host the address handed to TCPServer.new
# @param port the port handed to TCPServer.new
# @param required_commands [Array] commands a client must offer during the
#   'open' handshake (stored in @required_relp_commands)
# @param ssl_context when given, the TCP listener is wrapped in an
#   OpenSSL::SSL::SSLServer built with this context
# @raise [Errno::EADDRINUSE] re-raised after logging when the address is taken
def initialize(host, port, required_commands = [], ssl_context = nil)
  @logger = ::Cabin::Channel.get(LogStash)

  # Placeholder value; replaced by the real listener a few lines below.
  @server = true

  @basic_relp_commands = ['close']
  @required_relp_commands = required_commands

  begin
    @server = TCPServer.new(host, port)
  rescue Errno::EADDRINUSE
    # Log the conflict with its coordinates, then let the caller see the error.
    @logger.error("Could not start RELP server: Address in use", :host => host, :port => port)
    raise
  end

  # Layer TLS on top of the plain listener only when a context was supplied.
  @server = OpenSSL::SSL::SSLServer.new(@server, ssl_context) if ssl_context

  @logger.info("Started #{ssl_context ? 'SSL-enabled ' : ''}RELP Server", :host => host, :port => port)
end
|
Instance Method Details
#accept ⇒ Object
128
129
130
131
132
|
# File 'lib/logstash/util/relp.rb', line 128
# Blocks until the underlying server hands back a new client connection.
#
# @return [Array] a two-element array: this server object and the new socket
def accept
  client = @server.accept
  @logger.debug("New socket created")
  [self, client]
end
|
#ack(socket, txnr) ⇒ Object
210
211
212
213
214
215
216
|
# File 'lib/logstash/util/relp.rb', line 210
# Sends a '200 OK' rsp frame for transaction +txnr+ on +socket+.
#
# @param socket the connection passed through to frame_write
# @param txnr the transaction number being acknowledged
# @return whatever frame_write returns
def ack(socket, txnr)
  reply = {
    'txnr' => txnr,
    'command' => 'rsp',
    'message' => '200 OK'
  }
  frame_write(socket, reply)
end
|
#relp_setup_connection(socket) ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/logstash/util/relp.rb', line 134
# Performs the server side of the RELP 'open' handshake on +socket+.
#
# Reads one frame, which must be an 'open' command whose message carries
# key=value offer lines. The offer must name a relp_version and must include
# every command in @required_relp_commands; otherwise the connection is
# closed via serverclose and an error is raised.
#
# @param socket the client connection passed to frame_read / frame_write
# @return whatever frame_write returns for the '200 OK' response
# @raise [InappropriateCommand] when the first frame is not 'open'
# @raise [RelpError] when the offer carries no relp_version
# @raise [InsufficientCommands] when a required command is not offered
def relp_setup_connection(socket)
  frame = frame_read(socket)

  unless frame['command'] == 'open'
    serverclose(socket)
    raise InappropriateCommand, frame['command'] + ' expecting open'
  end

  # Each "key=value" line of the offer becomes one hash entry.
  offer = Hash[frame['message'].scan(/^(.*)=(.*)$/)]

  if offer['relp_version'].nil?
    @logger.warn("No relp version specified")
    serverclose(socket)
    raise RelpError, 'No relp_version specified'
  end

  # A client that sends no "commands=" line is treated as offering nothing,
  # instead of crashing with NoMethodError on nil.
  offered_commands = offer['commands'].to_s.split(',')
  missing_commands = @required_relp_commands - offered_commands

  unless missing_commands.empty?
    @logger.warn("Not all required commands are available", :required => @required_relp_commands, :offer => offer['commands'])
    # The message is built in one expression: a line that *starts* with `+`
    # is parsed by Ruby as a separate statement, which silently truncated
    # this text in the previous version.
    rejection = {
      'txnr' => frame['txnr'],
      'command' => 'rsp',
      'message' => '500 Required command(s) ' + missing_commands.join(',') + ' not offered'
    }
    frame_write(socket, rejection)
    serverclose(socket)
    raise InsufficientCommands,
          offer['commands'].to_s + ' offered, require ' + @required_relp_commands.join(',')
  end

  acceptance = {
    'txnr' => frame['txnr'],
    'command' => 'rsp',
    'message' => '200 OK ' \
                 'relp_version=' + RelpVersion + "\n" \
                 'relp_software=' + RelpSoftware + "\n" \
                 'commands=' + @required_relp_commands.join(',')
  }
  frame_write(socket, acceptance)
end
|
#serverclose(socket) ⇒ Object
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/logstash/util/relp.rb', line 195
# Tells the peer the server is closing the session, then closes +socket+.
#
# Both steps are best-effort: a ConnectionClosed raised while writing the
# 'serverclose' frame is ignored, and any error raised by socket.close is
# swallowed. The socket is closed even when the write fails, so the
# descriptor is not left open after the peer has gone away.
#
# @param socket the client connection to notify and close
# @return [nil]
def serverclose(socket)
  frame = { 'txnr' => 0, 'command' => 'serverclose' }
  begin
    frame_write(socket, frame)
  rescue ConnectionClosed
    # Peer is already gone; there is nobody left to notify.
  ensure
    begin
      socket.close
    rescue StandardError
      # Closing is best-effort; the socket may already be closed.
    end
  end
  nil
end
|
#shutdown ⇒ Object
206
207
208
|
# File 'lib/logstash/util/relp.rb', line 206
# Closes the listening server, ignoring any StandardError raised by close.
#
# @return the result of @server.close, or nil when closing raised
def shutdown
  @server.close
rescue StandardError
  nil
end
|
#syslog_read(socket) ⇒ Object
Reads the next frame from the socket but does not ack it.
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/logstash/util/relp.rb', line 176
# Reads the next frame from +socket+ and returns it when it is a 'syslog'
# frame. The frame is not acked here.
#
# A 'close' frame is answered with an rsp frame carrying the same txnr, the
# session is shut via serverclose, and ConnectionClosed is raised. Any other
# command closes the session and raises InappropriateCommand.
#
# @param socket the client connection passed to frame_read
# @return [Hash] the received 'syslog' frame
# @raise [ConnectionClosed] when the client sent 'close'
# @raise [InappropriateCommand] for any other command
def syslog_read(socket)
  incoming = frame_read(socket)

  case incoming['command']
  when 'syslog'
    incoming
  when 'close'
    reply = { 'txnr' => incoming['txnr'], 'command' => 'rsp' }
    frame_write(socket, reply)
    serverclose(socket)
    raise ConnectionClosed
  else
    serverclose(socket)
    raise InappropriateCommand, incoming['command'] + ' expecting syslog'
  end
end
|