Module: Threaded

Extended by:
Threaded
Included in:
Threaded
Defined in:
lib/threaded.rb,
lib/threaded/errors.rb,
lib/threaded/master.rb,
lib/threaded/worker.rb,
lib/threaded/promise.rb,
lib/threaded/version.rb

Defined Under Namespace

Classes: Master, NoWorkersError, Promise, Worker, WorkerNotStarted

Constant Summary collapse

STOP_TIMEOUT =

seconds

10
VERSION =
"0.0.4"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#inlineObject Also known as: inline?

Returns the value of attribute inline.



14
15
16
# File 'lib/threaded.rb', line 14

def inline
  @inline
end

#loggerObject

Returns the value of attribute logger.



14
15
16
# File 'lib/threaded.rb', line 14

def logger
  @logger
end

#sizeObject

Returns the value of attribute size.



14
15
16
# File 'lib/threaded.rb', line 14

def size
  @size
end

#sync_promise_ioObject Also known as: sync_promise_io?

Returns the value of attribute sync_promise_io.



14
15
16
# File 'lib/threaded.rb', line 14

def sync_promise_io
  @sync_promise_io
end

Instance Method Details

#configure {|_self| ... } ⇒ Object Also known as: config

Yields:

  • (_self)

Yield Parameters:

  • _self (Threaded)

    the object that the method was called on



55
56
57
58
# File 'lib/threaded.rb', line 55

def configure(&block)
  raise "Queue is already started, must configure queue before starting" if started?
  yield self
end

#enqueue(job, *args) ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/threaded.rb', line 81

def enqueue(job, *args)
  if inline?
    job.call(*args)
  else
    master.enqueue(job, *args)
  end
  return true
end

#later(&block) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/threaded.rb', line 69

def later(&block)
  job = if sync_promise_io?
    Proc.new {
      Thread.current[:stdout] = StringIO.new
      block.call
    }
  else
    block
  end
  Threaded::Promise.new(&job).later
end

#masterObject Also known as: master=



44
45
46
47
48
49
50
51
# File 'lib/threaded.rb', line 44

def master
  @mutex.synchronize do
    return @master if @master
    @master = Master.new(logger: self.logger,
                         size:   self.size)
  end
  @master
end

#start(options = {}) ⇒ Object



35
36
37
38
39
40
41
42
# File 'lib/threaded.rb', line 35

def start(options = {})
  raise "Queue is already started, must configure queue before starting" if options.any? && started?
  options.each do |k, v|
    self.send(k, v)
  end
  self.master.start
  return self
end

#started?Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/threaded.rb', line 61

def started?
  !stopped?
end

#stop(timeout = STOP_TIMEOUT) ⇒ Object



90
91
92
93
94
# File 'lib/threaded.rb', line 90

def stop(timeout = STOP_TIMEOUT)
  return true unless master
  master.stop(timeout)
  return true
end

#stopped?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/threaded.rb', line 65

def stopped?
  master.stopping?
end