Module: PPipe::Controller

Includes:
Methods
Included in:
PPipe
Defined in:
lib/parallelpipes.rb

Constant Summary collapse

CON =

_GHAkxsadf0a0s98

:controller_switch

Instance Attribute Summary collapse

Attributes included from Methods

#is_root, #mpn, #redirect, #thread_safe, #tp_required, #tt_required

Attributes included from Log

#verbosity

Instance Method Summary collapse

Methods included from Methods

#assigned?, #die, #exit, #finish, #fork, #gets, #i_recv, #i_send, #kill_all, #kill_pipe, #pids, #puts, #read_all, #set_up_pipes, #t_recv, #tid, #user_end, #w_recv, #w_send, #wait, #wait_till_assigned, #waitall

Methods included from Log

clean_up, io=, #log, log_file, log_file=

Methods included from Reader

#configure_reader, #get_line, #read_pipe

Instance Attribute Details

#controllerObject (readonly)

the pipe number of the controller (nil if there is no controller)



1853
1854
1855
# File 'lib/parallelpipes.rb', line 1853

# Read-only accessor for @controller: the pipe number of the controller,
# or nil if there is no controller.
def controller
  @controller
end

Instance Method Details

#auto_load_save(resource_name, file_name) ⇒ Object

Raises:



2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
# File 'lib/parallelpipes.rb', line 2110

# Send an :auto_load_save request for a shared resource to the controller and
# wait for its verdict.
#
# The request carries [resource_name, file_name]; the reply is awaited on the
# tag :auto_load_save_<resource_name>.
# NOTE(review): presumably this makes the controller load/save the resource
# from/to file_name - confirm against the controller implementation.
#
# @param resource_name [Symbol, Integer] name of the shared resource
# @param file_name [Object] forwarded untouched to the controller
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
# @raise [ControllerError] if the controller replies :already_accessed or
#   :already_opened
def auto_load_save(resource_name, file_name)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :auto_load_save
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive

  i_send(:auto_load_save, [resource_name, file_name], tp: @controller)
  # Build the reply tag by interpolation: core Ruby has no Symbol#+, and
  # interpolation handles Symbol and Integer names alike.
  reply_tag = :"auto_load_save_#{resource_name}"
  reply = w_recv(reply_tag, fp: @controller)
  if reply == :already_accessed
    raise ControllerError, "if auto_load_save is called it must be before any access to a shared resource (#{resource_name})"
  end
  if reply == :already_opened
    raise ControllerError, "another thread or process has already called auto_load_save for #{resource_name}"
  end
  log 'fpvc', :auto_load_save
end

#controller_alive?Boolean

Is there a controller?

Returns:

  • (Boolean)


2168
2169
2170
2171
# File 'lib/parallelpipes.rb', line 2168

# Report whether this pipe currently has a controller recorded.
#
# check_messages is run first (presumably so pending messages can update
# @controller - confirm), then @controller is coerced to a strict boolean.
#
# @return [Boolean] true if @controller is truthy, false otherwise
def controller_alive?
  check_messages
  !!@controller
end

#controller_refresh=(value) ⇒ Object

Change the controller refresh time. controller_refresh is the delay between the controller dealing with requests. A shorter refresh time means the controller runs faster, but consumes more CPU (typically a controller consumes about 1-6% of CPU). Default is 0.001. There is no point having it smaller than 0.0001 - there is no improvement in performance.

Benchmarks (2GHz AMD 64 (32bit OS))

Time taken to lock and unlock a mutex twice

controller_refresh : time taken

  • 0.1: 0.313996553421021

  • 0.01: 0.132316589355469

  • 0.001: 0.0158429145812988

  • 0.0001: 0.00638794898986816

  • 1.0e-05: 0.0312449932098389

Footnote:

Consuming 1-6% of CPU seems like a lot. However, PPipe is aiming to bring the multi-core, multi-process world to Ruby. Most new computers have at least two CPU cores nowadays, many have more and the number will rise in the future. 1-6% of 1 processor is not much when the computer has 4, 8 or 16 processors.

Having said which, if someone finds a better way of doing what the controller does, please share it!

And finally, if this overhead is unacceptable, PPipe run without a Controller (see Methods#set_up_pipes) is still a powerful parallelization tool.

Raises:



1916
1917
1918
1919
1920
# File 'lib/parallelpipes.rb', line 1916

# Ask the controller to change its refresh delay.
#
# Sends a :set_controller_refresh message carrying +value+ to the controller
# with the evaluate: true option.
#
# @param value [Object] the new refresh time, forwarded as-is
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def controller_refresh=(value)
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive

  destination = @controller
  i_send(:set_controller_refresh, value, tp: destination, evaluate: true)
end

#controller_woken?(wait_till_woken = false) ⇒ Boolean

Returns true if the controller is awake, false if it is asleep. Currently hangs if the controller has not been told to sleep.

Experimental

Returns:

  • (Boolean)

Raises:



2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
# File 'lib/parallelpipes.rb', line 2146

# Query (and cache in @controller_asleep) the controller's sleep state, using
# :controller_sleep_switch messages received from the controller.
#
# Experimental.
#
# @param wait_till_woken [Boolean] when the controller is already recorded as
#   asleep, selects w_recv (truthy) or t_recv (falsy) to read the next switch
#   message. Presumably w_recv blocks and t_recv does not - confirm.
# @return [Object] the value assigned to @controller_asleep by the branch taken
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
#
# NOTE(review): the inline comment below and the public description ("true if
# the controller is awake") disagree about what the return value means -
# confirm which one is intended.
def controller_woken?(wait_till_woken=false)
  #blocks unless controller has been put to sleep. Returns true immediately if controller is asleep. Returns false immediately if controller has been put to sleep and woken
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  check_messages
  # Discard queued sleep-switch messages two at a time until at most two remain.
  # NOTE(review): this indexing assumes a :controller_sleep_switch entry already
  # exists for this thread and controller - confirm it cannot be nil here.
  while @messages[tid][:controller_sleep_switch][@controller].values[0].size > 2
    2.times{@messages[tid][:controller_sleep_switch][@controller].values[0].shift}
  end
  unless @controller_asleep
    # Not yet recorded as asleep: wait for a switch message, then mark asleep.
    w_recv(:controller_sleep_switch, fp: @controller)
    @controller_asleep = true
  else
    if wait_till_woken
      # Store whatever the next switch message carries.
      @controller_asleep = w_recv(:controller_sleep_switch, fp: @controller)
    else
      # Store the negation of what t_recv returns.
      @controller_asleep = !t_recv(:controller_sleep_switch, fp: @controller)
    end
  end
end

#get_shared_resource(resource_name) ⇒ Object

Get a shared resource from the controller. No other process or thread can access the resource until this process has returned it.

resource_name must be a Symbol or Integer

A shared resource can be any object where

eval(object.inspect) == object

If the resource has not been accessed before it is initialized to nil.

ppipe = PPipe.new(5, true)
savings = ppipe.get_shared_resource(:savings)

Raises:



2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
# File 'lib/parallelpipes.rb', line 2025

# Fetch a shared resource from the controller.
#
# A per-resource local Mutex (created on first use) is locked first and is
# left locked on success; return_shared_resource unlocks it. The controller
# in use is recorded in @old_resource_controllers so the resource can be
# handed back to the same controller.
#
# @param resource_name [Symbol, Integer] name of the shared resource
# @return [Object] the first element of the controller's reply
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def get_shared_resource(resource_name)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])

  log 'fpv', :get_shared_resource
  local_mutex = (@shared_resource_mutexes[resource_name] ||= Mutex.new)
  local_mutex.lock
  ready = false
  begin
    check_messages
    raise NoController unless @controller
    raise DeadParallelPipe unless @alive
    ready = true
  ensure
    # If a precondition failed, no request was sent to the controller, so the
    # local mutex must be released; otherwise every later call for this
    # resource would block (or raise ThreadError in the same thread).
    local_mutex.unlock unless ready
  end
  @old_resource_controllers ||= {}
  @old_resource_controllers[resource_name] = @controller

  i_send(CON, [resource_name, :fetch], tp: @controller)
  ans = w_recv(resource_name, fp: @controller)
  log 'fpvc', :get_shared_resource
  return ans[0]
end

#lock(mutex_name) ⇒ Object

Lock a mutex named mutex_name. If the mutex did not exist before, create it. No other process or thread can lock this mutex until it has been unlocked.

mutex_name must be a symbol or an integer.

Note, every different mutex_name corresponds to a different mutex. Thus, it is easy to create vast numbers of mutexes. However, there is a memory and processor overhead associated with each mutex. Although this is small, creating very large numbers of mutexes is not a good idea.

Mutexes cannot be destroyed.

Raises:



1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
# File 'lib/parallelpipes.rb', line 1942

# Lock the mutex called mutex_name via the controller.
#
# A per-name local Mutex (created on first use) is locked first and is left
# locked on success; unlock releases it. The controller in use is recorded in
# @old_controllers so unlock can address the same controller.
#
# @param mutex_name [Symbol, Integer] name of the mutex
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def lock(mutex_name)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :lock
  local_mutex = (@thread_mutexes[mutex_name] ||= Mutex.new)
  local_mutex.lock
  ready = false
  begin
    check_messages
    raise NoController unless @controller
    raise DeadParallelPipe unless @alive
    ready = true
  ensure
    # If a precondition failed, no lock request was sent to the controller, so
    # the local mutex must be released; otherwise every later lock call for
    # this name would block (or raise ThreadError in the same thread).
    local_mutex.unlock unless ready
  end
  @old_controllers ||= {}
  @old_controllers[mutex_name] = @controller
  i_send(CON, [mutex_name, :lock], tp: @controller)
  w_recv(mutex_name, fp: @controller)
  log 'fpvc', :lock
end

#mutex_locked?(mutex_name) ⇒ Boolean

Is the mutex called mutex_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.

Returns:

  • (Boolean)

Raises:



1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
# File 'lib/parallelpipes.rb', line 1998

# Ask the controller whether the mutex called mutex_name is locked.
#
# Sends [mutex_name, :mutex_locked?] to the controller and returns the first
# element of the reply. According to the accompanying documentation this is
# [pipe_no, thread_id] of the holder when locked and nil otherwise.
#
# @param mutex_name [Symbol, Integer] name of the mutex
# @return [Object] first element of the controller's reply
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def mutex_locked?(mutex_name)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  # Log under this method's own name so the entry and completion lines pair up.
  log 'fpv', :mutex_locked?
  check_messages
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive

  i_send(CON, [mutex_name, :mutex_locked?], tp: @controller)
  ans = w_recv(mutex_name, fp: @controller)[0]
  log 'fpvc', :mutex_locked?
  ans
end

#put_controller_to_sleep(sleep_time, wait_for_waking) ⇒ Object

If the controller will not be needed for a while, put it to sleep for sleep_time seconds. If wait_for_waking is false, wake up automatically. If it is a number, check for a wake up call every wait_for_waking seconds.

Experimental.

Raises:



2126
2127
2128
2129
2130
# File 'lib/parallelpipes.rb', line 2126

# Send the controller a :sleep request. Experimental.
#
# @param sleep_time [Object] forwarded to the controller as the first element
#   of the request
# @param wait_for_waking [Object] forwarded as the second element
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def put_controller_to_sleep(sleep_time, wait_for_waking)
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive

  sleep_request = [sleep_time, wait_for_waking]
  w_send(:sleep, sleep_request, tp: @controller)
end

#resource_locked?(resource_name) ⇒ Boolean

Is the shared resource called resource_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.

Returns:

  • (Boolean)

Raises:



2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
# File 'lib/parallelpipes.rb', line 2089

# Ask the controller whether the shared resource called resource_name is
# locked.
#
# Sends [resource_name, :resource_locked?] to the controller and returns the
# first element of the reply. According to the accompanying documentation this
# is [pipe_no, thread_id] of the holder when locked and nil otherwise.
#
# @param resource_name [Symbol, Integer] name of the shared resource
# @return [Object] first element of the controller's reply
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def resource_locked?(resource_name)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  # Log under this method's own name so the entry and completion lines pair up.
  log 'fpv', :resource_locked?
  check_messages
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive

  i_send(CON, [resource_name, :resource_locked?], tp: @controller)
  ans = w_recv(resource_name, fp: @controller)[0]
  log 'fpvc', :resource_locked?
  ans
end

#return_shared_resource(resource_name, resource) ⇒ Object

Return a shared resource.

If the process and thread calling return_shared_resource did not previously call get_shared_resource, this call will hang forever.

savings += 20.03
ppipe.return_shared_resource(:savings, savings)

Raises:



2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
# File 'lib/parallelpipes.rb', line 2053

# Hand a shared resource back to the controller it was fetched from.
#
# The controller is looked up in @old_resource_controllers (filled in by
# get_shared_resource). After the controller acknowledges, the bookkeeping
# entry is removed and the per-resource local Mutex is unlocked.
#
# @param resource_name [Symbol, Integer] name of the shared resource
# @param resource [Object] the value to send back
# @raise [NoController] if no controller is recorded for resource_name,
#   including when get_shared_resource has never been called on this object
# @raise [DeadParallelPipe] if @alive is not set
def return_shared_resource(resource_name, resource)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :return_shared_resource

  # The table is created lazily by get_shared_resource; make sure it exists so
  # a premature call raises NoController rather than NoMethodError on nil.
  @old_resource_controllers ||= {}
  owner = @old_resource_controllers[resource_name]
  raise NoController unless owner
  raise DeadParallelPipe unless @alive

  i_send(CON, [resource_name, resource], tp: owner)
  w_recv(resource_name, fp: owner)
  @old_resource_controllers.delete(resource_name)
  @shared_resource_mutexes[resource_name].unlock
  log 'fpvc', :return_shared_resource
end

#shared_resource(resource_name, &block) ⇒ Object

Fetch a shared resource, edit it and then return it. The resource is passed as the parameter to the block.

ppipe.shared_resource(:savings) do |savings|
  savings += 102.96
  # … some other calculations
  savings
end

NB The shared resource to be returned must be the last line of the block



2078
2079
2080
2081
2082
2083
2084
2085
# File 'lib/parallelpipes.rb', line 2078

# Fetch a shared resource, yield it to the block, and return the block's
# result to the controller as the new value.
#
# If the block exits abnormally (raises, or leaves via break/throw) the
# resource is still handed back - with the value that was fetched - so it does
# not remain locked forever; any exception then propagates to the caller.
# Note that in-place mutations the block made before failing are not undone.
#
# @param resource_name [Symbol, Integer] name of the shared resource
# @yieldparam resource [Object] the fetched value
# @yieldreturn [Object] the value to store back
def shared_resource(resource_name, &block)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :shared_resource
  resource = get_shared_resource(resource_name)
  begin
    # On failure the assignment never happens, so +resource+ keeps the
    # fetched value for the ensure clause.
    resource = yield(resource)
  ensure
    return_shared_resource(resource_name, resource)
  end
  log 'fpvc', :shared_resource
end

#start_controller(controller_refresh = 0.001) ⇒ Object

Start a controller process. Raises an error if there already is one. controller_refresh is the delay between the controller dealing with requests. A shorter refresh time means the controller runs faster, but consumes more CPU (typically a controller consumes about 1-6% of CPU).

Raises:



1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
# File 'lib/parallelpipes.rb', line 1857

# Start a controller in a forked pipe and wait until it reports that it has
# started.
#
# @param controller_refresh [Numeric] stored in the fork as
#   @controller_refresh (default 0.001)
# @raise [DeadParallelPipe] if @alive is not set
# @raise [ControllerError] if @controller is already set
def start_controller(controller_refresh=0.001)
  raise DeadParallelPipe unless @alive
  raise ControllerError.new("This parallel pipe already has a controller") if @controller
  # Capture the caller's pipe number and thread id so the forked controller
  # can address its start-up notification back to this thread.
  parent = [mpn, tid]
  # The value returned by fork is stored as the controller's identifier.
  @controller = fork do
    log 'v4', 'Started Controller...'
    @controller_refresh = controller_refresh
    # Initialise the controller's bookkeeping tables inside the fork.
    @messages[tid] ||= {}
    @messages[tid][CON] = {}
    @controller_status = {}
    @resource_status = {}
    @resources = {}
    @resource_file_names = {}
#       @resource_accessed = {}
    log 'v5', 'commencing controller...'
    # Tell the parent thread the controller is up, then enter the main loop.
    i_send(:controller_started, true, tp: parent[0], tt: parent[1])
    run_controller
    log 'v5', 'controller exiting...'
  end
  # Wait for the :controller_started notification from the new controller.
  w_recv(:controller_started, fp: @controller)
#     broadcast(:set_controller, @controller, {evaluate: true})
end

#stop_controllerObject

Stop the controller. The controller sends out an exit warning to all processes. No new lock, synchronize, get_shared_resource or shared_resource commands can be issued once that warning has been received. However, the controller will not exit until all existing locks on mutexes have been unlocked and all shared resources have been returned.

Raises:



1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
# File 'lib/parallelpipes.rb', line 1882

# Shut the controller down and forget about it.
#
# Sends :exit to the controller, waits for its :exited reply, reaps the
# process recorded for it in @pids, removes that entry and clears @controller.
#
# @return [nil]
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def stop_controller
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive

  # Keep our own copy of the pipe number (presumably because @controller may
  # be cleared while waiting for the reply - confirm).
  stopping = @controller
  w_send(:exit, true, tp: @controller)
  w_recv(:exited, fp: stopping)
  Process.wait(@pids[stopping])
  @pids.delete(stopping)
  @controller = nil
end

#synchronize(mutex_name, &block) ⇒ Object

Lock the mutex, call the block and then unlock the mutex. See Controller#lock.

ppipe.synchronize(:sentence) { print 'a fragmented sen'; sleep rand; print "tence\n" }

Is identical to:

ppipe.lock(:sentence)
print 'a fragmented sen'; sleep rand; print "tence\n"
ppipe.unlock(:sentence)

Raises:



1986
1987
1988
1989
1990
1991
1992
1993
1994
# File 'lib/parallelpipes.rb', line 1986

# Lock the mutex called mutex_name, run the block, then unlock the mutex.
#
# The unlock happens in an ensure clause, so the mutex is released even if
# the block raises; the exception then propagates to the caller.
#
# @param mutex_name [Symbol, Integer] name of the mutex
# @yield the critical section
# @raise [DeadParallelPipe] if @alive is not set
def synchronize(mutex_name, &block)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :synchronize
  raise DeadParallelPipe unless @alive

  lock(mutex_name)
  begin
    yield
  ensure
    # Without this, an exception in the block would leave the mutex locked
    # for every other thread and process.
    unlock(mutex_name)
  end
  log 'fpvc', :synchronize
end

#unlock(mutex_name) ⇒ Object

Unlock a mutex. See Controller#lock

If the process and thread calling unlock did not previously lock the mutex, this call will hang forever.

Raises:



1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
# File 'lib/parallelpipes.rb', line 1963

# Unlock the mutex called mutex_name via the controller that granted the lock.
#
# The controller is looked up in @old_controllers (filled in by lock). After
# the controller acknowledges, the bookkeeping entry is removed and the
# per-name local Mutex is unlocked.
#
# @param mutex_name [Symbol, Integer] name of the mutex
# @raise [NoController] if no controller is recorded for mutex_name, including
#   when lock has never been called on this object
# @raise [DeadParallelPipe] if @alive is not set
def unlock(mutex_name)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :unlock
  # The table is created lazily by lock; make sure it exists so a premature
  # call raises the descriptive NoController rather than NoMethodError on nil.
  @old_controllers ||= {}
  granting_controller = @old_controllers[mutex_name]
  unless granting_controller
    raise NoController, "Controller dead, or a lock call has not been issued with the name #{mutex_name} from this process"
  end
  raise DeadParallelPipe unless @alive

  i_send(CON, [mutex_name, :unlock], tp: granting_controller)
  w_recv(mutex_name, fp: granting_controller)
  @old_controllers.delete(mutex_name)
  @thread_mutexes[mutex_name].unlock
  log 'fpvc', :unlock
end

#wake_controllerObject

Wake up the controller

Experimental

Raises:



2136
2137
2138
2139
2140
# File 'lib/parallelpipes.rb', line 2136

# Send the controller a :wake request. Experimental.
#
# @raise [NoController] if @controller is not set
# @raise [DeadParallelPipe] if @alive is not set
def wake_controller
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive

  target = @controller
  w_send(:wake, true, tp: target)
end