Class: Roby::Interface::V2::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v2/client.rb

Overview

The client-side object that allows to access an interface (e.g. a Roby app) from another process than the Roby controller

Defined Under Namespace

Classes: BatchContext, Job, NoSuchAction, RemoteError, TimeoutError

Constant Summary collapse

DEFAULT_CALL_TIMEOUT =

Default value for #call_timeout

10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, id, handshake: %i[actions commands])) ⇒ Client

Create a client endpoint to a Roby interface [Server]

Parameters:

  • io (Channel)

    a channel to the server

  • id (String)

    a unique identifier for this client (e.g. host:port of the local endpoint when using TCP). It is passed to the server through Server#handshake

  • handshake (Array<Symbol>) (defaults to: %i[actions commands]))

    commands executed on the server side during the handshake and stored in the #handshake_results attribute. Include :actions and :commands if you pass this explicitely, unless you know what you are doing

See Also:



69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/roby/interface/v2/client.rb', line 69

def initialize(io, id, handshake: %i[actions commands])
    @pending_async_calls = []
    @io = io
    @message_id = 0
    @notification_queue = []
    @job_progress_queue = []
    @exception_queue = []
    @ui_event_queue = []
    @call_timeout = DEFAULT_CALL_TIMEOUT

    @handshake_results = call([], :handshake, id, handshake)
    @actions = @handshake_results[:actions]
    @commands = @handshake_results[:commands]
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(m, *args, **keywords, &b) ⇒ Object

rubocop:disable Style/MethodMissingSuper



631
632
633
634
635
636
637
638
639
# File 'lib/roby/interface/v2/client.rb', line 631

def method_missing(m, *args, **keywords, &b) # rubocop:disable Style/MethodMissingSuper
    if (sub = find_subcommand_by_name(m.to_s))
        SubcommandClient.new(self, m.to_s, sub.description, sub.commands)
    elsif (match = /^async_(.*)$/.match(m.to_s))
        async_call([], match[1].to_sym, *args, **keywords, &b)
    else
        call([], m, *args, **keywords)
    end
end

Instance Attribute Details

#actionsArray<Roby::Actions::Model::Action> (readonly)

Returns set of known actions.

Returns:

  • (Array<Roby::Actions::Model::Action>)

    set of known actions



17
18
19
# File 'lib/roby/interface/v2/client.rb', line 17

def actions
  @actions
end

#call_timeoutFloat

Timeout, in seconds, in blocking remote calls

Defaults to DEFAULT_CALL_TIMEOUT

Returns:

  • (Float)


55
56
57
# File 'lib/roby/interface/v2/client.rb', line 55

def call_timeout
  @call_timeout
end

#commandsHash (readonly)

Returns the set of available commands.

Returns:

  • (Hash)

    the set of available commands



19
20
21
# File 'lib/roby/interface/v2/client.rb', line 19

def commands
  @commands
end

#cycle_indexInteger (readonly)

Returns index of the last processed cycle.

Returns:

  • (Integer)

    index of the last processed cycle



39
40
41
# File 'lib/roby/interface/v2/client.rb', line 39

def cycle_index
  @cycle_index
end

#cycle_start_timeTime (readonly)

Returns time of the last processed cycle.

Returns:

  • (Time)

    time of the last processed cycle



41
42
43
# File 'lib/roby/interface/v2/client.rb', line 41

def cycle_start_time
  @cycle_start_time
end

#exception_queueArray<Integer,Array> (readonly)

Returns list of existing exceptions. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID.

Returns:

  • (Array<Integer,Array>)

    list of existing exceptions. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID



32
33
34
# File 'lib/roby/interface/v2/client.rb', line 32

def exception_queue
  @exception_queue
end

#handshake_resultsHash<Symbol,Object> (readonly)

Result of the calls done during the handshake

Returns:



48
49
50
# File 'lib/roby/interface/v2/client.rb', line 48

def handshake_results
  @handshake_results
end

#ioChannel (readonly)

Returns the IO to the server.

Returns:

  • (Channel)

    the IO to the server



15
16
17
# File 'lib/roby/interface/v2/client.rb', line 15

def io
  @io
end

#job_progress_queueArray<Integer,Array> (readonly)

Returns list of existing job progress information. The integer is an ID that can be used to refer to the job progress information. It is always growing and will never collide with a job progress and exception ID.

Returns:

  • (Array<Integer,Array>)

    list of existing job progress information. The integer is an ID that can be used to refer to the job progress information. It is always growing and will never collide with a job progress and exception ID



24
25
26
# File 'lib/roby/interface/v2/client.rb', line 24

def job_progress_queue
  @job_progress_queue
end

#notification_queueArray<Integer,Array> (readonly)

Returns list of existing notifications. The integer is an ID that can be used to refer to the notification. It is always growing and will never collide with an exception ID.

Returns:

  • (Array<Integer,Array>)

    list of existing notifications. The integer is an ID that can be used to refer to the notification. It is always growing and will never collide with an exception ID



28
29
30
# File 'lib/roby/interface/v2/client.rb', line 28

def notification_queue
  @notification_queue
end

#pending_async_callsArray<Hash> (readonly)

Returns list of the pending async calls.

Returns:

  • (Array<Hash>)

    list of the pending async calls



43
44
45
# File 'lib/roby/interface/v2/client.rb', line 43

def pending_async_calls
  @pending_async_calls
end

#ui_event_queueArray<Integer,Array> (readonly)

Returns list of queued UI events. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID.

Returns:

  • (Array<Integer,Array>)

    list of queued UI events. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID



36
37
38
# File 'lib/roby/interface/v2/client.rb', line 36

def ui_event_queue
  @ui_event_queue
end

Instance Method Details

#allocate_message_idObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Allocation of unique IDs for notification messages



231
232
233
# File 'lib/roby/interface/v2/client.rb', line 231

def allocate_message_id
    @message_id += 1
end

#async_call(path, m, *args, **keywords, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Asynchronously call a method on the interface or on one of the interface’s subcommands

Parameters:

  • path (Array<String>)

    path to the subcommand. Empty means on the interface object itself.

  • m (Symbol)

    command or action name. Actions are always formatted as action_name!

  • args (Object)

    the command or action arguments

Returns:

  • (Object)

    an Object associated with the call

See Also:



385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# File 'lib/roby/interface/v2/client.rb', line 385

def async_call(path, m, *args, **keywords, &block)
    raise "no callback block given" unless block_given?

    if (action_match = /(.*)!$/.match(m.to_s))
        action_name = action_match[1]
        unless find_action_by_name(action_name)
            raise NoSuchAction,
                  "there is no action called #{action_name} on #{self}"
        end

        path = []
        m = :start_job
        args = [action_name, *args]
    end
    io.write_packet([path, m, args, keywords])
    pending_async_calls << { block: block, path: path, m: m, args: args }
    pending_async_calls.last.freeze
end

#async_call_pending?(a_call) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether the async call is still pending

Parameters:

  • call (Object)

    the Object associated with the call

Returns:

  • (Boolean)

    true if the async call is pending, false otherwise



410
411
412
# File 'lib/roby/interface/v2/client.rb', line 410

def async_call_pending?(a_call)
    pending_async_calls.any? { |item| item.equal?(a_call) }
end

#call(path, m, *args, **keywords) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call a method on the interface or on one of the interface’s subcommands

Parameters:

  • path (Array<String>)

    path to the subcommand. Empty means on the interface object itself.

  • m (Symbol)

    command or action name. Actions are always formatted as action_name!

  • args (Object)

    the command or action arguments

Returns:

  • (Object)

    the command result, or – in the case of an action – the job ID for the newly created action



359
360
361
362
363
364
365
366
367
368
369
370
371
# File 'lib/roby/interface/v2/client.rb', line 359

def call(path, m, *args, **keywords)
    if (action_match = /(.*)!$/.match(m.to_s))
        unless args.empty?
            raise ArgumentError, "jobs only accept keyword arguments"
        end

        start_job(action_match[1], **keywords)
    else
        io.write_packet([path, m, args, keywords])
        result, = poll(1, timeout: @call_timeout)
        result
    end
end

#closeObject

Close the communication channel



95
96
97
# File 'lib/roby/interface/v2/client.rb', line 95

def close
    io.close
end

#closed?Boolean

Whether the communication channel to the server is closed

Returns:

  • (Boolean)


90
91
92
# File 'lib/roby/interface/v2/client.rb', line 90

def closed?
    io.closed?
end

#create_batchBatchContext

Create a batch context

Messages sent to the returned object are validated as much as possible and gathered in a list. Call #process_batch to send all the gathered calls at once to the remote server

Returns:



593
594
595
# File 'lib/roby/interface/v2/client.rb', line 593

def create_batch
    BatchContext.new(self)
end

#each_jobObject

Enumerate the current jobs



569
570
571
572
573
574
575
# File 'lib/roby/interface/v2/client.rb', line 569

def each_job
    return enum_for(__method__) unless block_given?

    jobs.each do |job_id, (job_state, placeholder_task, job_task)|
        yield(Job.new(job_id, job_state, placeholder_task, job_task))
    end
end

#find_action_by_name(name) ⇒ Actions::Models::Action?

Find an action by its name

This is a local operation using the information gathered at connection time

Parameters:

  • name (String)

    the name of the action to look for

Returns:



116
117
118
# File 'lib/roby/interface/v2/client.rb', line 116

def find_action_by_name(name)
    actions.find { |act| act.name == name }
end

#find_all_actions_matching(matcher) ⇒ Array<Actions::Models::Action>

Finds all actions whose name matches a pattern

Parameters:

  • matcher (#===)

    the matching object (usually a Regexp or String)

Returns:



127
128
129
# File 'lib/roby/interface/v2/client.rb', line 127

def find_all_actions_matching(matcher)
    actions.find_all { |act| matcher === act.name }
end

#find_all_jobs_by_action_name(action_name) ⇒ Array<Job>

Find all the jobs that match the given action name

Returns:



580
581
582
583
584
# File 'lib/roby/interface/v2/client.rb', line 580

def find_all_jobs_by_action_name(action_name)
    each_job.find_all do |j|
        j.action_model.name == action_name
    end
end

#find_subcommand_by_name(name) ⇒ Object



612
613
614
# File 'lib/roby/interface/v2/client.rb', line 612

def find_subcommand_by_name(name)
    commands[name]
end

#has_action?(name) ⇒ Boolean

Tests whether the interface has an action with that name

Returns:

  • (Boolean)


105
106
107
# File 'lib/roby/interface/v2/client.rb', line 105

def has_action?(name)
    find_action_by_name(name)
end

#has_exceptions?Boolean

Whether some exception notifications have been queued

Returns:

  • (Boolean)


316
317
318
# File 'lib/roby/interface/v2/client.rb', line 316

def has_exceptions?
    !exception_queue.empty?
end

#has_job_progress?Boolean

Whether some job progress information is currently queued

Returns:

  • (Boolean)


248
249
250
# File 'lib/roby/interface/v2/client.rb', line 248

def has_job_progress?
    !job_progress_queue.empty?
end

#has_notifications?Boolean

Whether some generic notifications have been queued

Returns:

  • (Boolean)


271
272
273
# File 'lib/roby/interface/v2/client.rb', line 271

def has_notifications?
    !notification_queue.empty?
end

#has_subcommand?(name) ⇒ Boolean

Tests whether the remote interface has a given subcommand

Returns:

  • (Boolean)


617
618
619
# File 'lib/roby/interface/v2/client.rb', line 617

def has_subcommand?(name)
    commands.key?(name)
end

#has_ui_event?Boolean

Whether some UI events have been queued

Returns:

  • (Boolean)


292
293
294
# File 'lib/roby/interface/v2/client.rb', line 292

def has_ui_event?
    !ui_event_queue.empty?
end

#poll(expected_count = 0, timeout: nil) ⇒ Object

Polls for new data on the IO channel

Returns:

Raises:

  • (ComError)

    if the link seem to be broken

  • (ProtocolError)

    if some errors happened when validating the protocol



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/roby/interface/v2/client.rb', line 196

def poll(expected_count = 0, timeout: nil)
    result = nil
    timeout = if expected_count > 0 then timeout
              else
                  0
              end

    has_cycle_end = false
    while (packet = io.read_packet(timeout))
        has_cycle_end = process_packet(*packet) do |reply_value|
            if result
                raise ProtocolError,
                      "got more than one sync reply in a single poll call"
            end
            result = reply_value
            expected_count -= 1
        end

        if expected_count <= 0
            break if has_cycle_end

            timeout = 0
        end
    end
    if expected_count != 0
        within_s = " within #{timeout}s" if timeout
        raise TimeoutError, "failed to receive expected reply#{within_s}"
    end

    [result, has_cycle_end]
end

#pop_exception(Integer,Array)

Remove and return the oldest exception notification

Returns:

  • ((Integer,Array))

    a unique and monotonically-increasing message ID and the generic notification information as specified by (Interface#on_exception)



325
326
327
# File 'lib/roby/interface/v2/client.rb', line 325

def pop_exception
    exception_queue.shift
end

#pop_job_progress(Integer,Array)

Remove and return the oldest job information message

Returns:

  • ((Integer,Array))

    a unique and monotonically-increasing message ID and the arguments to job progress as specified on Interface#on_job_notification.



257
258
259
# File 'lib/roby/interface/v2/client.rb', line 257

def pop_job_progress
    job_progress_queue.shift
end

#pop_notification(Integer,Array)

Remove and return the oldest generic notification message

Returns:

  • ((Integer,Array))

    a unique and monotonically-increasing message ID and the generic notification information as specified by (Application#notify)



280
281
282
# File 'lib/roby/interface/v2/client.rb', line 280

def pop_notification
    notification_queue.shift
end

#pop_ui_eventObject

Remove the oldest UI event and return it



297
298
299
# File 'lib/roby/interface/v2/client.rb', line 297

def pop_ui_event
    ui_event_queue.shift
end

#process_batch(batch) ⇒ Array

Send all commands gathered in a batch for processing on the remote server

Parameters:

Returns:

  • (Array)

    the return values of each of the calls gathered in the batch



603
604
605
606
# File 'lib/roby/interface/v2/client.rb', line 603

def process_batch(batch)
    ret = call([], :process_batch, batch.__calls)
    BatchContext::Return.from_calls_and_return(batch.__calls, ret)
end

#process_packet(m, args = nil) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process a message as received on #io

Parameters:

  • args (defaults to: nil)

    the arguments to the given message. The object type is message-dependent

Returns:

  • (Boolean)

    whether the message was a cycle_end message



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/roby/interface/v2/client.rb', line 139

def process_packet(m, args = nil)
    case m
    when :cycle_end
        @cycle_index, @cycle_start_time = *args
        return true
    when :bad_call
        if !pending_async_calls.empty?
            process_pending_async_call(args, nil)
        else
            e = args
            raise RemoteError, e.message, (e.backtrace + caller)
        end
    when :reply
        if !pending_async_calls.empty?
            process_pending_async_call(nil, args)
        else
            yield args
        end
    when :job_progress
        queue_job_progress(*args)
    when :notification
        queue_notification(*args)
    when :ui_event
        queue_ui_event(*args)
    when :exception
        queue_exception(*args)
    else
        raise ProtocolError,
              "unexpected reply from #{io}: #{m} args=#{args}"
    end
    false
end

#process_pending_async_call(error, result) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Remove and call the block of a pending async call



185
186
187
188
# File 'lib/roby/interface/v2/client.rb', line 185

def process_pending_async_call(error, result)
    current_call = pending_async_calls.shift
    current_call[:block].call(error, result)
end

#queue_exception(kind, error, tasks, job_ids) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push an exception notification to #exception_queue

It can be retrieved with #pop_exception

See the yield parameters of Interface#on_exception for the overall argument format.



309
310
311
312
313
# File 'lib/roby/interface/v2/client.rb', line 309

def queue_exception(kind, error, tasks, job_ids)
    exception_queue.push(
        [allocate_message_id, [kind, error, tasks, job_ids]]
    )
end

#queue_job_progress(kind, job_id, job_name, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a job notification to #job_progress_queue

See the yield parameters of Interface#on_job_notification for the overall argument format.



241
242
243
244
245
# File 'lib/roby/interface/v2/client.rb', line 241

def queue_job_progress(kind, job_id, job_name, *args)
    job_progress_queue.push(
        [allocate_message_id, [kind, job_id, job_name, *args]]
    )
end

#queue_notification(source, level, message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a generic notification to #notification_queue



264
265
266
267
268
# File 'lib/roby/interface/v2/client.rb', line 264

def queue_notification(source, level, message)
    notification_queue.push(
        [allocate_message_id, [source, level, message]]
    )
end

#queue_ui_event(event_name, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a UI event to #ui_event_queue



287
288
289
# File 'lib/roby/interface/v2/client.rb', line 287

def queue_ui_event(event_name, *args)
    ui_event_queue.push [allocate_message_id, [event_name, *args]]
end

#reload_actionsObject



608
609
610
# File 'lib/roby/interface/v2/client.rb', line 608

def reload_actions
    @actions = call([], :reload_actions)
end

#start_job(action_name, **arguments) ⇒ Object

Start the given job within the batch

Parameters:

  • action_name (Symbol)

    the action name

  • arguments (Hash<Symbol,Object>)

    the action arguments

Raises:



338
339
340
341
342
343
344
345
# File 'lib/roby/interface/v2/client.rb', line 338

def start_job(action_name, **arguments)
    unless find_action_by_name(action_name)
        raise NoSuchAction,
              "there is no action called #{action_name} on #{self}"
    end

    call([], :start_job, action_name, **arguments)
end

#statsStats

Returns I/O statistics.

Returns:

  • (Stats)

    I/O statistics



85
86
87
# File 'lib/roby/interface/v2/client.rb', line 85

def stats
    io.stats
end

#subcommand(name) ⇒ Object

Returns a shell object



622
623
624
625
626
627
628
629
# File 'lib/roby/interface/v2/client.rb', line 622

def subcommand(name)
    unless (sub = find_subcommand_by_name(name))
        raise ArgumentError,
              "#{name} is not a known subcommand on #{self}"
    end

    SubcommandClient.new(self, name, sub.description, sub.commands)
end

#to_ioObject

The underlying IO object



100
101
102
# File 'lib/roby/interface/v2/client.rb', line 100

def to_io
    io.to_io
end

#wait(timeout: nil) ⇒ Boolean

Wait until there is data to process on the IO channel

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    a timeout after which the method will return. Use nil for no timeout

Returns:

  • (Boolean)

    falsy if the timeout was reached, true otherwise



178
179
180
# File 'lib/roby/interface/v2/client.rb', line 178

def wait(timeout: nil)
    io.read_wait(timeout: timeout)
end