Class: Skein::Client::RPC

Inherits:
Skein::Connected show all
Defined in:
lib/skein/client/rpc.rb

Defined Under Namespace

Classes: RPCException

Constant Summary collapse

EXCHANGE_NAME_DEFAULT =

Constants ============================================================

''.freeze

Instance Attribute Summary

Attributes inherited from Skein::Connected

#connection, #context, #ident

Instance Method Summary collapse

Methods inherited from Skein::Connected

#channel, #connect, #connection_shared?, #create_channel, #lock, #reconnect, #repeat_until_not_nil

Constructor Details

#initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil, ident: nil, expiration: nil, persistent: true, durable: true, timeout: nil) ⇒ RPC

Instance Methods =====================================================



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/skein/client/rpc.rb', line 18

# Builds the RPC client: declares the direct exchange requests are published
# to, creates the reply queue named after this client's ident, and subscribes
# a consumer that hands each JSON-RPC response to the callback registered
# under the response's id.
#
# @param exchange_name [String, nil] exchange name; EXCHANGE_NAME_DEFAULT is
#   used when nil
# @param routing_key routing key stored for published requests
# @param connection forwarded to the superclass constructor
# @param context forwarded to the superclass constructor
# @param ident forwarded to the superclass constructor
# @param expiration stored and sent with each published request
# @param persistent coerced to true/false and sent with each published request
# @param durable durability flag passed when declaring the exchange
# @param timeout stored for the queue used by blocking calls
def initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil, ident: nil, expiration: nil, persistent: true, durable: true, timeout: nil)
  super(connection: connection, context: context, ident: ident)

  @routing_key = routing_key
  @timeout = timeout

  target_exchange = exchange_name || EXCHANGE_NAME_DEFAULT
  @rpc_exchange = self.channel.direct(target_exchange, durable: durable)

  # Replies are collected on a non-durable, auto-deleting queue named after
  # this client's ident.
  @response_queue = self.channel.queue(@ident, durable: false, header: true, auto_delete: true)

  @expiration = expiration
  @persistent = persistent ? true : false

  # Pending callbacks keyed by JSON-RPC message id.
  @callback = {}

  @consumer = Skein::Adapter.subscribe(@response_queue, block: false) do |payload, delivery_tag, _reply_to|
    # `self.context` is required here: a bare `context` would resolve to the
    # keyword parameter of the same name.
    self.context.trap do
      $stdout.puts(payload) if ENV['SKEIN_DEBUG_JSON']

      # NOTE(review): JSON.load is documented for trusted input; consider
      # JSON.parse if responses can originate from untrusted publishers.
      response = JSON.load(payload)
      handler = @callback.delete(response['id'])

      if handler
        # An error response is turned into an exception object; it is handed
        # to the callback as a value, not raised here.
        outcome =
          if response['error']
            case response['error']['code']
            when -32601
              message = "%s from `%s' RPC call" % [
                response.dig('error', 'message'),
                response.dig('error', 'data', 'method')
              ]
              NoMethodError.new(message)
            when -32602
              ArgumentError.new(
                response.dig('error', 'data', 'message') || 'wrong number of arguments'
              )
            else
              RPCException.new(
                response.dig('error', 'data', 'message') || response.dig('error', 'message')
              )
            end
          else
            response['result']
          end

        case handler
        when Skein::TimeoutQueue then handler << outcome
        when Proc then handler.call(outcome)
        end
      end

      # Acknowledged whether or not a callback was waiting for this id.
      self.channel.acknowledge(delivery_tag)
    end
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args, &block) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/skein/client/rpc.rb', line 107

# Proxies any otherwise-undefined method call as a JSON-RPC 2.0 request
# published on the RPC exchange.
#
# A method name ending in "!" is published without a reply_to address and the
# call does not wait for a response. When a block is given it is registered
# as the response callback and the call returns without waiting. Otherwise
# the call waits on a Skein::TimeoutQueue until the response consumer
# delivers a value.
#
# The callback is registered BEFORE the request is published so that a fast
# reply cannot arrive while no handler exists for its id, and it is removed
# again if publishing fails or once a blocking wait ends (for any reason).
#
# @param name [Symbol, String] remote method name, optionally suffixed with "!"
# @param args [Array] positional arguments sent as the JSON-RPC params
# @yield [outcome] the response result, or an exception object when the
#   response carried an error
# @return [Object] the remote result for blocking calls, the registered
#   callback when a block is given, nil otherwise
# @raise [Exception] the exception object delivered for an error response
#   (blocking calls only)
def method_missing(name, *args, &block)
  method_name = name.to_s
  blocking = !method_name.end_with?('!')
  method_name = method_name.chomp('!')

  message_id = SecureRandom.uuid
  request = JSON.dump(
    jsonrpc: '2.0',
    method: method_name,
    params: args,
    id: message_id
  )

  # Only a blocking call without a block waits for the reply.
  waiting = block.nil? && blocking

  callback =
    if block
      if defined?(EventMachine)
        # Defer the user's block to the reactor's next tick, passing along the
        # value the response consumer supplies.
        proc { |outcome| EventMachine.next_tick { block.call(outcome) } }
      else
        block
      end
    elsif blocking
      Skein::TimeoutQueue.new(blocking: true, timeout: @timeout)
    end

  @callback[message_id] = callback if callback

  published = false

  begin
    @rpc_exchange.publish(
      request,
      routing_key: @routing_key,
      reply_to: blocking ? @ident : nil,
      content_type: 'application/json',
      message_id: message_id,
      persistent: @persistent,
      expiration: @expiration
    )

    published = true
  ensure
    # A request that never went out can never be answered.
    @callback.delete(message_id) unless published
  end

  return callback unless waiting

  begin
    result = callback.pop
  ensure
    # Drop the entry even if the wait is aborted, so abandoned queues do not
    # accumulate in @callback.
    @callback.delete(message_id)
  end

  raise result if result.is_a?(Exception)

  result
end

Instance Method Details

#close ⇒ Object



100
101
102
103
104
105
# File 'lib/skein/client/rpc.rb', line 100

# Cancels the response consumer when one is present, clears the reference,
# and then defers to the superclass implementation of close.
#
# @return [Object] whatever the superclass #close returns
def close
  @consumer.cancel unless @consumer.nil?
  @consumer = nil

  super
end

#reroute!(routing_key) ⇒ Object

Temporarily deliver RPC calls to a different routing key. The supplied block is executed with this temporary routing in effect.



92
93
94
95
96
97
98
# File 'lib/skein/client/rpc.rb', line 92

# Temporarily deliver RPC calls to a different routing key. The supplied
# block is executed with this temporary routing in effect, and the previous
# routing key is restored afterwards, including when the block raises.
#
# @param routing_key routing key to use while the block runs
# @yield runs with the temporary routing key in effect
# @return the routing key that was in effect before the call
def reroute!(routing_key)
  previous_key = @routing_key
  @routing_key = routing_key

  begin
    yield if block_given?
  ensure
    # Without this, an exception in the block would leave every later call
    # routed to the temporary key.
    @routing_key = previous_key
  end

  previous_key
end