Class: Arf::Server
- Inherits:
- Object (inheritance chain: Object → Arf::Server)
- Defined in:
- lib/arf/server.rb
Class Method Summary
- .pseudo_uuid ⇒ Object
Instance Method Summary
- #call_next(index, ctx) ⇒ Object
- #cancel_stream(stream) ⇒ Object
- #default_options ⇒ Object
- #failure(ctx, status, message = nil) ⇒ Object
- #handle_request(ctx) ⇒ Object
- #handle_stream(stream) ⇒ Object
- #initialize(**opts) ⇒ Server (constructor): a new instance of Server.
- #register_interceptor(callable, &block) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(**opts) ⇒ Server
Returns a new instance of Server.
20 21 22 23 24 25 26 27 28 |
# File 'lib/arf/server.rb', line 20
#
# Builds a server from the values returned by #default_options, overridden by
# any keyword options supplied by the caller.
#
# @param opts [Hash] overrides; the keys read here are :logger,
#   :id_generator and :max_concurrent_streams
# @return [Server] a new instance of Server
def initialize(**opts)
  opts = default_options.merge(opts)
  # An explicit `id_generator: nil` survives the merge above, so fall back to
  # a freshly built generator in that case.
  opts[:id_generator] ||= Server.pseudo_uuid

  @logger = opts[:logger]
  @id_generator = opts[:id_generator]
  @max_concurrent_streams = opts[:max_concurrent_streams]
  @server = Arf::Wire::Server.new(self)
  @interceptors = []
end
Class Method Details
.pseudo_uuid ⇒ Object
5 6 7 8 9 10 |
# File 'lib/arf/server.rb', line 5
#
# Builds an ID generator. The hostname is looked up once, when the generator
# is built, and reused by every call of the returned lambda.
#
# @return [Proc] a lambda returning "<hostname>-<16 random hex characters>"
def self.pseudo_uuid
  hostname = Socket.gethostname
  -> { "#{hostname}-#{SecureRandom.hex(8)}" }
end
Instance Method Details
#call_next(index, ctx) ⇒ Object
117 118 119 120 121 |
# File 'lib/arf/server.rb', line 117
#
# Invokes the interceptor registered at +index+, handing it a block that
# continues with the interceptor at +index + 1+. An interceptor that never
# calls its block stops the rest of the chain from running.
#
# @param index [Integer] position in the interceptor list to invoke
# @param ctx [Object] value passed unchanged to every interceptor
# @return [Object, nil] nil once +index+ is past the last interceptor,
#   otherwise whatever the invoked interceptor returns
def call_next(index, ctx)
  return unless index < @interceptors.size

  @interceptors[index].call(ctx) { call_next(index + 1, ctx) }
end
#cancel_stream(stream) ⇒ Object
115 |
# File 'lib/arf/server.rb', line 115
#
# Intentionally empty: performs no work and returns nil.
# NOTE(review): presumably a callback invoked by Arf::Wire::Server when a
# stream is cancelled - confirm against the wire layer.
#
# @param stream [Object] unused
# @return [nil]
def cancel_stream(stream); end
#default_options ⇒ Object
12 13 14 15 16 17 18 |
# File 'lib/arf/server.rb', line 12
#
# Default option set for a server. The hash is built on first use and then
# memoized in @default_options.
#
# @return [Hash] with the keys :max_concurrent_streams, :logger and
#   :id_generator
def default_options
  # NOTE(review): the argument-less #merge only returns a copy of the literal
  # and looks redundant - confirm before removing it.
  @default_options ||= {
    max_concurrent_streams: 0,
    logger: Arf.configuration.logger,
    id_generator: Server.pseudo_uuid
  }.merge
end
#failure(ctx, status, message = nil) ⇒ Object
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/arf/server.rb', line 104
#
# Writes a failure response to the context's stream, ending the stream, and
# flags the context as having sent its response.
#
# @param ctx [Object] request context; its #stream is written to and its
#   #has_sent_response flag is set
# @param status [Symbol] key looked up in Status::FROM_SYMBOL and, when no
#   message is given, in Status::STATUS_TEXT
# @param message [String, nil] description placed in the
#   "arf-status-description" metadata entry; defaults to the status text
# @return [true]
def failure(ctx, status, message = nil)
  resp = RPC::Response.new(
    status: Status::FROM_SYMBOL[status],
    metadata: {
      "arf-status-description" => message || Status::STATUS_TEXT[status]
    }
  )
  ctx.stream.write_data(RPC::BaseMessage.encode(resp), end_stream: true)
  ctx.has_sent_response = true
end
#handle_request(ctx) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/arf/server.rb', line 66
#
# Resolves the service and method named by the request, executes it and sends
# the response. Every failure path answers through #failure.
#
# @param ctx [Object] request context carrying #request, #log and #stream
# @return [Boolean] true when the method ran to completion, false when the
#   request was rejected or the method raised
def handle_request(ctx)
  base_svc = Arf::RPC::ServiceBase.by_id(ctx.request.service)
  svc = base_svc&.subclasses&.first
  if svc.nil?
    ctx.log.info("Rejecting request as there's no service registered with the requested name",
                 service: ctx.request.service, method: ctx.request.method)
    failure(ctx, :unimplemented)
    return false
  end

  unless svc.respond_to_rpc?(ctx.request.method)
    ctx.log.info("Rejecting request as service does not respond to the requested method",
                 service: ctx.request.service, method: ctx.request.method)
    failure(ctx, :unimplemented)
    return false
  end

  ctx.log.debug("Handler got request")
  svc_inst = svc.new
  begin
    result = svc_inst.arf_execute_request(ctx)
  rescue Status::BadStatus => e
    failure(ctx, e.code, e.message)
    return false
  rescue SystemExit, SignalException, NoMemoryError
    # Process-level conditions must reach the runtime instead of being
    # reported to the peer as an RPC failure.
    raise
  rescue Exception => e
    # Deliberately broad: anything else raised by the service (including
    # ScriptError subclasses such as NotImplementedError) is answered with
    # :internal_error.
    ctx.log.error("Failed invoking method #{ctx.request.method}", e)
    failure(ctx, :internal_error)
    return false
  end

  ctx.end_send if ctx.has_send_stream
  svc_inst.respond(result) unless ctx.has_sent_response
  true
end
#handle_stream(stream) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/arf/server.rb', line 38
#
# Services one incoming stream: assigns it a request ID, reads the first
# message, rejects the stream unless that message is an RPC::Request, then
# runs the interceptor chain followed by #handle_request.
#
# @param stream [Object] stream to service; read from with #read_blocking
# @return [Boolean, nil] nil when the stream is rejected, otherwise the
#   result of #handle_request
def handle_stream(stream)
  req_id = @id_generator.call
  stream.external_id = req_id
  log = @logger.with_fields(request_id: req_id)
  log.debug("Servicing stream")
  ctx = Context.new(req_id, log, stream)

  log.debug("Reading request...", thread: Thread.current.name)
  msg = RPC::BaseMessage.initialize_from(stream.read_blocking)
  unless msg.is_a? RPC::Request
    log.info("Rejecting stream as it does not start with a Request frame")
    # NOTE(review): #failure maps its status through Status::FROM_SYMBOL,
    # while this response passes the bare symbol - confirm both are accepted.
    stream.write_data(RPC::BaseMessage.encode(RPC::Response.new(
      status: :failed_precondition,
      metadata: {
        "arf-request-id" => req_id,
        "arf-status-description" => "Missing Request frame"
      }
    )))
    stream.close_local
    return
  end

  log.debug("Request read OK")
  ctx.request = msg

  # chain interceptors
  # NOTE(review): #handle_request runs even when an interceptor did not call
  # its continuation block - confirm that is intended.
  call_next(0, ctx)
  handle_request(ctx)
end
#register_interceptor(callable, &block) ⇒ Object
30 31 32 33 |
# File 'lib/arf/server.rb', line 30
#
# Appends an interceptor to the chain run by #call_next. The interceptor may
# be given as an object responding to #call or as a block; when both are
# supplied the positional argument wins.
#
# @param callable [#call, nil] interceptor object; may be omitted when a
#   block is given
# @yield block used as the interceptor when +callable+ is nil
# @return [Array] the list of registered interceptors
# @raise [ArgumentError] when neither a callable nor a block is supplied
def register_interceptor(callable = nil, &block)
  interceptor = callable || block
  # Fail at registration time; a nil entry would otherwise only blow up
  # later, when #call_next tries to invoke it.
  unless interceptor.respond_to?(:call)
    raise ArgumentError, "an interceptor must be a callable object or a block"
  end

  @interceptors << interceptor
end
#run ⇒ Object
35 |
# File 'lib/arf/server.rb', line 35
#
# Delegates to the underlying wire server's #run.
#
# @return [Object] whatever the wire server's #run returns
def run = @server.run
#shutdown ⇒ Object
36 |
# File 'lib/arf/server.rb', line 36
#
# Delegates to the underlying wire server's #shutdown.
#
# @return [Object] whatever the wire server's #shutdown returns
def shutdown = @server.shutdown