Class: AsyncObserver::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/async_observer/queue.rb

Constant Summary collapse

DEFAULT_PRI =
512
DEFAULT_FUZZ =
0
DEFAULT_DELAY =
0
DEFAULT_TTR =
120
DEFAULT_TUBE =
'default'
SUBMIT_OPTS =
[:pri, :fuzz, :delay, :ttr, :tube]

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.after_put ⇒ Object

Returns the value of attribute after_put.



32
33
34
# File 'lib/async_observer/queue.rb', line 32

# Reader for the class-level after_put attribute.
# put! looks this value up after enqueuing a job and, when it is truthy,
# invokes it via #call with the two elements of the result put! returns.
def after_put
  @after_put
end

.queue ⇒ Object

Returns the value of attribute queue.



32
33
34
# File 'lib/async_observer/queue.rb', line 32

# Reader for the class-level queue attribute.
# put! sends connect, use, yput and last_server to this object; when it is
# nil or false, put! runs the job synchronously instead.
def queue
  @queue
end

Class Method Details

.gen(obj, selector, args) ⇒ Object

Be careful not to pass in a selector that is not valid Ruby source: the selector is spliced verbatim into the generated code string.



89
90
91
# File 'lib/async_observer/queue.rb', line 89

# Builds the source text of a method call: the receiver's rrepr, a dot,
# the selector, and the parenthesised argument list from gen_args.
#
# The selector is inserted as-is (via to_s), so it must already be valid
# Ruby source. Plain `+` concatenation is kept on purpose so that a
# non-String rrepr / gen_args result still raises instead of being
# silently stringified.
def gen(obj, selector, args)
  call_head = obj.rrepr + ".#{selector}("
  call_head + gen_args(args) + ')'
end

.gen_args(args) ⇒ Object



93
94
95
# File 'lib/async_observer/queue.rb', line 93

# Returns the rrepr of +args+ with its first and last characters removed.
# NOTE(review): presumably args is an Array whose rrepr is wrapped in
# square brackets, so this yields a bare argument list - confirm against
# the rrepr implementation.
def gen_args(args)
  literal = args.rrepr
  literal.slice(1...-1)
end

.pkg(code, opts) ⇒ Object



84
85
86
# File 'lib/async_observer/queue.rb', line 84

# Wraps generated code in a job hash: returns a copy of +opts+ with
# :type set to :rails and :code set to +code+. These two keys win over
# any same-named keys in +opts+; +opts+ itself is not modified.
def pkg(code, opts)
  job_fields = { :type => :rails, :code => code }
  opts.merge(job_fields)
end

.put!(obj, pri = DEFAULT_PRI, delay = DEFAULT_DELAY, ttr = DEFAULT_TTR, tube = DEFAULT_TUBE) ⇒ Object



49
50
51
52
53
54
55
56
57
58
# File 'lib/async_observer/queue.rb', line 49

# Submits +obj+ as a job.
#
# When +pri+ is :direct, or no queue is configured, the job is run
# synchronously through sync_run and that method's result is returned.
# Otherwise the queue is connected, switched to +tube+, and +obj+ is
# handed to yput with the given priority, delay and ttr.
#
# Returns a two-element array: the yput result and the queue's
# last_server. If AsyncObserver::Queue.after_put is set, it is called
# with those two values before returning.
def put!(obj, pri=DEFAULT_PRI, delay=DEFAULT_DELAY, ttr=DEFAULT_TTR,
         tube=DEFAULT_TUBE)
  # Leave the local nil for :direct so the queue is not even consulted.
  conn = queue unless pri == :direct
  return sync_run(obj) unless conn

  conn.connect
  conn.use(tube)
  job_ref = conn.yput(obj, pri, delay, ttr)
  result = [job_ref, conn.last_server]

  callback = AsyncObserver::Queue.after_put
  callback.call(*result) if callback
  result
end

.put_call!(obj, sel, opts, args = []) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/async_observer/queue.rb', line 62

# Enqueues a call of +sel+ on +obj+ with +args+.
#
# +opts+ may carry the submit options :pri, :fuzz, :delay, :ttr and
# :tube (each falling back to its DEFAULT_* constant). Every other key
# is passed along inside the packaged job. Unless the priority is
# :direct, a random integer in 0..fuzz is added to it.
#
# If opts[:interpolator] is truthy it is submitted as-is in place of
# the generated code package. The submission is logged, then handed to
# put!, whose result is returned.
def put_call!(obj, sel, opts, args=[])
  pri, fuzz, delay, ttr, tube = [
    [:pri,   DEFAULT_PRI],
    [:fuzz,  DEFAULT_FUZZ],
    [:delay, DEFAULT_DELAY],
    [:ttr,   DEFAULT_TTR],
    [:tube,  DEFAULT_TUBE]
  ].map { |key, fallback| opts.fetch(key, fallback) }

  # Everything that is not a submit option travels with the job itself.
  worker_opts = opts.reject { |key, _value| SUBMIT_OPTS.include?(key) }
  interpolator = opts.fetch(:interpolator, nil)

  pri += rand(fuzz + 1) unless pri == :direct

  # An interpolator stands in for both the logged code and the payload.
  code = interpolator || gen(obj, sel, args)
  packed = interpolator || pkg(code, worker_opts)

  RAILS_DEFAULT_LOGGER.info("put #{pri} #{code} to #{tube}")
  put!(packed, pri, delay, ttr, tube)
end

.sync_run(obj) ⇒ Object

This runs jobs synchronously; it’s used when no queue is configured or when the priority is :direct.



41
42
43
44
45
46
47
# File 'lib/async_observer/queue.rb', line 41

# Runs +obj+ as a job immediately, without a queue.
#
# The object is YAML-dumped and wrapped in a Beanstalk::Job (id 0) on a
# fake connection, then dispatched to the shared sync_worker, which is
# told to do all its work.
#
# Always returns [0, '0.0.0.0'] - a fixed pair with the same shape as
# the [yput result, server] pair put! returns on the queue path.
def sync_run(obj)
  payload = YAML.dump(obj)
  fake_job = Beanstalk::Job.new(AsyncObserver::FakeConn.new, 0, payload)

  worker = sync_worker
  worker.dispatch(fake_job)
  worker.do_all_work

  [0, '0.0.0.0']
end

.sync_worker ⇒ Object

This is a fake worker instance for running jobs synchronously.



35
36
37
38
# File 'lib/async_observer/queue.rb', line 35

# Returns the memoized AsyncObserver::Worker used to run jobs
# synchronously, creating it with this method's binding on first use.
# The worker file is required on every call, before the cache is checked.
def sync_worker
  require 'async_observer/worker'
  return @sync_worker if @sync_worker

  @sync_worker = AsyncObserver::Worker.new(binding)
end