Class: Thimble::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/Manager.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork) ⇒ Manager

Returns a new instance of Manager.



4
5
6
7
8
9
10
11
12
13
14
15
# File 'lib/Manager.rb', line 4

# Builds a manager from the given worker configuration.
#
# @param max_workers [Integer] upper bound on workers; must be at least 1
# @param batch_size [Integer] batch size; must be at least 1
# @param queue_size [Integer] queue size; stored as given, not validated here
# @param worker_type [Symbol] either :fork or :thread
# @raise [ArgumentError] if worker_type is neither :fork nor :thread, if a
#   non-thread worker is requested but Process does not respond to fork, or
#   if max_workers or batch_size is less than 1
def initialize(max_workers: 6, batch_size: 1000, queue_size: 1000, worker_type: :fork)
  known_type = worker_type == :thread || worker_type == :fork
  raise ArgumentError, "worker type must be either :fork or :thread" unless known_type

  if worker_type != :thread && !Process.respond_to?(:fork)
    raise ArgumentError, "Your system does not respond to fork please use threads."
  end

  raise ArgumentError, "max_workers must be greater than 0" if max_workers < 1
  raise ArgumentError, "batch size must be greater than 0" if batch_size < 1

  @worker_type = worker_type
  @max_workers = max_workers
  @batch_size = batch_size
  @queue_size = queue_size
  # Guards every access to the worker registry below.
  @mutex = Mutex.new
  @current_workers = {}
end

Instance Attribute Details

#batch_size ⇒ Object (readonly)

Returns the value of attribute batch_size.



3
4
5
# File 'lib/Manager.rb', line 3

# Reader for the batch size this manager was constructed with.
#
# @return [Object] the current value of @batch_size
def batch_size
  @batch_size
end

#max_workers ⇒ Object (readonly)

Returns the value of attribute max_workers.



3
4
5
# File 'lib/Manager.rb', line 3

# Reader for the worker limit this manager was constructed with.
#
# @return [Object] the current value of @max_workers
def max_workers
  @max_workers
end

#queue_size ⇒ Object (readonly)

Returns the value of attribute queue_size.



3
4
5
# File 'lib/Manager.rb', line 3

# Reader for the queue size this manager was constructed with.
#
# @return [Object] the current value of @queue_size
def queue_size
  @queue_size
end

#worker_type ⇒ Object (readonly)

Returns the value of attribute worker_type.



3
4
5
# File 'lib/Manager.rb', line 3

# Reader for the worker strategy this manager was constructed with.
#
# @return [Object] the current value of @worker_type (the constructor only
#   accepts :fork or :thread)
def worker_type
  @worker_type
end

Class Method Details

.deterministic ⇒ Object



90
91
92
# File 'lib/Manager.rb', line 90

# Builds a manager restricted to one worker, with a batch size and queue
# size of one.
#
# @return [Object] whatever +new+ returns for those settings
def self.deterministic
  settings = { max_workers: 1, batch_size: 1, queue_size: 1 }
  new(**settings)
end

.small ⇒ Object



94
95
96
# File 'lib/Manager.rb', line 94

# Builds a manager restricted to one worker, with a batch size and queue
# size of three.
#
# @return [Object] whatever +new+ returns for those settings
def self.small
  settings = { max_workers: 1, batch_size: 3, queue_size: 3 }
  new(**settings)
end

Instance Method Details

#current_workers(id) ⇒ Object



41
42
43
44
45
# File 'lib/Manager.rb', line 41

# Looks up the registered workers whose entry carries the given id.
#
# The registry is read while holding @mutex.
#
# @param id [Object] id to match against each registered entry's #id
# @return [Hash] the subset of the registry (keyed as stored) whose entries
#   have a matching id
def current_workers(id)
  @mutex.synchronize do
    @current_workers.select { |_pid, entry| entry.id == id }
  end
end

#get_fork_worker(batch) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/Manager.rb', line 57

# Forks a child process that runs the given block over every item of +batch+.
#
# The child maps the block over +batch.item+ (passing each element's #item),
# marshals the resulting array and writes it to a pipe. An exception raised
# by the block is stored in that item's slot of the result array instead of
# aborting the whole batch.
#
# SystemExit and SignalException are deliberately NOT stored: they are
# re-raised so that the +exit+ issued by the HUP trap (or any other
# termination request such as an interrupt) really stops the child rather
# than being recorded as one item's result while processing continues.
#
# @param batch [#item] object whose #item returns a collection; each element
#   must itself respond to #item
# @yield [Object] the #item value of each element
# @return [OpenStruct] with +pid+ (the value returned by fork in the parent)
#   and +reader+ (read end of the pipe the child writes its marshalled
#   results to)
def get_fork_worker(batch)
  rd, wr = IO.pipe
  pid = fork do
    Signal.trap("HUP") { exit }
    # The child only writes; drop its copy of the read end.
    rd.close
    results = batch.item.map do |item|
      begin
        yield(item.item)
      rescue SystemExit, SignalException
        # Termination requests must end the child, not become a result.
        raise
      rescue Exception => e
        # Relay every other failure to the parent as this item's result.
        e
      end
    end
    wr.write(Marshal.dump(results))
    wr.close
  end
  # The parent only reads; closing its write end lets the reader see EOF.
  wr.close
  OpenStruct.new(pid: pid, reader: rd)
end

#get_thread_worker(batch) ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/Manager.rb', line 79

# Starts a thread that runs the given block over every item of +batch+.
#
# The thread maps the block over +batch.item+ (passing each element's #item),
# stores the array of results on the returned handle as +result+, and then
# sets +done+ to true.
#
# @param batch [#item] object whose #item returns a collection; each element
#   must itself respond to #item
# @yield [Object] the #item value of each element
# @return [OpenStruct] handle whose +pid+ is the Thread; +result+ and +done+
#   are filled in by that thread once the whole batch has been mapped
def get_thread_worker(batch)
  handle = OpenStruct.new
  handle.pid = Thread.new do
    handle.result = batch.item.map { |entry| yield entry.item }
    handle.done = true
  end
  handle
end

#get_worker(batch) ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/Manager.rb', line 47

# Creates a worker for +batch+ using the strategy chosen at construction.
#
# Dispatches to get_fork_worker when @worker_type is :fork and to
# get_thread_worker otherwise, holding @mutex while the worker is created.
# The caller's block is forwarded to whichever builder is used.
#
# The block is captured explicitly with +&block+: obtaining it through a
# bare +Proc.new+ is deprecated in Ruby 2.7 and raises ArgumentError on
# Ruby 3.0 and later.
#
# @param batch [Object] batch handed to the worker builder
# @yield block forwarded to the worker builder
# @return [Object] whatever the selected worker builder returns
# @raise [ArgumentError] if no block is given
def get_worker(batch, &block)
  raise ArgumentError, "get_worker requires a block" unless block

  @mutex.synchronize do
    if @worker_type == :fork
      get_fork_worker(batch, &block)
    else
      get_thread_worker(batch, &block)
    end
  end
end

#rem_worker(worker) ⇒ Object



35
36
37
38
39
# File 'lib/Manager.rb', line 35

# Removes a worker from the registry, keyed by its pid.
#
# The registry is modified while holding @mutex.
#
# @param worker [#pid] worker whose pid identifies the entry to drop
# @return [Object, nil] the removed registry entry, or nil if none was stored
#   under that pid
def rem_worker(worker)
  @mutex.synchronize { @current_workers.delete(worker.pid) }
end

#sub_worker(worker, id) ⇒ Object



25
26
27
28
29
30
31
32
33
# File 'lib/Manager.rb', line 25

# Registers a worker under its pid, tagged with the given id.
#
# The worker and id are wrapped in an OpenStruct (+worker+, +id+) which is
# stored in the registry while holding @mutex.
#
# @param worker [#pid] worker to register; its pid must not be nil
# @param id [Object] id stored alongside the worker
# @return [OpenStruct] the entry that was stored
# @raise [RuntimeError] if the worker's pid is nil
def sub_worker(worker, id)
  raise "Worker must contain a pid!" if worker.pid.nil?

  entry = OpenStruct.new(worker: worker, id: id)
  @mutex.synchronize { @current_workers[worker.pid] = entry }
end

#worker_available? ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/Manager.rb', line 17

# Tells whether another worker may be registered.
#
# Reads the registry size without taking @mutex.
#
# @return [Boolean] true while fewer than @max_workers workers are registered
def worker_available?
  active = @current_workers.size
  active < @max_workers
end

#working? ⇒ Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/Manager.rb', line 21

# Tells whether any worker is currently registered.
#
# Reads the registry without taking @mutex.
#
# @return [Boolean] true when the registry holds at least one worker
def working?
  !@current_workers.empty?
end