Class: AsyncIO::Base

Inherits:
Object
Includes:
Rescuer
Defined in:
lib/async_io/base.rb

Methods included from Rescuer

#rescuer

Constructor Details

#initialize(n_threads = 1) ⇒ Base



# File 'lib/async_io/base.rb', line 24

def initialize(n_threads=1)
  @logger   = AsyncIO::Logger
  @queue    = Queue.new
  @threads  = []
  n_threads.times { @threads << Thread.new { consumer } }
end
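
The constructor relies on a consumer method that is not shown on this page. The following is a minimal sketch of what a queue-backed pool of this shape could look like, assuming each consumer simply pops callable jobs off the queue and runs them. TinyPool, push, shutdown, and the :stop sentinel are hypothetical names for illustration, not part of AsyncIO.

```ruby
# Hypothetical stand-in for a queue-backed thread pool; not AsyncIO's code.
class TinyPool
  attr_reader :queue, :threads

  def initialize(n_threads = 1)
    @queue   = Queue.new
    @threads = []
    n_threads.times { @threads << Thread.new { consumer } }
  end

  # Enqueue any object that responds to #call.
  def push(job)
    queue.push(job)
  end

  # Ask every consumer to stop, then wait for all of them to exit.
  def shutdown
    threads.size.times { queue.push(:stop) }
    threads.each(&:join)
  end

  private

  # Assumed consumer loop: block on the queue and run each job in turn.
  def consumer
    loop do
      job = queue.pop
      break if job == :stop
      job.call
    end
  end
end

results = Queue.new
pool    = TinyPool.new(2)
5.times { |i| pool.push(-> { results.push(i * i) }) }
pool.shutdown

# Drain the thread-safe results queue into a sorted array.
squares = Array.new(results.size) { results.pop }.sort
```

Because Queue#pop blocks while the queue is empty, idle consumers cost nothing until a job arrives.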

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



# File 'lib/async_io/base.rb', line 23

def logger
  @logger
end

#queue ⇒ Object (readonly)

Default: the number of threads to be spawned is 1.

NOTE: An exception raised while a job is being processed is not propagated to the caller. Instead, it is logged to a specified log file.

An unhandled exception kills the thread it was raised in, so the consumer threads need to be protected. For that reason, all exceptions raised by a job are rescued and logged.
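
The reasoning above can be illustrated with plain Ruby threads. This is a sketch under stated assumptions: safely is a hypothetical helper standing in for the role Rescuer plays, a Queue stands in for the log file, and only StandardError is rescued here, which may be narrower than what the library rescues.

```ruby
# Hypothetical helper: run the block and record any error instead of
# letting it escape and terminate the calling thread.
def safely(log)
  yield
rescue StandardError => e
  log.push("#{e.class}: #{e.message}")
  nil
end

log  = Queue.new  # stands in for the log file
jobs = Queue.new
out  = Queue.new

consumer = Thread.new do
  loop do
    job = jobs.pop
    break if job == :stop
    safely(log) { out.push(job.call) }
  end
end

jobs.push(-> { raise ArgumentError, "boom" })  # would kill an unprotected thread
jobs.push(-> { 21 * 2 })                       # still runs afterwards
jobs.push(:stop)
consumer.join

logged = log.pop
answer = out.pop
```

The second job completes only because the first job's error was rescued inside the loop; without the rescue, the consumer thread would have died and the remaining jobs would have stayed on the queue.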



# File 'lib/async_io/base.rb', line 22

def queue
  @queue
end

#threads ⇒ Object (readonly)

Default: the number of threads to be spawned is 1.

NOTE: An exception raised while a job is being processed is not propagated to the caller. Instead, it is logged to a specified log file.

An unhandled exception kills the thread it was raised in, so the consumer threads need to be protected. For that reason, all exceptions raised by a job are rescued and logged.



# File 'lib/async_io/base.rb', line 22

def threads
  @threads
end

Instance Method Details

#async(&payload) ⇒ Object

Performs any task that needs to be done asynchronously. NOTE: The job produces no result, because it is given an empty job (i.e., an empty block of code).



# File 'lib/async_io/base.rb', line 92

def async(&payload)
  worker(payload) { }
end

#clear_interval! ⇒ Object



# File 'lib/async_io/base.rb', line 109

def clear_interval!
  @interval.terminate
  @interval = nil
end

#interval(seconds) ⇒ Object



# File 'lib/async_io/base.rb', line 96

def interval(seconds)
  new_interval? do
    while true
      rescuer { yield }
      sleep(seconds)
    end
  end
end
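
A self-contained sketch of the same pattern, using only Ruby's core Thread API: one background thread runs a block, sleeps, and repeats until it is terminated. Ticker is a hypothetical class used only for illustration. Unlike the listing above, it does not rescue errors raised by the block, and it joins the thread after terminating it.

```ruby
# Hypothetical illustration of an interval thread; not AsyncIO's code.
class Ticker
  def interval(seconds)
    # Only the first call starts a thread; later calls reuse it.
    @interval ||= Thread.new do
      loop do
        yield
        sleep(seconds)
      end
    end
  end

  def clear_interval!
    return unless @interval
    @interval.terminate
    @interval.join        # wait until the thread has really stopped
    @interval = nil
  end
end

ticks  = Queue.new
ticker = Ticker.new
thread = ticker.interval(0.01) { ticks.push(:tick) }
3.times { ticks.pop }     # block until the block has run three times
ticker.clear_interval!
```

Guarding clear_interval! with an early return avoids a NoMethodError when no interval was ever started, which is a design choice of this sketch rather than something the original method does.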

#new_interval? ⇒ Boolean



# File 'lib/async_io/base.rb', line 105

def new_interval?
  @interval ||= Thread.new { yield }
end

#worker(payload, &job) ⇒ Object

Creates a new Worker and pushes it onto the queue. Whenever a ‘job’ (i.e., a Ruby object) is finished, it calls the payload and passes the result of the job to it.

For example:

  def aget_user(uid, &payload)
    worker(payload) do
      User.find(uid)
    end
  end

It returns the worker created for a particular job. You can send the message done to that worker to retrieve the job's result. See prediction_io/worker.rb.

For example:

  result = aget_user(1) { |u| Logger.info(u.name) }

  # job may take a while to be done…

  user = result.done
  user.name
  # => "John"

NOTE: With the snippet above, if the job has not finished yet, sending the message done to the worker returns false. Once the job has finished, done returns its result.



# File 'lib/async_io/base.rb', line 78

def worker(payload, &job)
  rescuer do
    Worker.new(payload, job).tap { |w|
      queue.push(w)
    }
  end
end
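
The polling behaviour described in the NOTE can be sketched as follows. SketchWorker is a hypothetical stand-in: the real Worker class is not shown on this page, so the internals below are assumptions made for illustration only.

```ruby
# Hypothetical stand-in for Worker; internals are assumed, not AsyncIO's.
class SketchWorker
  attr_reader :done

  def initialize(payload, job)
    @payload = payload
    @job     = job
    @done    = false      # reported until the job has finished
  end

  # Run the job, remember its result, then hand the result to the payload.
  def call
    @done = @job.call
    @payload.call(@done)
  end
end

seen   = []
worker = SketchWorker.new(->(r) { seen << r }, -> { "John" })
before = worker.done      # job has not run yet
worker.call               # a consumer thread would normally do this
after  = worker.done
```

One consequence of this design is that a job whose legitimate result is false or nil cannot be told apart from an unfinished job by looking at done alone.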