Class: Toq::Client

Inherits:
Object
Defined in:
lib/toq/client.rb,
lib/toq/client/handler.rb

Overview

Simple RPC client capable of:

  • TLS encryption.

  • Asynchronous and synchronous requests.

  • Handling remote asynchronous calls that defer their result.

Author:

Defined Under Namespace

Classes: Handler

Constant Summary

DEFAULT_CONNECTION_POOL_SIZE = 1

Default number of connections to maintain in the re-use pool.


Constructor Details

#initialize(opts) ⇒ Client

Returns a new instance of Client.

Examples:

Example options:


{
    :host  => 'localhost',
    :port  => 7331,

    # Optional authentication token. If it doesn't match the one set
    # on the server side, requests will raise exceptions.
    :token => 'superdupersecret',

    :serializer => Marshal,

    :max_retries => 0,

    # In order to enable peer verification one must first provide
    # the following:
    # SSL CA certificate
    :ssl_ca     => cwd + '/../spec/pems/cacert.pem',
    # SSL private key
    :ssl_pkey   => cwd + '/../spec/pems/client/key.pem',
    # SSL certificate
    :ssl_cert   => cwd + '/../spec/pems/client/cert.pem'
}

Parameters:

  • opts (Hash)

Options Hash (opts):

  • :host (String)

    Hostname/IP address.

  • :port (Integer)

    Port number.

  • :socket (String)

    Path to UNIX domain socket.

  • :connection_pool_size (Integer) — default: 1

    Number of connections to keep open.

  • :token (String)

    Optional authentication token.

  • :serializer (.dump, .load) — default: YAML

    Serializer to use for message transmission.

  • :max_retries (Integer)

    How many times to retry failed requests.

  • :ssl_ca (String)

    SSL CA certificate.

  • :ssl_pkey (String)

    SSL private key.

  • :ssl_cert (String)

    SSL certificate.
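
The `:serializer` option is described above only as something that responds to `.dump` and `.load`. As a minimal sketch of an object satisfying that contract, the hypothetical `JSONSerializer` module below (not part of Toq) wraps the standard `json` library. It assumes the data being serialized consists of plain hashes, arrays, strings, and numbers.

```ruby
require 'json'

# Hypothetical serializer (not part of Toq). Any object that responds to
# .dump and .load matches the documented shape of the :serializer option.
module JSONSerializer
    # Turns a Ruby object into a JSON string.
    def self.dump( object )
        JSON.generate( object )
    end

    # Restores the Ruby object from a JSON string.
    def self.load( string )
        JSON.parse( string )
    end
end

payload  = { 'message' => 'handler.method', 'args' => [1, 'two'] }
restored = JSONSerializer.load( JSONSerializer.dump( payload ) )
```

Such a module would presumably be passed as `:serializer => JSONSerializer`. Whether Toq's own request and response objects survive a JSON round trip is not confirmed by this page.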



# File 'lib/toq/client.rb', line 73

def initialize( opts )
    @opts  = opts.merge( role: :client )
    @token = @opts[:token]

    @host, @port = @opts[:host], @opts[:port].to_i
    @socket = @opts[:socket]

    if !@socket && !(@host || @port)
        fail ArgumentError, 'Needs either a :socket or :host and :port options.'
    end

    @port = @port.to_i

    if @host && @port <= 0
        fail ArgumentError, "Invalid port: #{@port}"
    end

    @pool_size = @opts[:connection_pool_size] || DEFAULT_CONNECTION_POOL_SIZE

    @reactor = Raktr.new

    @connections      = @reactor.create_queue
    @connection_count = 0
end
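
The constructor accepts either a UNIX socket path or a host and port pair. The standalone sketch below shows one way to express that rule outside the class. The `resolve_endpoint` helper and its return values are hypothetical, not part of Toq. Because `nil.to_i` is `0` and `0` is truthy in Ruby, the sketch checks the port numerically instead of relying on truthiness.

```ruby
# Hypothetical helper (not part of Toq): returns [:socket, path] or
# [:tcp, host, port], and raises ArgumentError when neither is usable.
def resolve_endpoint( opts )
    socket = opts[:socket]
    return [:socket, socket] if socket

    host = opts[:host]
    port = opts[:port].to_i # nil.to_i is 0, and 0 is truthy in Ruby

    # Test the host and port explicitly instead of relying on truthiness.
    if host.nil? || port <= 0
        fail ArgumentError, 'Needs either a :socket or valid :host and :port options.'
    end

    [:tcp, host, port]
end

socket_endpoint = resolve_endpoint( socket: '/tmp/toq.sock' )
tcp_endpoint    = resolve_endpoint( host: 'localhost', port: '7331' )
```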

Instance Attribute Details

#connection_count ⇒ Integer (readonly)

Returns the number of connections in the pool.

Returns:

  • (Integer)

    Number of connections in the pool.



# File 'lib/toq/client.rb', line 33

def connection_count
  @connection_count
end

#opts ⇒ Hash (readonly)

Returns the options hash.

Returns:

  • (Hash)

    Options hash.



# File 'lib/toq/client.rb', line 29

def opts
  @opts
end

#reactor ⇒ Object (readonly)

Returns the value of attribute reactor.



# File 'lib/toq/client.rb', line 25

def reactor
  @reactor
end

Instance Method Details

#call(msg, *args, &block) ⇒ Object

Calls a remote method and grabs the result.

There are two ways to perform a call: async (non-blocking) and sync (blocking).

Examples:

To perform an async call you need to provide a block to handle the result.


server.call( 'handler.method', arg1, arg2 ) do |res|
    do_stuff( res )
end

To perform a sync (blocking) call, omit the block.


res = server.call( 'handler.method', arg1, arg2 )

Parameters:

  • msg (String)

    RPC message in the form of `handler.method`.

  • args (Array)

    Collection of arguments to be passed to the method.

  • block (Block)


# File 'lib/toq/client.rb', line 187

def call( msg, *args, &block )
    ensure_reactor_running

    req = Request.new(
        message:  msg,
        args:     args,
        callback: block,
        token:    @token
    )

    block_given? ? call_async( req ) : call_sync( req )
end
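
The dispatch on `block_given?` shown in `#call` can be tried in isolation. The sketch below is a hypothetical stand-in, not Toq's implementation: `FakeClient` and its `perform` method are invented for illustration, and a plain thread plays the role of the reactor. It shows the sync path built on top of the async one by waiting on a queue.

```ruby
# Hypothetical stand-in (not Toq's implementation): runs "remote" calls on a
# worker thread and dispatches on whether a block was given.
class FakeClient
    def call( msg, *args, &block )
        block_given? ? call_async( msg, args, &block ) : call_sync( msg, args )
    end

    private

    # Pretend remote method: echoes the message and its arguments.
    def perform( msg, args )
        "#{msg}(#{args.join( ', ' )})"
    end

    # Non-blocking: returns the worker thread; the block receives the result.
    def call_async( msg, args, &block )
        Thread.new { block.call( perform( msg, args ) ) }
    end

    # Blocking: waits on a queue until the async path delivers the result.
    def call_sync( msg, args )
        queue = Queue.new
        call_async( msg, args ) { |res| queue << res }
        queue.pop
    end
end

client = FakeClient.new

# Sync: no block, so the result is the return value.
sync_result = client.call( 'handler.method', 1, 2 )

# Async: the block receives the result once the worker finishes.
async_result = nil
worker = client.call( 'handler.method', 3 ) { |res| async_result = res }
worker.join
```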

#close ⇒ Object

Closes all connections.



# File 'lib/toq/client.rb', line 136

def close
    ensure_reactor_running

    @reactor.on_tick do |task|
        @connections.pop(&:close_without_retry)
        task.done if @connections.empty?
    end
end

#connect(&block) ⇒ Boolean

Connection factory; re-uses existing connections or creates new ones as needed to accommodate the workload.

Parameters:

  • block (Block)

    Block to be passed a connection.

Returns:

  • (Boolean)

    `true` if a new connection had to be established, `false` if an existing one was re-used.



# File 'lib/toq/client.rb', line 107

def connect( &block )
    ensure_reactor_running

    if @connections.empty? && @connection_count < @pool_size
        opts = @socket ? @socket : [@host, @port]
        block.call @reactor.connect( *[opts, Handler, @opts.merge( client: self )].flatten )
        increment_connection_counter
        return true
    end

    pop_block = proc do |conn|
        # Some connections may have died while they were waiting in the
        # queue, get rid of them and start all over in case the queue has
        # been emptied.
        if !conn.done?
            connection_failed conn
            connect( &block )
            next
        end

        block.call conn
    end

    @connections.pop( &pop_block )

    false
end
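
The re-use contract of `#connect` (yield a connection, return `true` only when a new one was created) can be illustrated without a reactor. The `TinyPool` class below is a hypothetical, synchronous simplification, not Toq's implementation. Unlike the queue-based code above, which waits for a connection to become available, this sketch raises when the pool is exhausted.

```ruby
# Hypothetical pool (not Toq's implementation) mirroring the documented
# contract: yield a connection, return true if it was newly created.
class TinyPool
    attr_reader :connection_count

    def initialize( size, &factory )
        @size             = size
        @factory          = factory
        @idle             = []
        @connection_count = 0
    end

    def connect
        # Create a new connection only while under the pool size limit.
        if @idle.empty? && @connection_count < @size
            @connection_count += 1
            yield @factory.call
            return true
        end

        # A reactor-backed queue would wait here; this sketch fails fast.
        fail 'No idle connection available' if @idle.empty?

        yield @idle.shift
        false
    end

    # Finished connections are handed back for re-use.
    def push_connection( connection )
        @idle << connection
    end
end

pool = TinyPool.new( 1 ) { Object.new }

first   = nil
created = pool.connect { |conn| first = conn }
pool.push_connection( first )

second        = nil
created_again = pool.connect { |conn| second = conn }
```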

#connection_failed(connection) ⇒ Object

Handles failed connections.

Parameters:

  • connection (Handler)



# File 'lib/toq/client.rb', line 161

def connection_failed( connection )
    ensure_reactor_running

    @connection_count -= 1
    connection.close_without_retry
end

#increment_connection_counter ⇒ Object



# File 'lib/toq/client.rb', line 145

def increment_connection_counter
    @connection_count += 1
end

#push_connection(connection) ⇒ Object

Finished Handlers push themselves here to be re-used.

Parameters:

  • connection (Handler)



# File 'lib/toq/client.rb', line 152

def push_connection( connection )
    ensure_reactor_running

    @connections << connection
end