Class: Roby::Tasks::ExternalProcess

Inherits:
Roby::Task show all
Defined in:
lib/roby/tasks/external_process.rb

Overview

This task class can be used to monitor the execution of an external process. Among the useful features, it can redirect standard output and error output to files.

The events will act as follows:

  • the start command starts the process per se. The event is emitted once the process has been spawned with success

  • the signaled event is emitted when the process dies because of a signal. The event’s context is the Process::Status object.

  • the failed event is emitted whenever the process exits with a nonzero status. The event’s context is the Process::Status object.

  • the success event is emitted when the process exits with a zero status

  • the stop event is emitted when the process exits, regardless of how

The task by default is not interruptible, because there is no good common way to gracefully terminate an external program. To e.g. use signals, one would need to explicitely make the :stop command send a signal to #pid and let ExternalProcess’ signal handling do the rest.

Constant Summary

Constants included from Models::Arguments

Models::Arguments::NO_DEFAULT_ARGUMENT

Instance Attribute Summary collapse

Attributes inherited from Roby::Task

#arguments, #bound_events, #data, #execute_handlers, #failed_to_start_time, #failure_event, #failure_reason, #history, #poll_handlers, #state_machine, #terminal_event

Attributes included from GUI::RelationsCanvasTask

#displayed_state, #last_event

Attributes inherited from PlanObject

#addition_time, #executable, #execution_engine, #finalization_handlers, #finalization_time, #model, #plan, #promise_executor, #removed_at

Attributes included from Roby::Transaction::Proxying::Cache

#transaction_forwarder_module, #transaction_proxy_module

Attributes included from Relations::DirectedRelationSupport

#relation_graphs

Attributes inherited from DistributedObject

#local_owner_id, #owners

Instance Method Summary collapse

Methods inherited from Roby::Task

#+, #abstract?, #add_child_object, #add_coordination_object, #apply_replacement_operations, #apply_terminal_flags, #as_plan, #as_service, #assign_argument, #assign_arguments, #can_merge?, #can_replace?, #check_emission_validity, #clear_events_external_relations, #clear_relations, #commit_transaction, #compatible_state?, #compute_object_replacement_operation, #compute_subplan_replacement_operation, #create_fresh_copy, create_script, #create_transaction_proxy, #current_state, #current_state?, #do_not_reuse, #do_poll, #each_coordination_object, #each_event, #each_exception_handler, #emit, #end_time, #ensure_poll_handler_called, #event, #event_model, #executable=, #executable?, #execute, #failed_to_start!, #failed_to_start?, #find_event, #fired_event, #forcefully_terminate, #forward_to, #freeze_delayed_arguments, #fullfills?, #fully_instanciated?, #garbage!, goal, #goal, #handle_exception, #has_argument?, #has_event?, #initialize_copy, #initialize_replacement, #inspect, #interruptible?, #invalidate_terminal_flag, #invalidated_terminal_flag?, #last_event, #lifetime, #list_unset_arguments, #mark_failed_to_start, #match, #meaningful_arguments, #name, #null?, #on, #partially_instanciated?, #plan=, #poll, #pretty_print, #promise, #quarantined!, #quarantined?, #related_events, #related_tasks, #remove_coordination_object, #remove_poll_handler, #replace_by, #replace_subplan_by, #resolve_goals, #resolve_state_sources, #respawn, #reusable?, #running?, #script, script, #signals, #simulate, #start_time, #state, state, #terminal_events, #to_execution_exception, #to_s, #to_task, #transition!, #update_task_status, #update_terminal_flag, #updated_data, #use_fault_response_table, #when_finalized, #|

Methods included from Models::Task

#abstract, #all_models, #as_plan, #can_merge?, #causal_link, #clear_model, #compute_terminal_events, #define_command_method, #define_event_methods, #discover_terminal_events, #enum_events, #event, #event_model, #find_event_model, #forward, #from, #from_state, #fullfills?, #instantiate_event_relations, #interruptible, #invalidate_template, #match, model_attribute_list, model_relation, #on, #on_exception, #poll, #precondition, #provided_services, #query, #signal, #template, #terminal_events, #terminates, #to_coordination_task, #to_execution_exception_matcher, #update_terminal_flag, #with_arguments

Methods included from Models::Arguments

#argument, #arguments, #default_argument, #fullfills?, #meaningful_arguments

Methods included from Roby::TaskStateHelper

#import_events_to_roby, #namespace, #namespace=, #refine_running_state, #state_machine

Methods included from DRoby::Identifiable

#droby_id, #initialize_copy

Methods included from DRoby::V5::Models::TaskDumper

#droby_dump

Methods included from DRoby::V5::ModelDumper

#droby_dump

Methods included from DRoby::V5::TaskDumper

#droby_dump

Methods included from GUI::GraphvizTask

#apply_layout, #dot_label, #to_dot_events

Methods included from GUI::GraphvizPlanObject

#apply_layout, #dot_label, #to_dot

Methods included from GUI::RelationsCanvasTask

#display, #display_create, #display_name, #display_time_end, #display_time_start, #layout_events, to_svg, #update_graphics

Methods included from GUI::RelationsCanvasPlanObject

#display, #display_create, #display_events, #display_name, #display_parent

Methods included from ExceptionHandlingObject

#add_error, #handle_exception, #pass_exception

Methods inherited from PlanObject

#add_child_object, #apply_relation_changes, #as_plan, #can_finalize?, #commit_transaction, #concrete_model, #connection_space, #each_finalization_handler, #each_in_neighbour_merged, #each_out_neighbour_merged, #each_plan_child, #engine, #executable?, #finalized!, #finalized?, #forget_peer, #fullfills?, #garbage!, #garbage?, #initialize_copy, #initialize_replacement, #merged_relations, #promise, #read_write?, #real_object, #remotely_useful?, #replace_by, #replace_subplan_by, #root_object, #root_object?, #subscribed?, #transaction_proxy?, #transaction_stack, #update_on?, #updated_by?, #when_finalized

Methods included from Models::PlanObject

#child_plan_object, #finalization_handler, #match, #when_finalized

Methods included from Relations::DirectedRelationSupport

#[], #[]=, #add_child_object, #add_parent_object, #child_object?, #child_objects, #clear_vertex, #each_child_object, #each_in_neighbour, #each_out_neighbour, #each_parent_object, #each_relation, #each_relation_graph, #each_relation_sorted, #each_root_relation_graph, #enum_child_objects, #enum_parent_objects, #enum_relations, #leaf?, #parent_object?, #parent_objects, #related_object?, #related_objects, #relation_graph_for, #relations, #remove_child_object, #remove_children, #remove_parent_object, #remove_parents, #remove_relations, #root?, #sorted_relations

Methods inherited from DistributedObject

#add_owner, #clear_owners, #initialize_copy, #owned_by?, #remove_owner

Constructor Details

#initialize(arguments = Hash.new) ⇒ ExternalProcess

Returns a new instance of ExternalProcess.



49
50
51
52
53
54
55
56
57
# File 'lib/roby/tasks/external_process.rb', line 49

def initialize(arguments = Hash.new)
    if arg = arguments[:command_line]
        arguments[:command_line] = [arg] if !arg.kind_of?(Array)
    end
    @pid = nil
    @buffer = nil
    @redirection = Hash.new
    super(arguments)
end

Instance Attribute Details

#pidObject (readonly)

The PID of the child process, or nil if the child process is not running



47
48
49
# File 'lib/roby/tasks/external_process.rb', line 47

def pid
  @pid
end

Instance Method Details

#command_lineObject

:attr_reader: This task argument is an array whose first element is the executable to start and the rest the arguments that need to be passed to it.

It can also be set to a simple string, which is interpreted as the executable name with no arguments.



31
# File 'lib/roby/tasks/external_process.rb', line 31

argument :command_line

#create_redirection(redir_target) ⇒ Object

Handle redirection for a single stream (out or err)



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/roby/tasks/external_process.rb', line 112

def create_redirection(redir_target)
    if !redir_target
        return [], nil
    elsif redir_target == :close
        return [], :close
    elsif redir_target == :pipe
        pipe, io = IO.pipe
        return [[:close, io]], io, pipe, String.new
    elsif redir_target !~ /%p/
        # Assume no replacement in redirection, just open the file
        if redir_target[0, 1] == '+'
            io = File.open(redir_target[1..-1], 'a')
        else
            io = File.open(redir_target, 'w')
        end
        return [[:close, io]], io
    else
        io = open_redirection(working_directory) 
        return [[redir_target, io]], io
    end
end

#dead!(result) ⇒ Object

Called to announce that this task has been killed. result is the corresponding Process::Status object.



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/roby/tasks/external_process.rb', line 61

def dead!(result)
    if !result
        failed_event.emit
    elsif result.success?
        success_event.emit
    elsif result.signaled?
        signaled_event.emit(result)
    else
        failed_event.emit(result)
    end
end

#handle_redirectionObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Setup redirections pre-spawn



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/roby/tasks/external_process.rb', line 137

def handle_redirection
    if !@redirection[:stdout] && !@redirection[:stderr]
        return [], Hash.new
    elsif (@redirection[:stdout] == @redirection[:stderr]) && ![:pipe, :close].include?(@redirection[:stdout])
        io = open_redirection(working_directory) 
        return [[@redirection[:stdout], io]], Hash[out: io, err: io]
    end

    out_open, out_io, @out_pipe, @out_buffer =
        create_redirection(@redirection[:stdout])
    err_open, err_io, @err_pipe, @err_buffer =
        create_redirection(@redirection[:stderr])

    if @out_buffer || @err_buffer
        @read_buffer = String.new
    end

    spawn_options = Hash.new
    spawn_options[:out] = out_io if out_io
    spawn_options[:err] = err_io if err_io
    return (out_open + err_open), spawn_options
end

#kill(signo) ⇒ Object

Kills the child process

Parameters:

  • signo (String, Integer)

    the signal name or number, as accepted by Process#kill



204
205
206
# File 'lib/roby/tasks/external_process.rb', line 204

def kill(signo)
    Process.kill(signo, pid)
end

#open_redirection(dir) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Open the output file for redirection, before spawning



194
195
196
197
198
# File 'lib/roby/tasks/external_process.rb', line 194

def open_redirection(dir)
    Dir::Tmpname.create 'roby-external-process', dir do |path, _|
        return File.open(path, 'w+')
    end
end

#read_pipe(pipe, buffer) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Read a given pipe, when an output is redirected to pipe



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/roby/tasks/external_process.rb', line 211

def read_pipe(pipe, buffer)
    received = false
    while true
        pipe.read_nonblock 1024, @read_buffer
        received = true
        buffer.concat(@read_buffer)
    end
rescue EOFError
    if received
        return true, buffer.dup
    end
rescue IO::WaitReadable
    if received
        return false, buffer.dup
    end
end

#read_pipesObject



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/roby/tasks/external_process.rb', line 240

def read_pipes
    if @out_pipe
        eos, data = read_pipe(@out_pipe, @out_buffer)
        if eos
            @out_pipe = nil
        end
        if data
            stdout_received(data)
        end
    end
    if @err_pipe
        eos, data = read_pipe(@err_pipe, @err_buffer)
        if eos
            @err_pipe = nil
        end
        if data
            stderr_received(data)
        end
    end
end

#redirect_output(common = nil, stdout: nil, stderr: nil) ⇒ Object

If set to a string, the process’ standard output will be redirected to the given file. The following replacement is done:

  • ‘%p’ is replaced by the process PID

The last form (with nil argument) removes any redirection. A specific redirection can also be disabled using the hash form:

redirect_output stdout: nil

:call-seq:

redirect_output "file"
redirect_output stdout: "file-out", stderr: "another-file"
redirect_output nil


87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/roby/tasks/external_process.rb', line 87

def redirect_output(common = nil, stdout: nil, stderr: nil)
    if @pid
        raise RuntimeError, "cannot change redirection after task start"
    elsif common
        stdout = stderr = common
    end

    @redirection = Hash.new
    if stdout
        @redirection[:stdout] = 
            if [:pipe, :close].include?(stdout) then stdout
            else stdout.to_str
            end
    end
    if stderr
        @redirection[:stderr] = 
            if [:pipe, :close].include?(stderr) then stderr
            else stderr.to_str
            end
    end
end

#redirection_path(pattern, pid) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the file name based on the redirection pattern and the current PID value.



187
188
189
# File 'lib/roby/tasks/external_process.rb', line 187

def redirection_path(pattern, pid) # :nodoc:
    pattern.gsub '%p', pid.to_s
end

#startObject

:method: start!

Starts the child process. Emits start when the process is actually started.



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/roby/tasks/external_process.rb', line 165

event :start do |context|
    working_directory = (self.working_directory || Dir.pwd)
    options = Hash[pgroup: 0, chdir: working_directory]

    opened_ios, spawn_options = handle_redirection

    @pid = Process.spawn *command_line, **spawn_options
    opened_ios.each do |pattern, io|
        if pattern == :close
            io.close
        else
            FileUtils.mv io.path, File.join(working_directory, redirection_path(pattern, @pid))
        end
    end

    start_event.emit
end

#stderr_received(data) ⇒ Object

Method called when data is received on an intercepted stderr

Intercept stdout by calling redirect_output(stderr: :pipe)



237
238
# File 'lib/roby/tasks/external_process.rb', line 237

def stderr_received(data)
end

#stdout_received(data) ⇒ Object

Method called when data is received on an intercepted stdout

Intercept stdout by calling redirect_output(stdout: :pipe)



231
232
# File 'lib/roby/tasks/external_process.rb', line 231

def stdout_received(data)
end

#working_directoryObject

:attr_reader: The working directory. If not set, the current directory is used.



36
# File 'lib/roby/tasks/external_process.rb', line 36

argument :working_directory, default: nil