Class: Roby::Interface::V1::Client

Inherits:
Object
Defined in:
lib/roby/interface/v1/client.rb

Overview

The client-side object that gives access to an interface (e.g. a Roby app) from a process other than the Roby controller

Defined Under Namespace

Classes: BatchContext, Job, NoSuchAction, TimeoutError

Constant Summary

DEFAULT_CALL_TIMEOUT = 10

Default value for #call_timeout

Constructor Details

#initialize(io, id, handshake: %i[actions commands]) ⇒ Client

Create a client endpoint to a Roby interface [Server]

# File 'lib/roby/interface/v1/client.rb', line 69

def initialize(io, id, handshake: %i[actions commands])
    @pending_async_calls = []
    @io = io
    @message_id = 0
    @notification_queue = []
    @job_progress_queue = []
    @exception_queue = []
    @ui_event_queue = []
    @call_timeout = DEFAULT_CALL_TIMEOUT

    @handshake_results = call([], :handshake, id, handshake)
    @actions = @handshake_results[:actions]
    @commands = @handshake_results[:commands]
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(m, *args, &b) ⇒ Object

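Dispatches calls to methods that the client does not define: a name matching a subcommand returns a SubcommandClient, a name prefixed with async_ is forwarded to #async_call, and any other name is forwarded to #call

# File 'lib/roby/interface/v1/client.rb', line 621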

def method_missing(m, *args, &b) # rubocop:disable Style/MethodMissingSuper
    if (sub = find_subcommand_by_name(m.to_s))
        SubcommandClient.new(self, m.to_s, sub.description, sub.commands)
    elsif (match = /^async_(.*)$/.match(m.to_s))
        async_call([], match[1].to_sym, *args, &b)
    else
        call([], m, *args)
    end
end
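
The three dispatch branches can be traced with a small stand-in class. This is only a sketch: DispatchSketch and the subcommand name "syskit" are hypothetical, and each branch returns a tag describing the decision where the real method would build a SubcommandClient or perform a remote call.

```ruby
# Hypothetical stand-in for the client, used only to trace dispatch decisions.
class DispatchSketch
    def initialize(subcommands)
        @subcommands = subcommands # Hash keyed by subcommand name (String)
    end

    # Same branch order as Client#method_missing, but returns a description
    # of the branch taken instead of talking to a server.
    def method_missing(m, *args, &block)
        name = m.to_s
        if @subcommands.key?(name)
            [:subcommand, name]
        elsif (match = /^async_(.*)$/.match(name))
            [:async_call, match[1].to_sym, args]
        else
            [:call, m, args]
        end
    end

    def respond_to_missing?(_name, _include_private = false)
        true
    end
end

client = DispatchSketch.new({ "syskit" => :placeholder })
sub    = client.syskit       # => [:subcommand, "syskit"]
async  = client.async_jobs   # => [:async_call, :jobs, []]
plain  = client.kill_job(42) # => [:call, :kill_job, [42]]
```

Note that the subcommand lookup comes first, so a subcommand name takes precedence over a remote command of the same name.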

Instance Attribute Details

#actions ⇒ Array<Roby::Actions::Models::Action> (readonly)

# File 'lib/roby/interface/v1/client.rb', line 17

def actions
  @actions
end

#call_timeout ⇒ Float

Timeout, in seconds, for blocking remote calls

Defaults to DEFAULT_CALL_TIMEOUT

# File 'lib/roby/interface/v1/client.rb', line 55

def call_timeout
  @call_timeout
end

#commands ⇒ Hash (readonly)

# File 'lib/roby/interface/v1/client.rb', line 19

def commands
  @commands
end

#cycle_index ⇒ Integer (readonly)

# File 'lib/roby/interface/v1/client.rb', line 39

def cycle_index
  @cycle_index
end

#cycle_start_time ⇒ Time (readonly)

# File 'lib/roby/interface/v1/client.rb', line 41

def cycle_start_time
  @cycle_start_time
end

#exception_queue ⇒ Array<Integer,Array> (readonly)

# File 'lib/roby/interface/v1/client.rb', line 32

def exception_queue
  @exception_queue
end

#handshake_results ⇒ Hash<Symbol,Object> (readonly)

Result of the calls done during the handshake

# File 'lib/roby/interface/v1/client.rb', line 48

def handshake_results
  @handshake_results
end

#io ⇒ DRobyChannel (readonly)

# File 'lib/roby/interface/v1/client.rb', line 15

def io
  @io
end

#job_progress_queue ⇒ Array<Integer,Array> (readonly)

# File 'lib/roby/interface/v1/client.rb', line 24

def job_progress_queue
  @job_progress_queue
end

#notification_queue ⇒ Array<Integer,Array> (readonly)

# File 'lib/roby/interface/v1/client.rb', line 28

def notification_queue
  @notification_queue
end

#pending_async_calls ⇒ Array<Hash> (readonly)

# File 'lib/roby/interface/v1/client.rb', line 43

def pending_async_calls
  @pending_async_calls
end

#ui_event_queue ⇒ Array<Integer,Array> (readonly)

# File 'lib/roby/interface/v1/client.rb', line 36

def ui_event_queue
  @ui_event_queue
end

Instance Method Details

#allocate_message_id ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Allocate a unique ID for a notification message

# File 'lib/roby/interface/v1/client.rb', line 222

def allocate_message_id
    @message_id += 1
end

#async_call(path, m, *args, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Asynchronously call a method on the interface or on one of the interface’s subcommands

# File 'lib/roby/interface/v1/client.rb', line 379

def async_call(path, m, *args, &block)
    raise "no callback block given" unless block_given?

    if (action_match = /(.*)!$/.match(m.to_s))
        action_name = action_match[1]
        unless find_action_by_name(action_name)
            raise NoSuchAction,
                  "there is no action called #{action_name} on #{self}"
        end

        path = []
        m = :start_job
        args = [action_name, *args]
    end
    io.write_packet([path, m, *args])
    pending_async_calls << { block: block, path: path, m: m, args: args }
    pending_async_calls.last.freeze
end

#async_call_pending?(a_call) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether the async call is still pending

# File 'lib/roby/interface/v1/client.rb', line 404

def async_call_pending?(a_call)
    pending_async_calls.any? { |item| item.equal?(a_call) }
end

#call(path, m, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call a method on the interface or on one of the interface’s subcommands

# File 'lib/roby/interface/v1/client.rb', line 350

def call(path, m, *args)
    if (action_match = /(.*)!$/.match(m.to_s))
        if args.empty?
            args = [{}]
        elsif args.size > 1 || !args.last.kind_of?(Hash)
            raise ArgumentError,
                  "expected a single Hash argument, but got #{args}"
        end

        start_job(action_match[1], **args.first)
    else
        io.write_packet([path, m, *args])
        result, = poll(1, timeout: @call_timeout)
        result
    end
end
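
The handling of method names ending in "!" can be isolated in a small helper. The following is a sketch only: normalize_call and the action name move_to are hypothetical, and the helper returns a description of what would happen where the real method calls #start_job or writes a packet.

```ruby
# Hypothetical helper mirroring the argument handling in Client#call.
# It returns what would be done instead of performing the remote call.
def normalize_call(m, *args)
    if (action_match = /(.*)!$/.match(m.to_s))
        # A trailing "!" means "start the action of that name as a job".
        # The action arguments must be given as a single Hash (or omitted).
        if args.empty?
            args = [{}]
        elsif args.size > 1 || !args.last.kind_of?(Hash)
            raise ArgumentError,
                  "expected a single Hash argument, but got #{args}"
        end
        [:start_job, action_match[1], args.first]
    else
        # Any other name is sent as-is to the remote interface
        [:call, m, args]
    end
end

job   = normalize_call(:move_to!, x: 1, y: 2)
bare  = normalize_call(:move_to!)
plain = normalize_call(:jobs)
```

Here job describes a start_job request for "move_to" with the arguments hash, bare carries an empty hash, and plain is passed through untouched.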

#close ⇒ Object

Close the communication channel

# File 'lib/roby/interface/v1/client.rb', line 90

def close
    io.close
end

#closed? ⇒ Boolean

Whether the communication channel to the server is closed

# File 'lib/roby/interface/v1/client.rb', line 85

def closed?
    io.closed?
end

#create_batch ⇒ BatchContext

Create a batch context

Messages sent to the returned object are validated as much as possible and gathered in a list. Call #process_batch to send all the gathered calls at once to the remote server.

# File 'lib/roby/interface/v1/client.rb', line 583

def create_batch
    BatchContext.new(self)
end

#each_job ⇒ Object

Enumerate the current jobs

Yields one Job per job; returns an Enumerator when called without a block

# File 'lib/roby/interface/v1/client.rb', line 559

def each_job
    return enum_for(__method__) unless block_given?

    jobs.each do |job_id, (job_state, placeholder_task, job_task)|
        yield(Job.new(job_id, job_state, placeholder_task, job_task))
    end
end
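
The enum_for(__method__) guard is what lets callers chain Enumerable methods on #each_job, as #find_all_jobs_by_action_name does. A minimal sketch of that pattern, assuming a hypothetical JobListSketch that holds its jobs locally (the real client fetches them from the remote interface) and a simplified JobSketch record:

```ruby
# Hypothetical, simplified job record
JobSketch = Struct.new(:job_id, :state, :action_name)

# Hypothetical stand-in holding jobs as { job_id => [state, action_name] }
class JobListSketch
    def initialize(jobs)
        @jobs = jobs
    end

    def each_job
        # Without a block, return an Enumerator bound to this same method
        return enum_for(__method__) unless block_given?

        @jobs.each do |job_id, (state, action_name)|
            yield JobSketch.new(job_id, state, action_name)
        end
    end
end

list = JobListSketch.new(
    { 1 => [:running, "move_to"], 2 => [:ready, "dock"], 3 => [:running, "move_to"] }
)

# Block form
states = []
list.each_job { |job| states << job.state }

# Enumerator form, filtered by action name
ids = list.each_job.find_all { |j| j.action_name == "move_to" }.map(&:job_id)
# ids => [1, 3]
```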

#find_action_by_name(name) ⇒ Actions::Models::Action?

Find an action by its name

This is a local operation using the information gathered at connection time

# File 'lib/roby/interface/v1/client.rb', line 111

def find_action_by_name(name)
    actions.find { |act| act.name == name }
end

#find_all_actions_matching(matcher) ⇒ Array<Actions::Models::Action>

Finds all actions whose name matches a pattern

# File 'lib/roby/interface/v1/client.rb', line 120

def find_all_actions_matching(matcher)
    actions.find_all { |act| matcher === act.name }
end
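
Because the comparison is done with ===, the matcher does not have to be a Regexp: any object with a suitable === works. The sketch below illustrates this with a hypothetical ActionSketch struct and made-up action names; find_all_matching is a standalone copy of the one-line body above.

```ruby
# Hypothetical action record; only #name matters for matching
ActionSketch = Struct.new(:name)
actions = %w[move_to move_home dock].map { |n| ActionSketch.new(n) }

# Standalone copy of the matching logic
def find_all_matching(actions, matcher)
    actions.find_all { |act| matcher === act.name }
end

by_regexp = find_all_matching(actions, /^move_/).map(&:name)
# => ["move_to", "move_home"]

# String#=== is plain equality, so this is an exact-name match
by_string = find_all_matching(actions, "dock").map(&:name)
# => ["dock"]

# Proc#=== calls the proc with the name
by_proc = find_all_matching(actions, ->(n) { n.size > 4 }).map(&:name)
# => ["move_to", "move_home"]
```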

#find_all_jobs_by_action_name(action_name) ⇒ Array<Job>

Find all the jobs that match the given action name

# File 'lib/roby/interface/v1/client.rb', line 570

def find_all_jobs_by_action_name(action_name)
    each_job.find_all do |j|
        j.action_model.name == action_name
    end
end

#find_subcommand_by_name(name) ⇒ Object

Find a subcommand by its name in #commands

# File 'lib/roby/interface/v1/client.rb', line 602

def find_subcommand_by_name(name)
    commands[name]
end

#has_action?(name) ⇒ Boolean

Tests whether the interface has an action with that name

As implemented, the return value is the matching action or nil rather than a strict true or false

# File 'lib/roby/interface/v1/client.rb', line 100

def has_action?(name)
    find_action_by_name(name)
end

#has_exceptions? ⇒ Boolean

Whether some exception notifications have been queued

# File 'lib/roby/interface/v1/client.rb', line 307

def has_exceptions?
    !exception_queue.empty?
end

#has_job_progress? ⇒ Boolean

Whether some job progress information is currently queued

# File 'lib/roby/interface/v1/client.rb', line 239

def has_job_progress?
    !job_progress_queue.empty?
end

#has_notifications? ⇒ Boolean

Whether some generic notifications have been queued

# File 'lib/roby/interface/v1/client.rb', line 262

def has_notifications?
    !notification_queue.empty?
end

#has_subcommand?(name) ⇒ Boolean

Tests whether the remote interface has a given subcommand

# File 'lib/roby/interface/v1/client.rb', line 607

def has_subcommand?(name)
    commands.key?(name)
end

#has_ui_event? ⇒ Boolean

Whether some UI events have been queued

# File 'lib/roby/interface/v1/client.rb', line 283

def has_ui_event?
    !ui_event_queue.empty?
end

#poll(expected_count = 0, timeout: nil) ⇒ Object

Polls for new data on the IO channel

Raises:

  • (ComError)

    if the link seems to be broken

  • (ProtocolError)

    if some errors happened when validating the protocol

  • (TimeoutError)

    if the expected number of replies was not received

# File 'lib/roby/interface/v1/client.rb', line 187

def poll(expected_count = 0, timeout: nil)
    result = nil
    timeout = if expected_count > 0 then timeout
              else
                  0
              end

    has_cycle_end = false
    while (packet = io.read_packet(timeout))
        has_cycle_end = process_packet(*packet) do |reply_value|
            if result
                raise ProtocolError,
                      "got more than one sync reply in a single poll call"
            end
            result = reply_value
            expected_count -= 1
        end

        if expected_count <= 0
            break if has_cycle_end

            timeout = 0
        end
    end
    if expected_count != 0
        within_s = " within #{timeout}s" if timeout
        raise TimeoutError, "failed to receive expected reply#{within_s}"
    end

    [result, has_cycle_end]
end
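
The control flow of the loop is easier to follow on a reduced model. The sketch below is an assumption-laden simplification: FakeChannel is a hypothetical in-memory channel that ignores timeouts, only :reply and :cycle_end packets are handled, and plain RuntimeError stands in for ProtocolError and TimeoutError. It shows that, once the expected reply has arrived, reading continues until a cycle end is seen or the channel runs dry.

```ruby
# Hypothetical in-memory channel: read_packet returns the next queued
# packet, or nil when nothing is left (the timeout is ignored here).
class FakeChannel
    def initialize(packets)
        @packets = packets.dup
    end

    def read_packet(_timeout)
        @packets.shift
    end
end

# Reduced version of the #poll loop
def poll_sketch(io, expected_count = 0)
    result = nil
    has_cycle_end = false
    while (packet = io.read_packet(0))
        m, *args = packet
        # As in the original, the flag reflects the latest packet only
        has_cycle_end = (m == :cycle_end)
        if m == :reply
            raise "got more than one sync reply in a single poll call" if result

            result = args.first
            expected_count -= 1
        end
        # Stop once all expected replies are in and a cycle end was seen
        break if expected_count <= 0 && has_cycle_end
    end
    raise "failed to receive expected reply" if expected_count != 0

    [result, has_cycle_end]
end

channel = FakeChannel.new([[:reply, 42], [:cycle_end, 7, nil], [:reply, 43]])
first = poll_sketch(channel, 1)  # => [42, true], [:reply, 43] stays queued
second = poll_sketch(channel, 1) # => [43, false], channel ran dry
```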

#pop_exception ⇒ (Integer, Array)

Remove and return the oldest exception notification

# File 'lib/roby/interface/v1/client.rb', line 316

def pop_exception
    exception_queue.shift
end

#pop_job_progress ⇒ (Integer, Array)

Remove and return the oldest job information message

# File 'lib/roby/interface/v1/client.rb', line 248

def pop_job_progress
    job_progress_queue.shift
end

#pop_notification ⇒ (Integer, Array)

Remove and return the oldest generic notification message

# File 'lib/roby/interface/v1/client.rb', line 271

def pop_notification
    notification_queue.shift
end
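
All four queues follow the same pattern: entries are [message_id, payload] pairs, pushed by a queue_* method and consumed oldest-first with has_*? and pop_*. A minimal sketch of draining the notification queue, using a hypothetical NotificationQueueSketch in place of a connected client and made-up notification contents:

```ruby
# Hypothetical stand-in reproducing the queue entry layout [id, payload]
class NotificationQueueSketch
    def initialize
        @message_id = 0
        @queue = []
    end

    def queue_notification(source, level, message)
        @queue.push([@message_id += 1, [source, level, message]])
    end

    def has_notifications?
        !@queue.empty?
    end

    def pop_notification
        @queue.shift
    end
end

client = NotificationQueueSketch.new
client.queue_notification("planner", :info, "started")
client.queue_notification("planner", :warn, "slow cycle")

# Drain the queue, destructuring each [id, [source, level, message]] entry
lines = []
while client.has_notifications?
    id, (source, level, message) = client.pop_notification
    lines << "[#{id}] #{source} #{level}: #{message}"
end
# lines => ["[1] planner info: started", "[2] planner warn: slow cycle"]
```

On the real client the queues are filled as a side effect of #poll, so a typical loop would presumably call #poll before draining.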

#pop_ui_event ⇒ Object

Remove the oldest UI event and return it

# File 'lib/roby/interface/v1/client.rb', line 288

def pop_ui_event
    ui_event_queue.shift
end

#process_batch(batch) ⇒ Array

Send all commands gathered in a batch for processing on the remote server

# File 'lib/roby/interface/v1/client.rb', line 593

def process_batch(batch)
    ret = call([], :process_batch, batch.__calls)
    BatchContext::Return.from_calls_and_return(batch.__calls, ret)
end

#process_packet(m, *args) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process a message as received on #io

# File 'lib/roby/interface/v1/client.rb', line 129

def process_packet(m, *args)
    case m
    when :cycle_end
        @cycle_index, @cycle_start_time = *args
        return true
    when :bad_call
        if !pending_async_calls.empty?
            process_pending_async_call(args.first, nil)
        else
            e = args.first
            raise e, e.message, (e.backtrace + caller)
        end
    when :reply
        if !pending_async_calls.empty?
            process_pending_async_call(nil, args.first)
        else
            yield args.first
        end
    when :job_progress
        queue_job_progress(*args)
    when :notification
        queue_notification(*args)
    when :ui_event
        queue_ui_event(*args)
    when :exception
        queue_exception(*args)
    else
        raise ProtocolError,
              "unexpected reply from #{io}: #{m} "\
              "(#{args.map(&:to_s).join(',')})"
    end
    false
end

#process_pending_async_call(error, result) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Remove and call the block of a pending async call

# File 'lib/roby/interface/v1/client.rb', line 176

def process_pending_async_call(error, result)
    current_call = pending_async_calls.shift
    current_call[:block].call(error, result)
end
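
Pending async calls are resolved strictly in FIFO order: each incoming reply (or error) is handed to the oldest registered callback. The sketch below models only that bookkeeping. AsyncQueueSketch is hypothetical, nothing is written to a channel, and the reply is injected by hand where the real client would receive it through #process_packet.

```ruby
# Hypothetical stand-in: replies are matched to callbacks in FIFO order
class AsyncQueueSketch
    attr_reader :pending_async_calls

    def initialize
        @pending_async_calls = []
    end

    # Registers the callback and returns the frozen call record
    def async_call(m, *args, &block)
        raise "no callback block given" unless block

        pending_async_calls << { block: block, m: m, args: args }
        pending_async_calls.last.freeze
    end

    # Identity comparison, so only the exact record returned above matches
    def async_call_pending?(a_call)
        pending_async_calls.any? { |item| item.equal?(a_call) }
    end

    def process_pending_async_call(error, result)
        current_call = pending_async_calls.shift
        current_call[:block].call(error, result)
    end
end

client = AsyncQueueSketch.new
results = []
first  = client.async_call(:jobs) { |error, result| results << [:jobs, error, result] }
second = client.async_call(:actions) { |error, result| results << [:actions, error, result] }

# Simulate the arrival of one reply: it goes to the oldest call (:jobs)
client.process_pending_async_call(nil, { 1 => :running })

first_pending  = client.async_call_pending?(first)  # => false
second_pending = client.async_call_pending?(second) # => true
```

The callback receives (error, result), with error set to nil on success and result set to nil on failure, matching how #process_packet invokes it for :reply and :bad_call.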

#queue_exception(kind, error, tasks, job_ids) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push an exception notification to #exception_queue

It can be retrieved with #pop_exception

See the yield parameters of Interface#on_exception for the overall argument format.

# File 'lib/roby/interface/v1/client.rb', line 300

def queue_exception(kind, error, tasks, job_ids)
    exception_queue.push(
        [allocate_message_id, [kind, error, tasks, job_ids]]
    )
end

#queue_job_progress(kind, job_id, job_name, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a job notification to #job_progress_queue

See the yield parameters of Interface#on_job_notification for the overall argument format.

# File 'lib/roby/interface/v1/client.rb', line 232

def queue_job_progress(kind, job_id, job_name, *args)
    job_progress_queue.push(
        [allocate_message_id, [kind, job_id, job_name, *args]]
    )
end

#queue_notification(source, level, message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a generic notification to #notification_queue

# File 'lib/roby/interface/v1/client.rb', line 255

def queue_notification(source, level, message)
    notification_queue.push(
        [allocate_message_id, [source, level, message]]
    )
end

#queue_ui_event(event_name, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a UI event to #ui_event_queue

# File 'lib/roby/interface/v1/client.rb', line 278

def queue_ui_event(event_name, *args)
    ui_event_queue.push [allocate_message_id, [event_name, *args]]
end

#reload_actions ⇒ Object

Reload the action list from the remote interface and update #actions

# File 'lib/roby/interface/v1/client.rb', line 598

def reload_actions
    @actions = call([], :reload_actions)
end

#start_job(action_name, **arguments) ⇒ Object

Start the given job

Raises:

  • (NoSuchAction)

    if there is no action called action_name on this interface

# File 'lib/roby/interface/v1/client.rb', line 329

def start_job(action_name, **arguments)
    unless find_action_by_name(action_name)
        raise NoSuchAction,
              "there is no action called #{action_name} on #{self}"
    end

    call([], :start_job, action_name, arguments)
end

#subcommand(name) ⇒ Object

Returns a SubcommandClient that gives access to the named subcommand

Raises:

  • (ArgumentError)

    if name is not a known subcommand

# File 'lib/roby/interface/v1/client.rb', line 612

def subcommand(name)
    unless (sub = find_subcommand_by_name(name))
        raise ArgumentError,
              "#{name} is not a known subcommand on #{self}"
    end

    SubcommandClient.new(self, name, sub.description, sub.commands)
end

#to_io ⇒ Object

The underlying IO object

# File 'lib/roby/interface/v1/client.rb', line 95

def to_io
    io.to_io
end
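
Exposing #to_io is the usual Ruby convention for objects that can take part in IO.select, which converts its arguments through to_io. The sketch below demonstrates the convention with a hypothetical ChannelSketch wrapping one end of a pipe; it assumes, without confirmation from this page, that the client's underlying channel yields a regular selectable IO.

```ruby
# Hypothetical wrapper exposing #to_io the way the client does
class ChannelSketch
    def initialize(io)
        @io = io
    end

    def to_io
        @io
    end
end

reader, writer = IO.pipe
channel = ChannelSketch.new(reader)

# Nothing to read yet: select times out and returns nil
idle = IO.select([channel], nil, nil, 0)

# Once data is written, the wrapper is reported as readable
writer.write("x")
ready = IO.select([channel], nil, nil, 1)
readable_count = ready ? ready[0].size : 0

reader.close
writer.close
```

This makes it possible to multiplex several clients, or a client and other IO sources, in a single select loop.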

#wait(timeout: nil) ⇒ Boolean

Wait until there is data to process on the IO channel

# File 'lib/roby/interface/v1/client.rb', line 169

def wait(timeout: nil)
    io.read_wait(timeout: timeout)
end