Class: Forkinator
- Inherits:
- Object (inheritance chain: Object, Forkinator)
- Defined in:
- lib/forkinator.rb
Overview
The Forkinator makes it easy to fork workers, pass a list of jobs for them to work on, and listen for the results back from the child process. It uses a combination of threading and forking to accomplish this. Marshal is used to pass objects back and forth between the child and parent via IO.pipe.
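The pipe transport mentioned above can be illustrated in a few lines. This is a minimal sketch, not code from lib/forkinator.rb: it round-trips a single object through IO.pipe with Marshal inside one process, with no fork involved. The job hash is a made-up example.

```ruby
# Minimal sketch: send one Ruby object through a pipe using Marshal.
# The job hash is a made-up example, not part of Forkinator.
reader, writer = IO.pipe

job = { :id => 1, :files => ["a.txt", "b.txt"] }

# Marshal.dump accepts an IO as its second argument and writes the
# serialized bytes straight into it.
Marshal.dump(job, writer)
writer.close

# Marshal.load reads one serialized object back from the IO.
received = Marshal.load(reader)
reader.close

puts received == job
```

Because each Marshal.load call consumes exactly one dumped object, the two sides can exchange a sequence of messages over the same pipe without any extra framing.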
Class Method Summary
- .hybrid_fork(qty, jobs, parent_proc, child_proc) ⇒ Object
  Forks children, makes threads for two-way communication, and evenly distributes jobs to each child.
- .make_child(child_proc) ⇒ Hash
  Fork a child.
- .wait_for_threads(threads) ⇒ Object
  Wait for threads.
Class Method Details
.hybrid_fork(qty, jobs, parent_proc, child_proc) ⇒ Object
Forks children, makes threads for two-way communication, and evenly distributes jobs to each child.
# File 'lib/forkinator.rb', line 75

def self.hybrid_fork(qty, jobs, parent_proc, child_proc)
  threads = []

  #mutex is used to ensure that some operations in the threads don't have the potential of happening at the same time
  #in another thread
  semaphore = Mutex.new
  require('thread')

  #split the jobs up
  jobs = jobs.in_groups(qty)

  #spawn the children
  children = []
  qty.times { children << make_child(child_proc)}

  #For each worker
  qty.times do |i|

    #start a thread
    threads[i] = Thread.new do
      Thread.current.abort_on_exception = true
      child = {}
      semaphore.synchronize { child = children.pop }
      pid = child[:pid]
      njobs = jobs[i - 1]

      #pass jobs to child
      Marshal.dump(njobs, child[:write])

      #wait for result
      result = Marshal.load(child[:read])

      #process result
      semaphore.synchronize { parent_proc.call(result) }

      #close the pipe
      child[:write].close

      #wait for process to finish before terminating this thread
      Process.wait(pid)

      #close db connection
      SqliteActiveRecord.connection.close
    end
  end

  wait_for_threads(threads)
end
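The even distribution step relies on in_groups, which comes from ActiveSupport rather than core Ruby. As a rough illustration of what "evenly distributes" means here, the sketch below splits an array into qty groups using only core Ruby. The helper name split_into_groups is hypothetical and is not part of Forkinator. It also skips the padding that ActiveSupport's in_groups applies by default (short groups are filled with nil unless false is passed as the fill value).

```ruby
# Hypothetical helper, not part of Forkinator or ActiveSupport:
# split jobs into qty groups whose sizes differ by at most one.
def split_into_groups(jobs, qty)
  base, extra = jobs.size.divmod(qty)
  start = 0
  Array.new(qty) do |i|
    # The first `extra` groups each take one additional job.
    length = base + (i < extra ? 1 : 0)
    group = jobs[start, length]
    start += length
    group
  end
end

groups = split_into_groups((1..7).to_a, 3)
p groups
```

In the listing above, each thread picks its group with jobs[i - 1], so thread 0 receives the last group (index -1) and every group is still handed out exactly once.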
.make_child(child_proc) ⇒ Hash
Forks a child process. Provide a proc of code to run inside the child. The proc is called with each array of jobs the parent sends, and its return value is sent back to the parent.
# File 'lib/forkinator.rb', line 21

def self.make_child(child_proc)
  #open pipes for two way communication between the parent and child
  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  #remove our database connection, we don't want it inside the child, as it'll get closed when the child shuts down
  mog_config = ActiveRecord::Base.remove_connection

  #fork, code inside this block is only ran inside the child
  pid = Process.fork do
    begin
      #Since we're the child now, we'll close the parent's r/w pipes as we don't need them
      parent_write.close
      parent_read.close

      #child loops through IO pipe, listening for data from the parent, if the parent closes the pipe then we're
      #done
      while !child_read.eof? do
        #rename the process to make it clear that it's a worker in idle status
        $0 = "mogbak [idle]"

        #this call blocks until it receives something from the parent via the pipe
        job = Marshal.load(child_read)

        #since we're working now we'll rename the process
        $0 = "mogbak [working]"

        #call the child proc
        result = child_proc.call(job)

        #hand the child proc response back to the parent
        Marshal.dump(result, child_write)
      end
    #no matter what happens..make sure we get the pipes closed
    ensure
      child_read.close
      child_write.close
    end
  end

  #This is the parent executing this -- reconnect to the database we just dropped above.
  ActiveRecord::Base.establish_connection(mog_config)

  #close the child's handle on the pipes since the parent won't need them
  child_read.close
  child_write.close

  {:write => parent_write, :read => parent_read, :pid => pid}
end
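The two-pipe pattern in make_child can be tried in isolation. The following is a minimal sketch under stated assumptions, not a replacement for the method: it leaves out the ActiveRecord connection handling and the process renaming, uses a made-up doubling proc and job list, and needs a platform where Process.fork is available.

```ruby
# Minimal sketch of the two-pipe fork pattern, without ActiveRecord.
# The doubling proc and the job list are made up for illustration.
child_proc = proc { |jobs| jobs.map { |n| n * 2 } }

child_read, parent_write = IO.pipe
parent_read, child_write = IO.pipe

pid = Process.fork do
  begin
    # The child only needs its own ends of the pipes.
    parent_write.close
    parent_read.close

    # Keep serving batches until the parent closes its write end.
    until child_read.eof?
      jobs = Marshal.load(child_read)
      Marshal.dump(child_proc.call(jobs), child_write)
    end
  ensure
    child_read.close
    child_write.close
  end
end

# The parent only needs its own ends of the pipes.
child_read.close
child_write.close

Marshal.dump([1, 2, 3], parent_write)
result = Marshal.load(parent_read)

# Closing the write end makes child_read.eof? true, so the child exits.
parent_write.close
Process.wait(pid)
parent_read.close

p result
```

Closing the unused pipe ends on each side matters: if the parent kept child_read or the child kept parent_write open, the end-of-file condition that stops the child's loop would never be reached.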
.wait_for_threads(threads) ⇒ Object
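Waits for every thread in the list by joining it. Nil entries are skipped, and an Interrupt raised while joining is rescued and ignored.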
# File 'lib/forkinator.rb', line 8

def self.wait_for_threads(threads)
  threads.compact.each do |t|
    begin
      t.join
    rescue Interrupt
      # no reason to wait on dead threads
    end
  end
end