Class: Arf::RPC::ServiceBase

Inherits:
Object
  • Object
show all
Includes:
Types::Mixin
Defined in:
lib/arf/rpc/service_base.rb

Defined Under Namespace

Classes: YieldOnClosedStream, YieldWithoutRespondError, YieldWithoutStreamError

Constant Summary

Constants included from Types::Mixin

Types::Mixin::ArrayType, Types::Mixin::InOutStream, Types::Mixin::InputStream, Types::Mixin::MapType, Types::Mixin::OutputStream, Types::Mixin::Streamer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Types::Mixin

included

Instance Attribute Details

#logObject (readonly)

Returns the value of attribute log.



74
75
76
# File 'lib/arf/rpc/service_base.rb', line 74

def log
  @log
end

#requestObject (readonly)

Returns the value of attribute request.



74
75
76
# File 'lib/arf/rpc/service_base.rb', line 74

def request
  @request
end

Class Method Details

.arf_service_id(id) ⇒ Object



39
40
41
42
# File 'lib/arf/rpc/service_base.rb', line 39

def self.arf_service_id(id)
  @arf_service_id = id
  ::Arf::RPC::ServiceBase.register(id, self)
end

.by_id(id) ⇒ Object



34
35
36
37
# File 'lib/arf/rpc/service_base.rb', line 34

def self.by_id(id)
  @services ||= {}
  @services[id]
end

.register(id, cls) ⇒ Object



29
30
31
32
# File 'lib/arf/rpc/service_base.rb', line 29

def self.register(id, cls)
  @services ||= {}
  @services[id] = cls
end

.respond_to_rpc?(name) ⇒ Boolean



44
45
46
47
48
49
50
51
52
# File 'lib/arf/rpc/service_base.rb', line 44

def self.respond_to_rpc?(name)
  return true if @service_methods&.key?(name.to_sym)

  if superclass != Arf::RPC::ServiceBase && superclass.respond_to?(:respond_to_rpc?)
    return superclass.respond_to_rpc?(name)
  end

  false
end

.rpc(name, inputs: nil, outputs: nil) ⇒ Object



65
66
67
68
69
70
71
72
# File 'lib/arf/rpc/service_base.rb', line 65

def self.rpc(name, inputs: nil, outputs: nil)
  inputs ||= {}
  outputs = [outputs].compact unless outputs.is_a? Array

  @service_methods ||= {}
  @service_methods[name] = MethodMeta.new(inputs, outputs)
  define_method(name) { unimplemented! }
end

.rpc_method_meta(name) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/arf/rpc/service_base.rb', line 54

def self.rpc_method_meta(name)
  name = name.to_sym unless name.is_a? Symbol
  return @service_methods[name] if @service_methods&.key?(name)

  if superclass != Arf::RPC::ServiceBase && superclass.respond_to?(:rpc_method_meta)
    return superclass.rpc_method_meta(name)
  end

  nil
end

Instance Method Details

#add_meta(k, v) ⇒ Object



123
# File 'lib/arf/rpc/service_base.rb', line 123

def add_meta(k, v) = @context.response..add(k, v)

#arf_execute_request(ctx) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/arf/rpc/service_base.rb', line 76

def arf_execute_request(ctx)
  @context = ctx
  @request = ctx.request
  @metadata = @request.
  @service = @request.service
  @method = @request.method
  @log = ctx.log
  @method_meta = self.class.rpc_method_meta(@method)
  needs_response = @method_meta.output?
  @context.prepare
  if (str = @method_meta.output_stream)
    @context.has_send_stream = true
    @context.send_stream_type = str
  end

  @context.response..attach_observer(self)

  output_type = @method_meta.output_stream&.resolved_type(self.class)

  result = nil
  begin
    result = send(@method, *@request.params) do |val|
      raise YieldWithoutStreamError unless ctx.has_send_stream
      raise YieldWithoutRespondError if !ctx.has_sent_response && needs_response
      raise YieldOnClosedStream if ctx.send_stream_finished

      ctx.stream_send(Arf::Types.coerce_value(val, output_type))
      nil
    end
  rescue Exception => e
    @log.error("Failed executing ##{@method}", e)
    error!
  end
  @log.debug("Finished executing request", method: @method, result:)
  result
end

#error!Object

Raises:



134
# File 'lib/arf/rpc/service_base.rb', line 134

def error! = raise Status::BadStatus, :internal_error

#observer_changed(_observer) ⇒ Object



113
114
115
116
117
118
# File 'lib/arf/rpc/service_base.rb', line 113

def observer_changed(_observer)
  # We just have a single observer, it will be response metadata.
  return unless ctx.send_stream_started && !ctx.send_stream_finished

  ctx.
end

#recvObject



120
# File 'lib/arf/rpc/service_base.rb', line 120

def recv = @context.recv

#respond(value, code: nil) ⇒ Object



125
126
127
128
129
130
131
# File 'lib/arf/rpc/service_base.rb', line 125

def respond(value, code: nil)
  code ||= :ok

  @context.response.params = @method_meta.coerce_result(value, self.class)
  @context.response.status = code
  @context.send_response
end

#set_meta(k, v) ⇒ Object



122
# File 'lib/arf/rpc/service_base.rb', line 122

def set_meta(k, v) = @context.response..set(k, v)

#unimplemented!Object

Raises:



133
# File 'lib/arf/rpc/service_base.rb', line 133

def unimplemented! = raise Status::BadStatus, :unimplemented