Class: DLogReader::SimpleForked

Inherits:
Object
  • Object
show all
Defined in:
lib/distributed_logreader/distributer/simple_forked_process.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, num_processes = 3, num_threads_per_process = 10) ⇒ SimpleForked

Returns a new instance of SimpleForked.



7
8
9
10
11
12
13
14
15
16
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 7

# Builds the distributer: stores the worker, creates the shared line queue and
# starts the requested number of processors.
#
# @param worker [Object] stored on the instance via the +worker=+ writer; how
#   it is used is decided by code outside this method
# @param num_processes [Integer, nil] how many times +create_process+ is
#   invoked; +nil+ falls back to the default of 3
# @param num_threads_per_process [Integer, nil] stored on the instance; +nil+
#   falls back to the default of 10
def initialize(worker, num_processes = 3, num_threads_per_process = 10)
  self.worker = worker
  self.num_threads_per_process = (num_threads_per_process || 10)
  self.queue = Queue.new
  self.processors = []
  # Treat an explicit nil like an omitted argument, mirroring the handling of
  # num_threads_per_process above (previously nil.times raised NoMethodError).
  (num_processes || 3).times do |x|
    $dlog_logger.debug("Forking #{x} process")
    self.processors << create_process
  end
end

Instance Attribute Details

#num_threads_per_process ⇒ Object

Returns the value of attribute num_threads_per_process.



6
7
8
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 6

# Reader for the +@num_threads_per_process+ instance variable.
#
# @return [Object] the current value of +@num_threads_per_process+. The
#   constructor assigns its +num_threads_per_process+ argument (or 10 when that
#   argument is nil) through the writer — presumably into this same variable;
#   confirm against the accessor declaration.
def num_threads_per_process
  @num_threads_per_process
end

#processors ⇒ Object

Returns the value of attribute processors.



6
7
8
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 6

# Reader for the +@processors+ instance variable.
#
# @return [Object] the current value of +@processors+. The constructor assigns
#   an empty array through the writer and appends one +create_process+ result
#   per requested process; #join calls +num_jobs+ on each element.
def processors
  @processors
end

#queue ⇒ Object

Returns the value of attribute queue.



6
7
8
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 6

# Reader for the +@queue+ instance variable.
#
# @return [Object] the current value of +@queue+. The constructor assigns a
#   new +Queue+ through the writer; #process appends lines to it and #join
#   polls its size.
def queue
  @queue
end

#thread_pool ⇒ Object

Returns the value of attribute thread_pool.



6
7
8
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 6

# Reader for the +@thread_pool+ instance variable.
#
# NOTE(review): none of the constructor, #join or #process assigns this
# attribute, so it may be nil unless set elsewhere — confirm where (or
# whether) it is populated.
#
# @return [Object, nil] the current value of +@thread_pool+
def thread_pool
  @thread_pool
end

#worker ⇒ Object

Returns the value of attribute worker.



6
7
8
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 6

# Reader for the +@worker+ instance variable.
#
# @return [Object] the current value of +@worker+. The constructor assigns its
#   +worker+ argument through the writer — presumably into this same variable;
#   confirm against the accessor declaration.
def worker
  @worker
end

Instance Method Details

#join ⇒ Object



22
23
24
25
26
27
28
29
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 22

# Blocks until the queue reports no remaining entries and the processors
# report no outstanding jobs, polling every 0.1 seconds and logging the
# outstanding-job count after each poll.
#
# @return [nil]
def join
  # Sum of num_jobs over every processor, re-evaluated on each poll.
  outstanding_jobs = lambda do
    processors.inject(0) { |total, processor| total + processor.num_jobs }
  end

  remaining = outstanding_jobs.call
  until queue.size <= 0 && remaining <= 0
    sleep 0.1
    remaining = outstanding_jobs.call
    $dlog_logger.debug("Shutting down #{remaining} left")
  end
end

#process(line) ⇒ Object



18
19
20
# File 'lib/distributed_logreader/distributer/simple_forked_process.rb', line 18

# Enqueues a single line on the shared queue.
#
# @param line [Object] the item to append to the queue
# @return [Object] whatever the queue's append operation returns
def process(line)
  queue.push(line)
end