Class: Skein::Client::RPC
- Inherits:
-
Skein::Connected
- Object
- Skein::Connected
- Skein::Client::RPC
- Defined in:
- lib/skein/client/rpc.rb
Defined Under Namespace
Classes: RPCException
Constant Summary collapse
- EXCHANGE_NAME_DEFAULT =
Constants ============================================================
''.freeze
Instance Attribute Summary
Attributes inherited from Skein::Connected
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil, ident: nil, expiration: nil, persistent: true, durable: true, timeout: nil) ⇒ RPC
constructor
Instance Methods =====================================================.
- #method_missing(name, *args, &block) ⇒ Object
-
#reroute!(routing_key) ⇒ Object
Temporarily deliver RPC calls to a different routing key.
Methods inherited from Skein::Connected
#channel, #connect, #connection_shared?, #create_channel, #lock, #reconnect, #repeat_until_not_nil
Constructor Details
#initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil, ident: nil, expiration: nil, persistent: true, durable: true, timeout: nil) ⇒ RPC
Instance Methods =====================================================
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/skein/client/rpc.rb', line 18 def initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil, ident: nil, expiration: nil, persistent: true, durable: true, timeout: nil) super(connection: connection, context: context, ident: ident) @routing_key = routing_key @timeout = timeout @rpc_exchange = self.channel.direct( exchange_name || EXCHANGE_NAME_DEFAULT, durable: durable ) @response_queue = self.channel.queue( @ident, durable: false, header: true, auto_delete: true ) @expiration = expiration @persistent = !!persistent @callback = { } @consumer = Skein::Adapter.subscribe(@response_queue, block: false) do |payload, delivery_tag, reply_to| self.context.trap do if (ENV['SKEIN_DEBUG_JSON']) $stdout.puts(payload) end response = JSON.load(payload) if (callback = @callback.delete(response['id'])) if (response['error']) exception = case (response['error'] and response['error']['code']) when -32601 NoMethodError.new( "%s from `%s' RPC call" % [ response.dig('error', 'message'), response.dig('error', 'data', 'method') ] ) when -32602 ArgumentError.new( response.dig('error', 'data', 'message') || 'wrong number of arguments' ) else RPCException.new( response.dig('error', 'data', 'message') || response.dig('error', 'message') ) end case (callback) when Skein::TimeoutQueue callback << exception when Proc callback.call(exception) end else case (callback) when Skein::TimeoutQueue callback << response['result'] when Proc callback.call(response['result']) end end end self.channel.acknowledge(delivery_tag) end end end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(name, *args, &block) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/skein/client/rpc.rb', line 107 def method_missing(name, *args, &block) name = name.to_s blocking = !name.sub!(/!\z/, '') = SecureRandom.uuid request = JSON.dump( jsonrpc: '2.0', method: name, params: args, id: ) @rpc_exchange.publish( request, routing_key: @routing_key, reply_to: blocking ? @ident : nil, content_type: 'application/json', message_id: , persistent: @persistent, expiration: @expiration ) if (block_given?) @callback[] = if (defined?(EventMachine)) EventMachine.next_tick(&block) else block end elsif (blocking) queue = Skein::TimeoutQueue.new(blocking: true, timeout: @timeout) @callback[] = queue case (result = queue.pop) when Exception raise result else result end end end |
Instance Method Details
#close ⇒ Object
100 101 102 103 104 105 |
# File 'lib/skein/client/rpc.rb', line 100 def close @consumer&.cancel @consumer = nil super end |
#reroute!(routing_key) ⇒ Object
Temporarily deliver RPC calls to a different routing key. The supplied block is executed with this temporary routing in effect.
92 93 94 95 96 97 98 |
# File 'lib/skein/client/rpc.rb', line 92 def reroute!(routing_key) routing_key, @routing_key = @routing_key, routing_key yield if (block_given?) @routing_key = routing_key end |