Class: Magellan::Gcs::Proxy::PubsubSustainer

Inherits:
Object
  • Object
show all
Includes:
Log
Defined in:
lib/magellan/gcs/proxy/pubsub_sustainer.rb

Constant Summary

Constants included from Log

Log::CLOUD_LOGGING_RESOURCE_KEYS

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Log

build_cloud_logging_logger, build_logger, build_loggers, logger, loggers, verbose

Constructor Details

#initialize(message, delay: 10, interval: nil) ⇒ PubsubSustainer

Returns a new instance of PubsubSustainer.



33
34
35
36
37
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 33

# Builds a sustainer for a single message.
#
# @param message the object that later receives +delay!+ in #send_delay
# @param delay [#to_i, nil] value handed to +message.delay!+ and added to the
#   current time to compute the retry deadline; +nil+ falls back to 10
# @param interval [#to_f, nil] seconds to wait between +delay!+ calls;
#   defaults to 90% of the delay
def initialize(message, delay: 10, interval: nil)
  @message = message
  # The class-level .run passes c['delay'] through verbatim, so a missing
  # config key arrives here as an explicit nil and bypasses the keyword
  # default. nil.to_i would yield a delay of 0 and an interval of 0.0, which
  # makes the run loop call delay!(0) without ever waiting.
  @delay = (delay || 10).to_i
  @interval = (interval || @delay * 0.9).to_f
end

Instance Attribute Details

#delay ⇒ Object (readonly)

Returns the value of attribute delay.



32
33
34
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 32

# Reader for the delay stored by #initialize (coerced there with +to_i+).
# The value is the argument given to +message.delay!+ in #send_delay and is
# added to the current epoch time in #reset_next_limit to form next_deadline.
def delay
  @delay
end

#interval ⇒ Object (readonly)

Returns the value of attribute interval.



32
33
34
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 32

# Reader for the interval stored by #initialize (coerced there with +to_f+;
# 90% of the delay when no interval was given).
# #reset_next_limit adds it to the current epoch time to form next_limit, and
# #run logs it as a number of seconds.
def interval
  @interval
end

#message ⇒ Object (readonly)

Returns the value of attribute message.



32
33
34
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 32

# Reader for the message passed to #initialize.
# #send_delay calls +delay!+ on this object.
def message
  @message
end

#next_deadline ⇒ Object (readonly)

Returns the value of attribute next_deadline.



55
56
57
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 55

# Reader for the timestamp set by #reset_next_limit (epoch seconds as a
# Float: the time of the reset plus +delay+). It is nil until the first reset.
# #send_delay keeps retrying a failed +delay!+ only while the current time is
# earlier than this value.
def next_deadline
  @next_deadline
end

#next_limit ⇒ Object (readonly)

Returns the value of attribute next_limit.



55
56
57
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 55

# Reader for the timestamp set by #reset_next_limit (epoch seconds as a
# Float: the time of the reset plus +interval+). It is nil until the first
# reset. #wait_while_processing polls until the current time reaches it.
def next_limit
  @next_limit
end

Class Method Details

.run(message) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 13

# Executes the given block, optionally alongside a sustainer thread.
#
# When +Proxy.config[:sustainer]+ is set, a thread is started that builds an
# instance from +message+ and the configured 'delay' / 'interval' values and
# calls its +run+. After the block finishes (normally or by raising), the
# thread's +:processing_message+ flag is cleared and the thread is joined.
# Without that configuration the block is simply yielded.
#
# @param message the message handed to the sustainer instance
# @yield the work to perform
# @return [Object] the value of the block
# @raise [RuntimeError] when called without a block
def run(message)
  raise "#{name}.run requires block" unless block_given?

  settings = Proxy.config[:sustainer]
  return yield unless settings

  # Handshake queue: the worker sets its own :processing_message flag, so the
  # caller must not reach the `ensure` below before that has happened.
  # Otherwise a very short block could write `false` first, the worker would
  # then overwrite it with `true`, and `join` would wait forever.
  ready = Queue.new
  worker = Thread.new(message, settings['delay'], settings['interval']) do |msg, delay, interval|
    Thread.current[:processing_message] = true
    ready << true
    new(msg, delay: delay, interval: interval).run
  end
  ready.pop

  begin
    yield
  ensure
    # Tell the worker to stop, then wait until it has actually finished.
    worker[:processing_message] = false
    worker.join
  end
end

Instance Method Details

#debug(msg) ⇒ Object



75
76
77
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 75

# Writes +msg+ to the logger at debug level, prefixed with this object's
# class name and a single space.
#
# @param msg the text (or any object; it is interpolated) to log
# @return whatever +logger.debug+ returns
def debug(msg)
  prefix = self.class.name
  logger.debug("#{prefix} #{msg}")
end

#reset_next_limit ⇒ Object



56
57
58
59
60
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 56

# Re-anchors both timestamps to the current wall-clock time.
#
# +@next_limit+ becomes now + interval and +@next_deadline+ becomes
# now + delay, both as Float epoch seconds taken from one +Time.now+ reading.
#
# @return [Float] the new next_deadline
def reset_next_limit
  anchor = Time.now.to_f
  @next_limit = anchor + interval
  @next_deadline = anchor + delay
end

#run ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 39

# Main loop of the sustainer.
#
# After an initial #reset_next_limit, each round logs the interval, waits via
# #wait_while_processing and, if that returns a truthy value, calls
# #send_delay and resets the timestamps again. A falsy result ends the loop.
# Any StandardError raised along the way is logged with +logger.error+
# instead of propagating.
def run
  reset_next_limit
  loop do
    debug("is sleeping #{interval} sec.")
    if wait_while_processing
      send_delay
      reset_next_limit
    else
      debug('is stopping.')
      break
    end
  end
  debug('stopped.')
rescue StandardError => e
  logger.error(e)
end

#send_delay ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 62

# Calls +message.delay!+ with the configured delay, logging before and after.
#
# On Google::Apis::ServerError the call is retried after a one-second pause
# for as long as the current time is still earlier than next_deadline; once
# that point has passed the error is re-raised to the caller.
#
# @raise [Google::Apis::ServerError] when the retries ran out of time
def send_delay
  debug("is sending delay!(#{delay})")
  message.delay!(delay)
  debug("sent delay!(#{delay}) successfully")
rescue Google::Apis::ServerError => e
  # Out of time: give up and hand the failure to the caller.
  raise e unless Time.now.to_f < next_deadline

  sleep(1) # pause between attempts
  debug("is retrying to send delay! cause of [#{e.class.name}] #{e.message}")
  retry
end

#wait_while_processing ⇒ Object



79
80
81
82
83
84
85
# File 'lib/magellan/gcs/proxy/pubsub_sustainer.rb', line 79

# Polls in 0.1-second steps until the current time reaches next_limit.
#
# @return [Boolean] +true+ once next_limit has been reached; +false+ as soon
#   as the current thread's +:processing_message+ flag is found falsy while
#   still waiting
def wait_while_processing
  loop do
    return true unless Time.now.to_f < next_limit
    return false unless Thread.current[:processing_message]

    sleep(0.1)
  end
end