Class: AsyncIO::Base

Inherits:
Object
Includes:
Rescuer
Defined in:
lib/async_io/base.rb

Methods included from Rescuer

#rescuer

Constructor Details

#initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new }) ⇒ Base

Returns a new instance of Base.



# File 'lib/async_io/base.rb', line 22

def initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new })
  @logger   = args[:logger]
  @queue    = args[:queue]
  @threads  = []
  n_threads.times { @threads << Thread.new { consumer } }
end
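
The constructor starts a fixed number of threads that all consume from one shared queue. A minimal sketch of that pattern follows, assuming a consumer loop that pops callable jobs until it sees a `nil` stop signal. The stop signal, the `results` queue, and the variable names are assumptions made for illustration and are not taken from `lib/async_io/base.rb`.

```ruby
# Illustrative only: a small pool of threads draining one shared queue.
queue   = Queue.new
results = Queue.new

threads = Array.new(3) do
  Thread.new do
    # Queue#pop blocks until a job is available.
    # A nil job is used here as a stop signal (an assumption of this sketch).
    while (job = queue.pop)
      results << job.call
    end
  end
end

5.times { |i| queue << -> { i * 2 } }
threads.size.times { queue << nil } # one stop signal per thread
threads.each(&:join)

collected = []
collected << results.pop until results.empty?
collected.sort!
```

Because `Queue` is thread-safe, the threads need no extra locking to share it.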

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



# File 'lib/async_io/base.rb', line 21

def logger
  @logger
end

#queue ⇒ Object (readonly)

Default: the number of threads to be spawned is 5.

NOTE:

When an exception is raised inside a thread and is not rescued, that thread is killed, so we need a way to keep the threads alive. Therefore all exceptions raised are rescued and logged.



# File 'lib/async_io/base.rb', line 20

def queue
  @queue
end
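
The note above describes rescuing and logging exceptions so that a thread survives a failing job. A minimal sketch of that idea follows. The `safely` helper is hypothetical and is not the source of `Rescuer#rescuer`. It rescues `StandardError` only, which is narrower than "all exceptions".

```ruby
require 'logger'
require 'stringio'

log_output = StringIO.new
logger     = Logger.new(log_output)

# Hypothetical helper: run the block, log any StandardError, keep going.
def safely(logger)
  yield
rescue StandardError => e
  logger.error("#{e.class}: #{e.message}")
  nil
end

completed = []
thread = Thread.new do
  3.times do |i|
    safely(logger) do
      raise ArgumentError, "job #{i} failed" if i == 1
      completed << i
    end
  end
end
thread.join
```

The second job raises, yet the thread goes on to run the third one, and the failure ends up in the log instead of killing the thread.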

#threads ⇒ Object (readonly)




# File 'lib/async_io/base.rb', line 20

def threads
  @threads
end

Instance Method Details

#async(&payload) ⇒ Object

Performs any task that needs to be done asynchronously. NOTE: It does not produce a result, as it is given an empty task (i.e. an empty block of code).



# File 'lib/async_io/base.rb', line 86

def async(&payload)
  worker(payload, proc {})
end

#async_with(task) ⇒ Object



# File 'lib/async_io/base.rb', line 90

def async_with(task)
  worker(proc {}, task)
end
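
Both `async` and `async_with` fill one of the two `worker` arguments with an empty `proc {}`. The sketch below shows how an empty proc behaves as a placeholder in either position. The `run` lambda is a hypothetical stand-in that assumes a worker calls the task and hands its result to the payload, as the `#worker` description states. It is not the library's code.

```ruby
# Hypothetical stand-in: run the task, then pass its result to the payload.
run = ->(payload, task) { payload.call(task.call) }

# Like async: a real payload with an empty task.
# An empty proc returns nil, so the payload receives nil.
received = []
run.call(proc { |result| received << result }, proc {})

# Like async_with: a real task with an empty payload.
# A proc silently ignores arguments it does not declare.
ran = []
run.call(proc {}, proc { ran << :task; :result })
```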

#clear_interval! ⇒ Object



# File 'lib/async_io/base.rb', line 112

def clear_interval!
  @interval.terminate
  @interval = nil
end

#interval(seconds) ⇒ Object

TODO: Allow multiple intervals to run on the same thread by storing them in a list, and calling them later on.



# File 'lib/async_io/base.rb', line 99

def interval(seconds)
  new_interval? do
    while true
      rescuer { yield }
      sleep(seconds)
    end
  end
end
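
As a rough illustration of the run-then-sleep loop above and of stopping it with `Thread#terminate` (the call used by `clear_interval!`), here is a standalone sketch. The `ticks` queue and the timings are assumptions chosen to keep the example short.

```ruby
ticks = Queue.new

# Run a block repeatedly on a background thread, sleeping between runs.
timer = Thread.new do
  loop do
    ticks << Time.now
    sleep(0.01)
  end
end

sleep(0.1)       # let the interval fire a few times
timer.terminate  # stop the background thread
timer.join       # wait until it has actually stopped

count = ticks.size
alive = timer.alive?
```

Since the loop never exits on its own, terminating the thread is the only way to stop it in this sketch.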

#new_interval? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async_io/base.rb', line 108

def new_interval?
  @interval ||= Thread.new { yield }
end
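
The `||=` in the method above means a thread is created only while `@interval` is unset. Later calls return the existing thread and never run their block. The sketch below demonstrates this with a hypothetical `IntervalHolder` class that is not part of the library.

```ruby
# Hypothetical class that reproduces only the memoization pattern.
class IntervalHolder
  def start
    @interval ||= Thread.new { yield }
  end
end

holder = IntervalHolder.new
calls  = Queue.new

first  = holder.start { calls << :first }
second = holder.start { calls << :second } # block is never run
first.join

same_thread = first.equal?(second)
call_count  = calls.size
```

This is why only one interval can run at a time until the stored thread is cleared.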

#worker(payload, task) ⇒ Object

Creates a new Worker and pushes it onto the queue. Whenever a ‘task’ (i.e. a Ruby object) is finished, it calls the payload and passes the result of that task to it.

For example:

    def aget_user(uid, &payload)
      worker(payload, proc { User.find(uid) })
    end

It returns the worker created for this particular task, to which you can send the message done in order to retrieve its completed task. See async_io/worker.rb.

For example:

    result = aget_user(1) { |u| Logger.info(u.name) }

    # task may take a while to be done…

    user = result.done
    user.name
    # => "John"

NOTE: With the snippet above, if the task has not finished yet, you will get false when you send the message done to it. Once the task is finished, you will be able to get its result.



# File 'lib/async_io/base.rb', line 76

def worker(payload, task)
  Worker.new(payload, task).tap { |w| queue.push(w) }
end
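
The method above relies on `tap` to push the new object onto the queue and still return it to the caller. A minimal sketch of that flow follows, using a hypothetical `SketchJob` class. The real worker lives in async_io/worker.rb and may differ. The `done` behaviour shown here (false until finished, then the result) is an assumption based on the description above.

```ruby
# Hypothetical stand-in for a worker; not the library's Worker class.
class SketchJob
  def initialize(payload, task)
    @payload  = payload
    @task     = task
    @result   = nil
    @finished = false
  end

  # Run the task, hand its result to the payload, then mark as finished.
  def call
    @result = @task.call
    @payload.call(@result)
    @finished = true
  end

  # false until the task has run, then the task's result.
  def done
    @finished ? @result : false
  end
end

queue = Queue.new
seen  = []

# tap pushes the job onto the queue and still returns the job itself.
job = SketchJob.new(->(r) { seen << r }, -> { 21 * 2 }).tap { |j| queue.push(j) }

before = job.done # the task has not run yet

consumer = Thread.new { queue.pop.call }
consumer.join

after = job.done
```

Returning the job from `tap` is what lets the caller keep a handle to poll later, while a consumer thread picks the same object off the queue.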