Module: PPipe::Controller
Constant Summary
- CON =
_GHAkxsadf0a0s98
:controller_switch
Instance Attribute Summary
-
#controller ⇒ Object
readonly
The pipe number of the controller (nil if there is no controller).
Attributes included from Methods
#is_root, #mpn, #redirect, #thread_safe, #tp_required, #tt_required
Attributes included from Log
Instance Method Summary
- #auto_load_save(resource_name, file_name) ⇒ Object
-
#controller_alive? ⇒ Boolean
Is there a controller?
-
#controller_refresh=(value) ⇒ Object
Change the controller refresh time.
-
#controller_woken?(wait_till_woken = false) ⇒ Boolean
Returns true if the controller is awake, false if it is asleep.
-
#get_shared_resource(resource_name) ⇒ Object
Get a shared resource from the controller.
-
#lock(mutex_name) ⇒ Object
Lock a mutex named mutex_name.
-
#mutex_locked?(mutex_name) ⇒ Boolean
Is the mutex called mutex_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.
-
#put_controller_to_sleep(sleep_time, wait_for_waking) ⇒ Object
If the controller will not be needed for a while, put it to sleep for sleep_time seconds.
-
#resource_locked?(resource_name) ⇒ Boolean
Is the shared resource called resource_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.
-
#return_shared_resource(resource_name, resource) ⇒ Object
Return a shared resource.
-
#shared_resource(resource_name, &block) ⇒ Object
Fetch a shared resource, edit it and then return it.
-
#start_controller(controller_refresh = 0.001) ⇒ Object
Start a controller process.
-
#stop_controller ⇒ Object
Stop the controller.
-
#synchronize(mutex_name, &block) ⇒ Object
Lock the mutex, call the block and then unlock the mutex.
-
#unlock(mutex_name) ⇒ Object
Unlock a mutex.
-
#wake_controller ⇒ Object
Wake up the controller.
Methods included from Methods
#assigned?, #die, #exit, #finish, #fork, #gets, #i_recv, #i_send, #kill_all, #kill_pipe, #pids, #puts, #read_all, #set_up_pipes, #t_recv, #tid, #user_end, #w_recv, #w_send, #wait, #wait_till_assigned, #waitall
Methods included from Log
clean_up, io=, #log, log_file, log_file=
Methods included from Reader
#configure_reader, #get_line, #read_pipe
Instance Attribute Details
#controller ⇒ Object (readonly)
The pipe number of the controller (nil if there is no controller).
    # File 'lib/parallelpipes.rb', line 1853
    def controller
      @controller
    end
Instance Method Details
#auto_load_save(resource_name, file_name) ⇒ Object
    # File 'lib/parallelpipes.rb', line 2110
    def auto_load_save(resource_name, file_name)
      ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :auto_load_save
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      i_send(:auto_load_save, [resource_name, file_name], tp: @controller)
      reply = w_recv(:auto_load_save_ + resource_name, fp: @controller)
      raise ControllerError.new("if auto_load_save is called it must be before any access to a shared resource (#{resource_name})") if reply == :already_accessed
      raise ControllerError.new("another thread or process has already called auto_load_save for #{resource_name}") if reply == :already_opened
      log 'fpvc', :auto_load_save
    end
#controller_alive? ⇒ Boolean
Is there a controller?
    # File 'lib/parallelpipes.rb', line 2168
    def controller_alive?
      return @controller ? true : false
    end
#controller_refresh=(value) ⇒ Object
Change the controller refresh time. controller_refresh is the delay between successive rounds of the controller dealing with requests. A shorter refresh time makes the controller respond faster but consume more CPU (typically a controller consumes about 1-6% of one CPU). The default is 0.001. There is no point setting it below 0.0001: in the benchmarks below, 1.0e-05 gives no improvement and is in fact slower than 0.0001.
Benchmarks (2GHz AMD 64 (32bit OS))
Time taken to lock and unlock a mutex twice

    controller_refresh   time taken
    0.1                  0.313996553421021
    0.01                 0.132316589355469
    0.001                0.0158429145812988
    0.0001               0.00638794898986816
    1.0e-05              0.0312449932098389
Footnote:
Consuming 1-6% of CPU seems like a lot. However, PPipe is aiming to bring the multi-core, multi-process world to Ruby. Most new computers have at least two CPU cores nowadays, many have more and the number will rise in the future. 1-6% of 1 processor is not much when the computer has 4, 8 or 16 processors.
That said, if someone finds a better way of doing what the controller does, please share it!
And finally, if this overhead is unacceptable, PPipe run without a Controller (see Methods#set_up_pipes) is still a powerful parallelization tool.
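The trade-off described above, between a shorter refresh delay and more frequent wake-ups, can be illustrated with a minimal polling loop. This is only a sketch under stated assumptions: PollingController, ask and time_requests are hypothetical names, the loop runs in a thread rather than a forked process, and it is not PPipe's actual controller.

```ruby
# Hypothetical stand-in for a controller (not PPipe code): it wakes up
# every `refresh` seconds, answers the pending requests, then sleeps.
class PollingController
  attr_accessor :refresh

  def initialize(refresh = 0.001)
    @refresh = refresh
    @requests = Queue.new
    @running = true
    @thread = Thread.new { run }
  end

  # Send a request and block until the controller has answered it.
  def ask(payload)
    reply = Queue.new
    @requests << [payload, reply]
    reply.pop
  end

  # Let the loop finish its current sleep, then wait for the thread.
  def stop
    @running = false
    @thread.join
  end

  private

  def run
    while @running
      # Drain everything that arrived since the last wake-up.
      until @requests.empty?
        payload, reply = @requests.pop
        reply << payload.to_s.upcase
      end
      sleep @refresh
    end
  end
end

# Time `count` sequential requests against a controller with the given
# refresh delay. Each request may wait up to one refresh period.
def time_requests(refresh, count = 5)
  controller = PollingController.new(refresh)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  count.times { |i| controller.ask("req#{i}") }
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
  controller.stop
  elapsed
end
```

Comparing time_requests(0.1) with time_requests(0.001) on a given machine gives a rough feel for how the delay bounds request latency; the absolute numbers will not match the benchmark table above.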
    # File 'lib/parallelpipes.rb', line 1916
    def controller_refresh=(value)
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      i_send(:set_controller_refresh, value, tp: @controller, evaluate: true)
    end
#controller_woken?(wait_till_woken = false) ⇒ Boolean
Returns true if the controller is awake, false if it is asleep. Currently hangs if the controller has not been told to sleep.
Experimental
    # File 'lib/parallelpipes.rb', line 2146
    def controller_woken?(wait_till_woken=false)
      #blocks unless controller has been put to sleep. Returns true immediately if controller is asleep. Returns false immediately if controller has been put to sleep and woken
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      while [tid][:controller_sleep_switch][@controller].values[0].size > 2
        2.times{[tid][:controller_sleep_switch][@controller].values[0].shift}
      end
      unless @controller_asleep
        w_recv(:controller_sleep_switch, fp: @controller)
        @controller_asleep = true
      else
        if wait_till_woken
          @controller_asleep = w_recv(:controller_sleep_switch, fp: @controller)
        else
          @controller_asleep = !t_recv(:controller_sleep_switch, fp: @controller)
        end
      end
    end
#get_shared_resource(resource_name) ⇒ Object
Get a shared resource from the controller. No other process or thread can access the resource until this process has returned it.
resource_name must be a Symbol or Integer
A shared resource can be any object where
eval(object.inspect) == object
If the resource has not been accessed before it is initialized to nil.
    ppipe = PPipe.new(5, true)
    savings = ppipe.get_shared_resource(:savings)
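The condition eval(object.inspect) == object can be checked before an object is used as a shared resource. The helper below is a minimal sketch; round_trips? is a hypothetical name and is not part of PPipe.

```ruby
# Hypothetical helper (not part of PPipe): true when the object survives
# an inspect/eval round trip, the condition stated for shared resources.
def round_trips?(object)
  eval(object.inspect) == object
rescue SyntaxError, StandardError
  # The inspect output was not valid Ruby, or referred to an unknown name.
  false
end

round_trips?({ balance: 20.03, owners: [:ann, :bob] })  # plain values
round_trips?(Object.new)  # inspect gives "#<Object:0x...>", not Ruby source
```

Hashes, arrays, strings, symbols and numbers generally pass this check; objects whose inspect output looks like #<...> do not.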
    # File 'lib/parallelpipes.rb', line 2025
    def get_shared_resource(resource_name)
      ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :get_shared_resource
      @shared_resource_mutexes[resource_name] ||= Mutex.new
      @shared_resource_mutexes[resource_name].lock
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      @old_resource_controllers ||= {}
      @old_resource_controllers[resource_name] = @controller
      # check_for_exit_signal
      i_send(CON, [resource_name, :fetch], tp: @controller)
      # $stderr.puts 'request sent'
      ans = w_recv(resource_name, fp: @controller)
      # $stderr.puts 'got answer'
      log 'fpvc', :get_shared_resource
      return ans[0]
    end
#lock(mutex_name) ⇒ Object
Lock a mutex named mutex_name. If the mutex did not exist before, create it. No other process or thread can lock this mutex until it has been unlocked.
mutex_name must be a symbol or an integer.
Note, every different mutex_name corresponds to a different mutex. Thus, it is easy to create vast numbers of mutexes. However, there is a memory and processor overhead associated with each mutex. Although this is small, creating very large numbers of mutexes is not a good idea.
Mutexes cannot be destroyed.
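The behaviour described above (one mutex per name, created on first use, never destroyed, with an identifiable owner while locked) can be sketched in a single process as follows. NamedMutexes and its methods are hypothetical names for illustration only; PPipe itself coordinates across processes through the controller.

```ruby
# Hypothetical in-process analogue of named mutexes (not PPipe code).
class NamedMutexes
  def initialize
    @guard = Mutex.new   # protects the two hashes below
    @mutexes = {}        # name => Mutex, created on demand, never removed
    @owners = {}         # name => [pid, thread id] while locked
  end

  # Lock the mutex for `name`, creating it first if necessary.
  def lock(name)
    mutex = @guard.synchronize { @mutexes[name] ||= Mutex.new }
    mutex.lock
    @guard.synchronize { @owners[name] = [Process.pid, Thread.current.object_id] }
    nil
  end

  # Unlock the mutex for `name`; only the locking thread may do so.
  def unlock(name)
    mutex = @guard.synchronize { @mutexes.fetch(name) }
    raise ThreadError, "#{name.inspect} is not locked by this thread" unless mutex.owned?
    @guard.synchronize { @owners.delete(name) }
    mutex.unlock
    nil
  end

  # Owner pair while locked, nil otherwise (the shape documented for
  # mutex_locked?).
  def locked_by(name)
    @guard.synchronize { @owners[name] }
  end

  # Number of mutexes created so far; it only ever grows.
  def count
    @guard.synchronize { @mutexes.size }
  end
end
```

Because entries are never removed from the table, every new name adds a permanent entry, which is the overhead the note above warns about.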
    # File 'lib/parallelpipes.rb', line 1942
    def lock(mutex_name)
      ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :lock
      # $stderr.puts "#@mpn thinks controller is #@controller"
      @thread_mutexes[mutex_name] ||= Mutex.new
      @thread_mutexes[mutex_name].lock
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      # check_for_exit_signal
      @old_controllers ||= {}
      @old_controllers[mutex_name] = @controller
      i_send(CON, [mutex_name, :lock], tp: @controller)
      w_recv(mutex_name, fp: @controller)
      log 'fpvc', :lock
    end
#mutex_locked?(mutex_name) ⇒ Boolean
Is the mutex called mutex_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.
    # File 'lib/parallelpipes.rb', line 1998
    def mutex_locked?(mutex_name)
      ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :locked?
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      i_send(CON, [mutex_name, :mutex_locked?], tp: @controller)
      ans = w_recv(mutex_name, fp: @controller)[0]
      log 'fpvc', :mutex_locked?
      return ans
    end
#put_controller_to_sleep(sleep_time, wait_for_waking) ⇒ Object
If the controller will not be needed for a while, put it to sleep for sleep_time seconds. If wait_for_waking is false, the controller wakes up automatically. If it is a number, the controller checks for a wake-up call every wait_for_waking seconds.
Experimental.
    # File 'lib/parallelpipes.rb', line 2126
    def put_controller_to_sleep(sleep_time, wait_for_waking)
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      w_send(:sleep, [sleep_time, wait_for_waking], tp: @controller)
    end
#resource_locked?(resource_name) ⇒ Boolean
Is the shared resource called resource_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.
    # File 'lib/parallelpipes.rb', line 2089
    def resource_locked?(resource_name)
      ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :locked?
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      i_send(CON, [resource_name, :resource_locked?], tp: @controller)
      ans = w_recv(resource_name, fp: @controller)[0]
      log 'fpvc', :resource_locked?
      return ans
    end
#return_shared_resource(resource_name, resource) ⇒ Object
Return a shared resource.
If the process and thread calling return_shared_resource did not previously call get_shared_resource, this call will hang forever.
    savings += 20.03
    ppipe.return_shared_resource(:savings, savings)
    # File 'lib/parallelpipes.rb', line 2053
    def return_shared_resource(resource_name, resource)
      ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :return_shared_resource
      # check_messages
      raise NoController unless @old_resource_controllers[resource_name]
      raise DeadParallelPipe unless @alive
      # check_for_exit_signal
      i_send(CON, [resource_name, resource], tp: @old_resource_controllers[resource_name])
      w_recv(resource_name, fp: @old_resource_controllers[resource_name])
      @old_resource_controllers.delete(resource_name)
      @shared_resource_mutexes[resource_name].unlock
      log 'fpvc', :return_shared_resource
    end
#shared_resource(resource_name, &block) ⇒ Object
Fetch a shared resource, edit it and then return it. The resource is passed as the parameter to the block.
    ppipe.shared_resource(:savings) do |savings|
      savings += 102.96
      # … some other calculations
      savings
    end
NB: the shared resource to be returned must be the value of the last expression in the block.
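The fetch, edit and return cycle, including the rule that the block's last value is what gets stored, can be sketched in a single process. SharedStore and peek are hypothetical names; the real implementation sends the resource to and from the controller process rather than keeping it in a local hash.

```ruby
# Hypothetical in-process stand-in for the controller's resource table
# (not PPipe code).
class SharedStore
  def initialize
    @guard = Mutex.new
    @locks = {}    # name => Mutex held between get and return
    @values = {}   # name => stored value; unknown names read as nil
  end

  # Take exclusive hold of the resource and hand back its current value.
  def get_shared_resource(name)
    lock_for(name).lock
    @guard.synchronize { @values[name] }
  end

  # Store the new value and release the resource.
  def return_shared_resource(name, value)
    @guard.synchronize { @values[name] = value }
    lock_for(name).unlock
    nil
  end

  # Fetch, yield, then store whatever the block evaluates to.
  def shared_resource(name)
    value = get_shared_resource(name)
    begin
      value = yield(value)
    ensure
      # If the block raised, `value` is still the original resource.
      return_shared_resource(name, value)
    end
  end

  # Read a value without locking; for inspection in this sketch only.
  def peek(name)
    @guard.synchronize { @values[name] }
  end

  private

  def lock_for(name)
    @guard.synchronize { @locks[name] ||= Mutex.new }
  end
end

store = SharedStore.new
store.shared_resource(:savings) { |savings| (savings || 0) + 102.96 }
```

A block that ends with some other expression, for example a logging call that returns nil, would store that value in place of the resource. The `(savings || 0)` guard covers the first access, when the resource is still nil.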
    # File 'lib/parallelpipes.rb', line 2078
    def shared_resource(resource_name, &block)
      ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :shared_resource
      resource = get_shared_resource(resource_name)
      resource = yield(resource)
      return_shared_resource(resource_name, resource)
      log 'fpvc', :shared_resource
    end
#start_controller(controller_refresh = 0.001) ⇒ Object
Start a controller process. Raises an error if there already is one. controller_refresh is the delay between the controller dealing with requests. A shorter refresh time means the controller runs faster, but consumes more CPU (typically a controller consumes about 1-6% of CPU).
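Starting a separate process and waiting until it reports that it is ready can be sketched with Ruby's own fork and IO.pipe. This is an illustration only: WorkerHandle and AlreadyStarted are hypothetical names, the child simply sleeps where a real controller would serve requests, and PPipe uses its own message passing rather than a raw pipe. It assumes a platform where fork is available.

```ruby
# Hypothetical sketch of "fork a helper process, wait for its handshake"
# (not PPipe code).
class WorkerHandle
  class AlreadyStarted < StandardError; end

  def initialize
    @pid = nil
  end

  def running?
    !@pid.nil?
  end

  # Fork the worker and block until it has announced itself.
  def start
    raise AlreadyStarted, 'this handle already has a worker' if @pid

    reader, writer = IO.pipe
    @pid = fork do
      reader.close
      writer.puts 'started'   # handshake back to the parent
      writer.close
      sleep 0.05              # stand-in for the worker's main loop
      exit!(0)                # leave without running at_exit handlers
    end
    writer.close
    message = reader.gets     # blocks until the child has written
    reader.close
    raise 'worker did not report in' unless message.to_s.chomp == 'started'
    @pid
  end

  # Wait for the worker to exit and return its exit status.
  def stop
    _, status = Process.wait2(@pid)
    @pid = nil
    status.exitstatus
  end
end
```

Blocking on the handshake means the caller never sends a request to a worker that has not finished setting up.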
    # File 'lib/parallelpipes.rb', line 1857
    def start_controller(controller_refresh=0.001)
      raise DeadParallelPipe unless @alive
      raise ControllerError.new("This parallel pipe already has a controller") if @controller
      parent = [mpn, tid]
      @controller = fork do
        log 'v4', 'Started Controller...'
        @controller_refresh = controller_refresh
        [tid] ||= {}
        [tid][CON] = {}
        @controller_status = {}
        @resource_status = {}
        @resources = {}
        @resource_file_names = {}
        # @resource_accessed = {}
        log 'v5', 'commencing controller...'
        i_send(:controller_started, true, tp: parent[0], tt: parent[1])
        run_controller
        log 'v5', 'controller exiting...'
      end
      w_recv(:controller_started, fp: @controller)
      # broadcast(:set_controller, @controller, {evaluate: true})
    end
#stop_controller ⇒ Object
Stop the controller. The controller sends out an exit warning to all processes. No new lock, synchronize, get_shared_resource or shared_resource commands can be issued once that warning has been received. However, the controller will not exit until all existing locks on mutexes have been unlocked and all shared resources have been returned.
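The shutdown order described above (refuse new requests, then wait for outstanding locks to be released) can be sketched in a single process with a Mutex and a ConditionVariable. DrainingLockService, ShuttingDown and stopping? are hypothetical names for illustration; the real controller does this across processes.

```ruby
# Hypothetical illustration of drain-then-exit shutdown (not PPipe code).
class DrainingLockService
  class ShuttingDown < StandardError; end

  def initialize
    @state = Mutex.new
    @changed = ConditionVariable.new
    @held = {}         # name => owning thread
    @stopping = false
  end

  # Acquire the named lock, or raise once shutdown has been requested.
  def lock(name)
    @state.synchronize do
      loop do
        raise ShuttingDown, 'no new locks once stop has been requested' if @stopping
        break unless @held.key?(name)
        @changed.wait(@state)
      end
      @held[name] = Thread.current
    end
  end

  # Release the named lock and wake anyone waiting on a change.
  def unlock(name)
    @state.synchronize do
      @held.delete(name)
      @changed.broadcast
    end
  end

  # Refuse new locks, then block until every held lock is released.
  def stop
    @state.synchronize do
      @stopping = true
      @changed.broadcast
      @changed.wait(@state) until @held.empty?
    end
  end

  def stopping?
    @state.synchronize { @stopping }
  end
end
```

A thread that calls stop while another thread still holds a lock stays blocked until the holder calls unlock, which mirrors the rule that the controller does not exit while locks are outstanding.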
    # File 'lib/parallelpipes.rb', line 1882
    def stop_controller
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      controller = @controller
      w_send(:exit, true, tp: @controller)
      w_recv(:exited, fp: controller)
      Process.wait(@pids[controller])
      @pids.delete(controller)
      # $stderr.puts '@controller is', @controller.inspect
      @controller = nil
    end
#synchronize(mutex_name, &block) ⇒ Object
Lock the mutex, call the block and then unlock the mutex. See Controller#lock.
    ppipe.synchronize(:sentence) { print 'a fragmented sen'; sleep rand; print "tence\n" }
Is identical to:
    ppipe.lock(:sentence)
    print 'a fragmented sen'; sleep rand; print "tence\n"
    ppipe.unlock(:sentence)
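The effect of the example above, where the two halves of the sentence are never interleaved with output from other workers, can be reproduced in a single process with threads. NamedLocks and write_sentences are hypothetical names; this sketch uses Ruby's own Mutex, not PPipe's controller.

```ruby
require 'stringio'

# Hypothetical in-process stand-in for synchronize on a named mutex
# (not PPipe code).
class NamedLocks
  def initialize
    @guard = Mutex.new
    @mutexes = {}   # name => Mutex, created on first use
  end

  # Lock the named mutex, run the block, then unlock.
  def synchronize(name)
    mutex = @guard.synchronize { @mutexes[name] ||= Mutex.new }
    mutex.lock
    begin
      yield
    ensure
      mutex.unlock   # released even if the block raises
    end
  end
end

# Several threads each write the sentence in two halves with a pause
# in between; the lock keeps the halves together.
def write_sentences(out, locks, threads: 4, repeats: 5)
  workers = Array.new(threads) do
    Thread.new do
      repeats.times do
        locks.synchronize(:sentence) do
          out.print 'a fragmented sen'
          sleep rand * 0.001
          out.print "tence\n"
        end
      end
    end
  end
  workers.each(&:join)
  out
end
```

One design difference is worth noting: this sketch unlocks in an ensure clause, whereas the synchronize listing on this page calls unlock after yield with no ensure, so there an exception raised inside the block would leave the mutex locked.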
    # File 'lib/parallelpipes.rb', line 1986
    def synchronize(mutex_name, &block)
      ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :synchronize
      raise DeadParallelPipe unless @alive
      lock(mutex_name)
      yield
      unlock(mutex_name)
      log 'fpvc', :synchronize
    end
#unlock(mutex_name) ⇒ Object
Unlock a mutex. See Controller#lock
If the process and thread calling unlock did not previously lock the mutex, this call will hang forever.
    # File 'lib/parallelpipes.rb', line 1963
    def unlock(mutex_name)
      ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
      log 'fpv', :unlock
      # check_messages
      raise NoController.new("Controller dead, or a lock call has not been issued with the name #{mutex_name} from this process") unless @old_controllers[mutex_name]
      raise DeadParallelPipe unless @alive
      i_send(CON, [mutex_name, :unlock], tp: @old_controllers[mutex_name])
      w_recv(mutex_name, fp: @old_controllers[mutex_name])
      @old_controllers.delete(mutex_name)
      @thread_mutexes[mutex_name].unlock
      log 'fpvc', :unlock
    end
#wake_controller ⇒ Object
Wake up the controller
Experimental
    # File 'lib/parallelpipes.rb', line 2136
    def wake_controller
      raise NoController unless @controller
      raise DeadParallelPipe unless @alive
      w_send(:wake, true, tp: @controller)
    end