Class: Async::Bus::Protocol::Wrapper

Inherits:
MessagePack::Factory
  • Object
show all
Defined in:
lib/async/bus/protocol/wrapper.rb

Overview

Represents a MessagePack factory wrapper for async-bus serialization.

Instance Method Summary collapse

Constructor Details

#initialize(connection, reference_types: [Controller]) ⇒ Wrapper

Initialize a new wrapper.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/async/bus/protocol/wrapper.rb', line 23

def initialize(connection, reference_types: [Controller])
  super()
  
  @connection = connection
  @reference_types = reference_types
  
  # Store the peer connection for forwarding proxies:
  # When a proxy is forwarded (local=false), it should point back to the sender
  # (the peer connection), not the receiver (this connection).
  @peer_connection = nil
  
  # The order here matters.
  
  self.register_type(0x00, Invoke, recursive: true,
      packer: ->(invoke, packer){invoke.pack(packer)},
      unpacker: ->(unpacker){Invoke.unpack(unpacker)},
    )
  
  [Return, Yield, Error, Next, Throw, Close].each_with_index do |klass, index|
    self.register_type(0x01 + index, klass, recursive: true,
        packer: ->(value, packer){value.pack(packer)},
        unpacker: ->(unpacker){klass.unpack(unpacker)},
      )
  end
  
  # Reverse serialize proxies back into proxies:
  # When a Proxy is received, use proxy_object to handle reverse lookup
  self.register_type(0x10, Proxy, recursive: true,
      packer: self.method(:pack_proxy),
      unpacker: self.method(:unpack_proxy),
    )
  
  self.register_type(0x11, Release, recursive: true,
      packer: ->(release, packer){release.pack(packer)},
      unpacker: ->(unpacker){Release.unpack(unpacker)},
    )
  
  self.register_type(0x20, Symbol)
  self.register_type(0x21, Exception, recursive: true,
      packer: self.method(:pack_exception),
      unpacker: self.method(:unpack_exception),
    )
  
  self.register_type(0x22, Class,
      packer: ->(klass){klass.name},
      unpacker: ->(name){Object.const_get(name)},
    )
  
  reference_packer = self.method(:pack_reference)
  reference_unpacker = self.method(:unpack_reference)
  
  # Serialize objects into proxies:
  reference_types&.each_with_index do |klass, index|
    self.register_type(0x30 + index, klass, recursive: true,
        packer: reference_packer,
        unpacker: reference_unpacker,
      )
  end
end

Instance Method Details

#pack_exception(exception, packer) ⇒ Object

Pack an exception into a MessagePack packer.



117
118
119
120
121
# File 'lib/async/bus/protocol/wrapper.rb', line 117

def pack_exception(exception, packer)
  packer.write(exception.class.name)
  packer.write(exception.message)
  packer.write(exception.backtrace)
end

#pack_proxy(proxy, packer) ⇒ Object

Pack a proxy into a MessagePack packer.

Validates that the proxy is for this connection and serializes the proxy name. Multi-hop proxy forwarding is not supported, so proxies can only be serialized from the same connection they were created for (round-trip scenarios).



92
93
94
95
96
97
98
99
# File 'lib/async/bus/protocol/wrapper.rb', line 92

def pack_proxy(proxy, packer)
  # Check if the proxy is for this connection:
  if proxy.__connection__ != @connection
    proxy = @connection.proxy(proxy)
  end
  
  packer.write(proxy.__name__)
end

#pack_reference(object, packer) ⇒ Object

Pack a reference type object (e.g., Controller) into a MessagePack packer.

Serializes the object as a proxy by generating a temporary name and writing it to the packer. The object is implicitly bound to the connection with a temporary name.



146
147
148
# File 'lib/async/bus/protocol/wrapper.rb', line 146

def pack_reference(object, packer)
  packer.write(@connection.proxy_name(object))
end

#unpack_exception(unpacker) ⇒ Object

Unpack an exception from a MessagePack unpacker.



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/async/bus/protocol/wrapper.rb', line 126

def unpack_exception(unpacker)
  klass = unpacker.read
  message = unpacker.read
  backtrace = unpacker.read
  
  klass = Object.const_get(klass)
  
  exception = klass.new(message)
  exception.set_backtrace(backtrace)
  
  return exception
end

#unpack_proxy(unpacker) ⇒ Object

Unpack a proxy from a MessagePack unpacker.

When deserializing a proxy:

  • If the object is bound locally, return the actual object (round-trip scenario)

  • If the object is not found locally, create a proxy pointing to this connection (the proxy was forwarded from another connection and should point back to the sender)



110
111
112
# File 'lib/async/bus/protocol/wrapper.rb', line 110

def unpack_proxy(unpacker)
  @connection.proxy_object(unpacker.read)
end

#unpack_reference(unpacker) ⇒ Object

Unpack a reference type object from a MessagePack unpacker.

Reads a proxy name and returns the corresponding object or proxy. If the object is bound locally, returns the actual object; otherwise returns a proxy.



157
158
159
# File 'lib/async/bus/protocol/wrapper.rb', line 157

def unpack_reference(unpacker)
  @connection.proxy_object(unpacker.read)
end