Class: AsyncIO::Base

Inherits:
Object
Includes:
Rescuer
Defined in:
lib/async_io/base.rb

Instance Attribute Summary

Instance Method Summary

Methods included from Rescuer

#rescuer

Constructor Details

#initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new }) ⇒ Base

Returns a new instance of Base.



# File 'lib/async_io/base.rb', line 22

def initialize(n_threads = 5, args = { logger: AsyncIO::Logger, queue: Queue.new })
  @logger   = args[:logger]
  @queue    = args[:queue]
  @threads  = []
  n_threads.times { @threads << Thread.new { consumer } }
end
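
The constructor above starts a fixed number of threads that all consume from one shared queue. A minimal, self-contained sketch of that pattern follows. `SketchPool`, `push`, `shutdown`, and the `:stop` sentinel are hypothetical names used only for illustration; the library's own `consumer` method is not shown on this page and may behave differently.

```ruby
require "thread"

# Hypothetical stand-in for the pool pattern; not the library's code.
class SketchPool
  attr_reader :queue, :threads

  def initialize(n_threads = 5, queue: Queue.new)
    @queue   = queue
    @threads = Array.new(n_threads) { Thread.new { consumer } }
  end

  # Enqueue anything that responds to #call.
  def push(callable)
    queue.push(callable)
  end

  # Stop every consumer by sending one sentinel per thread, then wait.
  def shutdown
    threads.size.times { queue.push(:stop) }
    threads.each(&:join)
  end

  private

  # Each thread blocks on Queue#pop until work (or the sentinel) arrives.
  def consumer
    while (work = queue.pop) != :stop
      work.call
    end
  end
end

results = Queue.new
pool    = SketchPool.new(3)
10.times { |i| pool.push(-> { results.push(i * i) }) }
pool.shutdown

# Drain the thread-safe results queue into a sorted array.
squares = Array.new(results.size) { results.pop }.sort
```

Because `Queue#pop` blocks while the queue is empty, idle threads cost nothing until work is pushed.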

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



# File 'lib/async_io/base.rb', line 21

def logger
  @logger
end

#queue ⇒ Object (readonly)

Default: the number of threads to be spawned is 5.

NOTE:

Whenever an exception is raised inside a thread, that thread is killed, so we need a way to keep threads alive. Therefore, all exceptions raised are rescued and logged.
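
As a rough illustration of the note above, the sketch below wraps a block so that an exception is logged instead of terminating the thread. `SketchRescuer` and `SketchConsumer` are hypothetical names, and rescuing `StandardError` is an assumption made here; the real `Rescuer` module is not shown on this page and may rescue a wider set of exceptions.

```ruby
require "logger"
require "stringio"

# Hypothetical mixin modelled on the note above; the real Rescuer may differ.
module SketchRescuer
  # Runs the block; on failure logs the error and returns nil instead of raising.
  def rescuer
    yield
  rescue StandardError => e
    logger.error("#{e.class}: #{e.message}")
    nil
  end
end

# Hypothetical host class that supplies the logger the mixin relies on.
class SketchConsumer
  include SketchRescuer
  attr_reader :logger

  def initialize(logger)
    @logger = logger
  end
end

log      = StringIO.new
consumer = SketchConsumer.new(Logger.new(log))

thread = Thread.new do
  consumer.rescuer { raise ArgumentError, "boom" } # logged, not fatal
  :still_running                                   # the thread carries on
end
outcome = thread.value
```

Without the wrapper, the `ArgumentError` would end the thread and `Thread#value` would re-raise it in the caller.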



# File 'lib/async_io/base.rb', line 20

def queue
  @queue
end

#threads ⇒ Object (readonly)

Default: the number of threads to be spawned is 5.

NOTE:

Whenever an exception is raised inside a thread, that thread is killed, so we need a way to keep threads alive. Therefore, all exceptions raised are rescued and logged.



# File 'lib/async_io/base.rb', line 20

def threads
  @threads
end

Instance Method Details

#async(&payload) ⇒ Object

Performs any task that needs to be done asynchronously. NOTE: It does not return anything, as it receives an empty job (i.e. an empty block of code).



# File 'lib/async_io/base.rb', line 90

def async(&payload)
  worker(payload, proc {})
end

#async_with(job) ⇒ Object



# File 'lib/async_io/base.rb', line 94

def async_with(job)
  worker(proc {}, job)
end

#clear_interval! ⇒ Object



# File 'lib/async_io/base.rb', line 116

def clear_interval!
  @interval.terminate
  @interval = nil
end

#interval(seconds) ⇒ Object

TODO: Allow multiple intervals to run on the same thread by storing them in a list, and calling them later on.



# File 'lib/async_io/base.rb', line 103

def interval(seconds)
  new_interval? do
    while true
      rescuer { yield }
      sleep(seconds)
    end
  end
end
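
The interval methods on this page work together: one background thread runs a block repeatedly, and a second call reuses that thread rather than starting another. The sketch below reproduces that behaviour in isolation. `SketchTimer` is a hypothetical class, and the safe-navigation call in `clear_interval!` is a small deviation from the listing, added so that clearing twice does not raise.

```ruby
# Hypothetical stand-in showing a single memoized interval thread.
class SketchTimer
  # Starts the loop once; later calls return the existing thread
  # and their blocks are ignored.
  def interval(seconds)
    @interval ||= Thread.new do
      loop do
        yield
        sleep(seconds)
      end
    end
  end

  # Stops the background thread, if any, and forgets it.
  def clear_interval!
    @interval&.terminate
    @interval = nil
  end
end

ticks  = Queue.new
timer  = SketchTimer.new
first  = timer.interval(0.01) { ticks.push(:tick) }
second = timer.interval(0.01) { ticks.push(:ignored) } # same thread comes back

seen = Array.new(3) { ticks.pop } # wait for three ticks
timer.clear_interval!
first.join                        # wait until the thread has actually stopped
```

This also shows why the TODO above matters: while one interval is active, a second block is silently dropped.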

#new_interval? ⇒ Boolean

Returns:

  • (Boolean)



# File 'lib/async_io/base.rb', line 112

def new_interval?
  @interval ||= Thread.new { yield }
end

#worker(payload, job) ⇒ Object

It creates a new Worker and pushes it onto the queue. When a 'job' (i.e. a Ruby object) finishes, it calls the payload and passes the result of the job to it.

For example:

def aget_user(uid, &payload)
  # worker takes the payload and the job as two arguments.
  worker(payload, proc { User.find(uid) })
end

It returns the worker created for the job. You can send the message done to that worker to retrieve the job's result. See prediction_io/worker.rb.

For example:

result = aget_user(1) { |u| Logger.info(u.name) }

# job may take a while to be done...

user = result.done
user.name
# => "John"

NOTE: With the snippet above, if the job has not finished yet, you will get false when you ask the worker for its result. Once the job has finished, you will get its result.
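
To make the job and payload flow concrete, here is a small self-contained sketch of a worker whose `done` is false until the job has run. `SketchWorker` is a hypothetical class written from the description above; the library's actual `Worker` is defined elsewhere and may differ in detail.

```ruby
# Hypothetical worker modelled on the description above.
class SketchWorker
  attr_reader :done

  def initialize(payload, job)
    @payload = payload
    @job     = job
    @done    = false # stays false until the job has finished
  end

  # Runs the job, stores its result, then hands the result to the payload.
  def call
    @done = @job.call
    @payload.call(@done)
  end
end

queue  = Queue.new
seen   = []
worker = SketchWorker.new(->(name) { seen << name }, -> { "John" })
queue.push(worker)

before   = worker.done                   # job has not run yet
consumer = Thread.new { queue.pop.call } # a consumer thread picks the worker up
consumer.join
after    = worker.done                   # result of the job
```

One consequence of this design is that a job whose real result is false cannot be told apart from an unfinished job by looking at `done` alone.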



# File 'lib/async_io/base.rb', line 76

def worker(payload, job)
  rescuer do
    Worker.new(payload, job).tap { |w|
      queue.push(w)
    }
  end
end