Class: AsyncIO::Base
Instance Attribute Summary
- #logger ⇒ Object
  Returns the value of attribute logger.
- #queue ⇒ Object (readonly)
  Default: Number of threads to be spawned is 5.
- #threads ⇒ Object (readonly)
  Default: Number of threads to be spawned is 5.
Instance Method Summary
- #async(&payload) ⇒ Object
  Perform any sort of task that needs to be done asynchronously.
- #async_with(task) ⇒ Object
- #clear_interval! ⇒ Object
- #initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new }) ⇒ Base (constructor)
  A new instance of Base.
- #interval(seconds) ⇒ Object
  TODO: Allow multiple intervals to run on the same thread by storing them in a list, and calling them later on.
- #new_interval? ⇒ Boolean
- #worker(payload, task) ⇒ Object
  Creates a new Worker and pushes it onto the queue. Whenever a ‘task’ (i.e. a Ruby object) is finished, it calls the payload and passes the result of that task to it.
Methods included from Rescuer
Constructor Details
#initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new }) ⇒ Base
Returns a new instance of Base.
# File 'lib/async_io/base.rb', line 22
def initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new })
  @logger = args[:logger]
  @queue = args[:queue]
  @threads = []
  n_threads.times { @threads << Thread.new { consumer } }
end
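The constructor starts n_threads threads that each run consumer, whose body is not shown on this page. The following is a minimal sketch of how such a pool might drain a shared Queue, assuming every queued item responds to call. The class name TinyPool, its push and shutdown helpers, and the consumer body are all hypothetical and are not taken from the library.

```ruby
# Hypothetical stand-in for AsyncIO::Base; only the pool wiring is sketched.
class TinyPool
  attr_reader :queue, :threads

  def initialize(n_threads = 5, queue: Queue.new)
    @queue   = queue
    @threads = []
    n_threads.times { @threads << Thread.new { consumer } }
  end

  # Adds a callable job to the shared queue.
  def push(job)
    queue.push(job)
  end

  # Closes the queue and waits for all consumer threads to finish.
  def shutdown
    queue.close
    threads.each(&:join)
  end

  private

  # Assumed consumer loop: block on the queue and run each job.
  # Queue#pop returns nil once the queue is closed and empty, ending the loop.
  def consumer
    while (job = queue.pop)
      job.call
    end
  end
end

results = Queue.new
pool    = TinyPool.new(3)
10.times { |i| pool.push(proc { results << i * i }) }
pool.shutdown

# Collect the results; order of completion is not guaranteed, so sort them.
squares = Array.new(results.size) { results.pop }.sort
```

Because all threads pop from one queue, jobs are distributed to whichever thread is free, which is why the results are sorted before use.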
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/async_io/base.rb', line 21
def logger
  @logger
end
#queue ⇒ Object (readonly)
Default: Number of threads to be spawned is 5.
NOTE:
Whenever an exception is raised, the thread that the exception was raised from is killed, so we need a way to prevent threads from being killed. Therefore it rescues all exceptions raised and logs them.
# File 'lib/async_io/base.rb', line 20
def queue
  @queue
end
#threads ⇒ Object (readonly)
Default: Number of threads to be spawned is 5.
NOTE:
Whenever an exception is raised, the thread that the exception was raised from is killed, so we need a way to prevent threads from being killed. Therefore it rescues all exceptions raised and logs them.
# File 'lib/async_io/base.rb', line 20
def threads
  @threads
end
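The NOTE above describes rescuing and logging exceptions so that a worker thread survives a failing job. A rough sketch of that pattern follows. The SafeRun module and its rescuer signature are hypothetical, since the real Rescuer module is not shown on this page, and the sketch rescues StandardError only, which is narrower than the "all exceptions" the NOTE mentions.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper; mirrors the idea in the NOTE, not the real Rescuer.
module SafeRun
  # Runs the block, logging any StandardError instead of letting it propagate.
  def self.rescuer(logger)
    yield
  rescue StandardError => e
    logger.error("#{e.class}: #{e.message}")
    nil
  end
end

log_output = StringIO.new
logger     = Logger.new(log_output)
jobs       = Queue.new
completed  = Queue.new

# A single consumer thread that guards every job with the rescuer.
worker = Thread.new do
  while (job = jobs.pop)
    SafeRun.rescuer(logger) { completed << job.call }
  end
end

jobs << proc { 1 + 1 }
jobs << proc { raise ArgumentError, 'boom' } # would kill an unguarded thread
jobs << proc { 2 + 2 }
jobs.close
worker.join

finished = Array.new(completed.size) { completed.pop }
```

The job after the failing one still runs, which is the behaviour the NOTE is aiming for.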
Instance Method Details
#async(&payload) ⇒ Object
Perform any sort of task that needs to be done asynchronously. NOTE: It does not return anything, as it receives an empty task (i.e. an empty block of code).
# File 'lib/async_io/base.rb', line 86
def async(&payload)
  worker(payload, proc {})
end
#async_with(task) ⇒ Object
# File 'lib/async_io/base.rb', line 90
def async_with(task)
  worker(proc {}, task)
end
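async and async_with differ only in which of the two procs handed to worker is empty. The sketch below illustrates the likely consequence, assuming a worker runs the task and then passes its result to the payload. SketchWorker is a hypothetical stand-in run synchronously here; the real class is in async_io/worker.rb and may behave differently.

```ruby
# Hypothetical stand-in for AsyncIO::Worker, run synchronously for clarity.
class SketchWorker
  attr_reader :done

  def initialize(payload, task)
    @payload = payload
    @task    = task
    @done    = false
  end

  # Runs the task, stores its result, then hands the result to the payload.
  def call
    @done = @task.call
    @payload.call(@done)
  end
end

seen = []

# async style: real payload, empty task, so the payload receives nil.
async_worker = SketchWorker.new(proc { |result| seen << result }, proc {})
async_worker.call

# async_with style: empty payload, real task, so the result stays on the worker.
with_worker = SketchWorker.new(proc {}, proc { 6 * 7 })
with_worker.call
```

Under these assumptions, async is useful for its side effects, while async_with is useful when the caller wants to read the result back from the returned worker.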
#clear_interval! ⇒ Object
# File 'lib/async_io/base.rb', line 112
def clear_interval!
  @interval.terminate
  @interval = nil
end
#interval(seconds) ⇒ Object
TODO: Allow multiple intervals to run on the same thread by storing them in a list, and calling them later on.
# File 'lib/async_io/base.rb', line 99
def interval(seconds)
  new_interval? do
    while true
      rescuer { yield }
      sleep(seconds)
    end
  end
end
#new_interval? ⇒ Boolean
# File 'lib/async_io/base.rb', line 108
def new_interval?
  @interval ||= Thread.new { yield }
end
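Taken together, interval, new_interval? and clear_interval! manage a single background thread that repeats a block every few seconds. The sketch below walks through that lifecycle in isolation. The Ticker class and its running? helper are hypothetical. Note that clear_interval! as listed calls @interval.terminate directly, which raises NoMethodError when no interval has been started; the sketch guards against that case with the safe navigation operator.

```ruby
# Hypothetical stand-in showing the interval lifecycle.
class Ticker
  def interval(seconds, &block)
    # ||= makes a second call a no-op while an interval is already running.
    @interval ||= Thread.new do
      loop do
        begin
          block.call
        rescue StandardError => e
          warn "interval error: #{e.message}"
        end
        sleep(seconds)
      end
    end
  end

  # Guarded with &. so calling it with no active interval does not raise.
  def clear_interval!
    @interval&.terminate
    @interval&.join
    @interval = nil
  end

  def running?
    !@interval.nil? && @interval.alive?
  end
end

ticks  = Queue.new
ticker = Ticker.new
ticker.interval(0.01) { ticks << :tick }
sleep(0.1)
ticker.clear_interval!
tick_count = ticks.size
```

Since only one thread is stored, starting a second interval requires clearing the first, which matches the TODO about supporting multiple intervals.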
#worker(payload, task) ⇒ Object
It creates a new Worker and pushes it onto the queue. Whenever a ‘task’ (i.e. a Ruby object) is finished, it calls the payload and passes the result of that task to it.
For example:
def aget_user(uid, &payload)
  worker(payload, proc { User.find(uid) })
end
It returns the worker created for this particular task, to which you can send the message done in order to retrieve the completed task's result. See async_io/worker.rb.
For example:
result = aget_user(1) { |u| Logger.info(u.name) }
# task may take a while to be done...
user = result.done
user.name
# => "John"
NOTE: With the snippet above, if the task has not finished yet you will get false whenever you send the message task to it. Once the task has finished you will be able to get its result.
# File 'lib/async_io/base.rb', line 76
def worker(payload, task)
  Worker.new(payload, task).tap { |w| queue.push(w) }
end
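The polling behaviour described above (false until the task finishes, then its result) can be tried out without the library. The sketch below assumes a worker that stores the task's result and then calls the payload with it. PollableWorker and the gate queue are hypothetical; the gate exists only so the example can control when the task completes.

```ruby
# Hypothetical stand-in for AsyncIO::Worker; the real class lives in async_io/worker.rb.
class PollableWorker
  # Returns false until the task has run, then the task's result.
  attr_reader :done

  def initialize(payload, task)
    @payload = payload
    @task    = task
    @done    = false
  end

  def call
    @done = @task.call
    @payload.call(@done)
  end
end

queue    = Queue.new
gate     = Queue.new                     # lets the example decide when the task finishes
consumer = Thread.new { queue.pop.call } # one-shot consumer thread

names   = []
payload = proc { |user| names << user[:name] }
task    = proc do
  gate.pop                               # block until the main thread opens the gate
  { name: 'John' }
end

# Same shape as Base#worker: build the worker, push it, return it.
worker = PollableWorker.new(payload, task).tap { |w| queue.push(w) }

before = worker.done                     # task is still blocked on the gate
gate.push(:go)                           # let the task complete
consumer.join
after = worker.done
```

Callers that poll done should keep in mind that a task legitimately returning false or nil is indistinguishable from an unfinished one in this design.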