Class: AsyncIO::Base
Instance Attribute Summary
- #logger ⇒ Object
  Returns the value of attribute logger.
- #queue ⇒ Object (readonly)
  Default: Number of threads to be spawned is 5.
- #threads ⇒ Object (readonly)
  Default: Number of threads to be spawned is 5.
Instance Method Summary
- #async(&payload) ⇒ Object
  Perform any sort of task that needs to be done asynchronously.
- #async_with(job) ⇒ Object
- #clear_interval! ⇒ Object
- #initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new }) ⇒ Base (constructor)
  A new instance of Base.
- #interval(seconds) ⇒ Object
  TODO: Allow multiple intervals to run on the same thread by storing them in a list, and calling them later on.
- #new_interval? ⇒ Boolean
- #worker(payload, job) ⇒ Object
  Creates a new Worker and pushes it onto the queue. Whenever a 'job' (i.e., a Ruby object) is finished, it calls the payload and passes the result of the job to it.
Methods included from Rescuer
Constructor Details
#initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new }) ⇒ Base
Returns a new instance of Base.
# File 'lib/async_io/base.rb', line 22
def initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new })
  @logger  = args[:logger]
  @queue   = args[:queue]
  @threads = []
  n_threads.times { @threads << Thread.new { consumer } }
end
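The constructor starts n_threads threads that each run consumer, a method whose body is not shown on this page. The following is a minimal sketch of what such a consumer loop might look like, assuming it blocks on the queue and calls whatever it pops. The MiniPool class, its push and shutdown methods, and the :stop marker are all hypothetical and are not part of AsyncIO.

```ruby
# Hypothetical stand-in for AsyncIO::Base; only the constructor shape mirrors the source.
class MiniPool
  attr_reader :queue, :threads

  def initialize(n_threads = 5, queue: Queue.new)
    @queue   = queue
    @threads = []
    n_threads.times { @threads << Thread.new { consumer } }
  end

  # Assumed API: enqueue anything that responds to #call.
  def push(callable)
    queue.push(callable)
  end

  # Assumed API: push one stop marker per thread, then wait for every thread to exit.
  def shutdown
    threads.size.times { queue.push(:stop) }
    threads.each(&:join)
  end

  private

  # Assumed behavior: Queue#pop blocks until work arrives, so idle threads just sleep.
  def consumer
    loop do
      item = queue.pop
      break if item == :stop
      item.call
    end
  end
end

results = Queue.new
pool = MiniPool.new(3)
10.times { |i| pool.push(-> { results.push(i * i) }) }
pool.shutdown
puts results.size # prints 10
```

Because the queue is first-in, first-out, every job is popped before any stop marker, so shutdown only returns once all ten jobs have run.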
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/async_io/base.rb', line 21
def logger
  @logger
end
#queue ⇒ Object (readonly)
Default: Number of threads to be spawned is 5.
NOTE:
Whenever an exception is raised, the thread that the exception was raised from is killed, so we need a way to prevent threads from being killed. Therefore it rescues all exceptions raised and logs them.
# File 'lib/async_io/base.rb', line 20
def queue
  @queue
end
#threads ⇒ Object (readonly)
Default: Number of threads to be spawned is 5.
NOTE:
Whenever an exception is raised, the thread that the exception was raised from is killed, so we need a way to prevent threads from being killed. Therefore it rescues all exceptions raised and logs them.
# File 'lib/async_io/base.rb', line 20
def threads
  @threads
end
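The note on rescuing exceptions can be illustrated with a small sketch. The Rescuer mixin is listed as included, but its body is not shown on this page, so SketchRescuer and SketchConsumer below are hypothetical stand-ins. This sketch rescues StandardError only, whereas the note says all exceptions are rescued; treat that as an assumption of the example.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper in the spirit of the Rescuer mixin; the real one is not shown here.
module SketchRescuer
  def rescuer
    yield
  rescue StandardError => e
    # Log instead of letting the exception propagate and kill the thread.
    logger.error("#{e.class}: #{e.message}")
    nil
  end
end

# Hypothetical consumer that runs a list of jobs on one thread.
class SketchConsumer
  include SketchRescuer
  attr_reader :logger, :processed

  def initialize(logger)
    @logger    = logger
    @processed = []
  end

  def run(jobs)
    Thread.new do
      # Each job is wrapped individually, so one failure does not stop the loop.
      jobs.each { |job| rescuer { @processed << job.call } }
    end
  end
end

log      = StringIO.new
consumer = SketchConsumer.new(Logger.new(log))
jobs     = [-> { 1 }, -> { raise ArgumentError, 'boom' }, -> { 3 }]
consumer.run(jobs).join
p consumer.processed # prints [1, 3]
```

The failing job is logged and skipped, and the same thread goes on to run the job after it.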
Instance Method Details
#async(&payload) ⇒ Object
Perform any sort of task that needs to be done asynchronously. NOTE: It does not return anything, as it receives an empty job (i.e., an empty block of code).
# File 'lib/async_io/base.rb', line 90
def async(&payload)
  worker(payload, proc {})
end
#async_with(job) ⇒ Object
# File 'lib/async_io/base.rb', line 94
def async_with(job)
  worker(proc {}, job)
end
#clear_interval! ⇒ Object
# File 'lib/async_io/base.rb', line 116
def clear_interval!
  @interval.terminate
  @interval = nil
end
#interval(seconds) ⇒ Object
TODO: Allow multiple intervals to run on the same thread by storing them in a list, and calling them later on.
# File 'lib/async_io/base.rb', line 103
def interval(seconds)
  new_interval? do
    while true
      rescuer { yield }
      sleep(seconds)
    end
  end
end
#new_interval? ⇒ Boolean
# File 'lib/async_io/base.rb', line 112
def new_interval?
  @interval ||= Thread.new { yield }
end
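Taken together, interval, new_interval?, and clear_interval! keep a single memoized thread. Because of the ||= in new_interval?, a second call to interval while one is already running reuses the existing thread and its block is never run, which is what the TODO refers to. A minimal sketch of that behavior, using a hypothetical SketchTicker class rather than AsyncIO::Base itself (the nil guard and the join in clear_interval! are additions for this example):

```ruby
# Hypothetical stand-in showing the memoized interval thread; not the AsyncIO class.
class SketchTicker
  def interval(seconds)
    # ||= keeps the first thread: a later call does not start a second one.
    @interval ||= Thread.new do
      loop do
        yield
        sleep(seconds)
      end
    end
  end

  def clear_interval!
    return unless @interval # assumed guard; the source calls terminate unconditionally
    @interval.terminate
    @interval.join          # wait until the thread has actually stopped
    @interval = nil
  end
end

ticks  = Queue.new
ticker = SketchTicker.new
first  = ticker.interval(0.01) { ticks.push(:tick) }
second = ticker.interval(0.01) { ticks.push(:ignored) }
3.times { ticks.pop }       # block until three ticks have arrived
ticker.clear_interval!
puts first.equal?(second)   # prints true
puts first.alive?           # prints false
```

After clear_interval! the memoized thread is gone, so the next call to interval starts a fresh one.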
#worker(payload, job) ⇒ Object
Creates a new Worker and pushes it onto the queue. Whenever a 'job' (i.e., a Ruby object) is finished, it calls the payload and passes the result of the job to it.
For example:
def aget_user(uid, &payload)
  # worker takes the job as its second argument, so wrap the lookup in a proc
  worker(payload, proc { User.find(uid) })
end
It returns the worker created for that particular job. You can send the message done to the worker to retrieve the result of its job; see prediction_io/worker.rb.
For example:
result = aget_user(1) { |u| Logger.info(u.name) }
# job may take a while to be done…
user = result.done
user.name
# => "John"
NOTE: With the snippet above, if the job has not finished yet, you will get false when you send the message done to the worker. Once the job is finished, you will be able to get its result.
# File 'lib/async_io/base.rb', line 76
def worker(payload, job)
  rescuer do
    Worker.new(payload, job).tap { |w| queue.push(w) }
  end
end
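The Worker class itself is not shown on this page. As a rough sketch of the behavior described above (done is false until the job has run, then holds the job's result, and the payload receives that result), a hypothetical SketchWorker might look like this. The class name and its internals are assumptions for illustration only.

```ruby
# Hypothetical Worker; the library's real class is not shown on this page.
class SketchWorker
  attr_reader :done

  def initialize(payload, job)
    @payload = payload
    @job     = job
    @done    = false # stays false until the job has run
  end

  # Run the job, remember its result, and hand that result to the payload.
  def call
    @done = @job.call
    @payload.call(@done)
    self
  end
end

queue  = Queue.new
names  = Queue.new
worker = SketchWorker.new(->(user) { names.push(user) }, -> { 'John' })
queue.push(worker)

puts worker.done.inspect           # prints false: no consumer has run the job yet
Thread.new { queue.pop.call }.join # a consumer thread picks the worker up
puts worker.done                   # prints John
```

One consequence of this design is that a job whose real result is false cannot be told apart from a job that has not finished yet.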