Class: Arf::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/arf/server.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**opts) ⇒ Server

Returns a new instance of Server.



20
21
22
23
24
25
26
27
28
# File 'lib/arf/server.rb', line 20

def initialize(**opts)
  opts = default_options.merge(opts)
  opts[:id_generator] ||= Server.pseudo_uuid
  @logger = opts[:logger]
  @id_generator = opts[:id_generator]
  @max_concurrent_streams = opts[:max_concurrent_streams]
  @server = Arf::Wire::Server.new(self)
  @interceptors = []
end

Class Method Details

.pseudo_uuidObject



5
6
7
8
9
10
# File 'lib/arf/server.rb', line 5

def self.pseudo_uuid
  hostname = Socket.gethostname
  lambda do
    "#{hostname}-#{SecureRandom.hex(8)}"
  end
end

Instance Method Details

#call_next(index, ctx) ⇒ Object



117
118
119
120
121
# File 'lib/arf/server.rb', line 117

def call_next(index, ctx)
  return unless index < @interceptors.size

  @interceptors[index].call(ctx) { call_next(index + 1, ctx) }
end

#cancel_stream(stream) ⇒ Object



115
# File 'lib/arf/server.rb', line 115

def cancel_stream(stream); end

#default_optionsObject



12
13
14
15
16
17
18
# File 'lib/arf/server.rb', line 12

def default_options
  @default_options ||= {
    max_concurrent_streams: 0,
    logger: Arf.configuration.logger,
    id_generator: Server.pseudo_uuid
  }.merge
end

#failure(ctx, status, message = nil) ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/arf/server.rb', line 104

def failure(ctx, status, message = nil)
  resp = RPC::Response.new(
    status: Status::FROM_SYMBOL[status],
    metadata: {
      "arf-status-description" => message || Status::STATUS_TEXT[status]
    }
  )
  ctx.stream.write_data(RPC::BaseMessage.encode(resp), end_stream: true)
  ctx.has_sent_response = true
end

#handle_request(ctx) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/arf/server.rb', line 66

def handle_request(ctx)
  base_svc = Arf::RPC::ServiceBase.by_id(ctx.request.service)
  svc = base_svc&.subclasses&.first

  if svc.nil?
    ctx.log.info("Rejecting request as there's no service registered with the requested name",
                 service: ctx.request.service, method: ctx.request.method)
    failure(ctx, :unimplemented)
    return false
  end

  unless svc.respond_to_rpc?(ctx.request.method)
    ctx.log.info("Rejecting request as service does not respond to the requested method",
                 service: ctx.request.service, method: ctx.request.method)
    failure(ctx, :unimplemented)
    return false
  end

  ctx.log.debug("Handler got request")

  svc_inst = svc.new

  begin
    result = svc_inst.arf_execute_request(ctx)
  rescue Status::BadStatus => e
    failure(ctx, e.code, e.message)
    return false
  rescue Exception => e
    ctx.log.error("Failed invoking method #{ctx.request.method}", e)
    failure(ctx, :internal_error)
    return false
  end

  ctx.end_send if ctx.has_send_stream
  svc_inst.respond(result) unless ctx.has_sent_response
  true
end

#handle_stream(stream) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/arf/server.rb', line 38

def handle_stream(stream)
  req_id = @id_generator.call
  stream.external_id = req_id
  log = @logger.with_fields(request_id: req_id)
  log.debug("Servicing stream")
  ctx = Context.new(req_id, log, stream)
  log.debug("Reading request...", thread: Thread.current.name)
  msg = RPC::BaseMessage.initialize_from(stream.read_blocking)
  unless msg.is_a? RPC::Request
    log.info("Rejecting stream as it does not start with a Request frame")
    stream.write_data(RPC::BaseMessage.encode(RPC::Response.new(
                                                status: :failed_precondition,
                                                metadata: {
                                                  "arf-request-id" => req_id,
                                                  "arf-status-description" => "Missing Request frame"
                                                }
                                              )))
    stream.close_local
    return
  end

  log.debug("Request read OK")
  ctx.request = msg
  # chain interceptors
  call_next(0, ctx)
  handle_request(ctx)
end

#register_interceptor(callable, &block) ⇒ Object



30
31
32
33
# File 'lib/arf/server.rb', line 30

def register_interceptor(callable, &block)
  callable ||= block
  @interceptors << callable
end

#runObject



35
# File 'lib/arf/server.rb', line 35

def run = @server.run

#shutdownObject



36
# File 'lib/arf/server.rb', line 36

def shutdown = @server.shutdown