Module: RSMP::Proxy::Modules::Tasks
- Included in:
- RSMP::Proxy
- Defined in:
- lib/rsmp/proxy/modules/tasks.rb
Overview
Reader and timer task management. Handles async tasks for reading from the socket and running periodic timers.
Instance Method Summary collapse
- #read_line ⇒ Object
- #run_reader ⇒ Object
- #run_timer(task, interval) ⇒ Object
- #start_reader ⇒ Object
Runs an async task that reads from @socket.
- #start_timer ⇒ Object
- #timer(now) ⇒ Object
Instance Method Details
#read_line ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/rsmp/proxy/modules/tasks.rb', line 36
# Reads one packet from @protocol, passes it to process_packet and logs how
# long the processing took at the :statistics level.
#
# The logged string has the form
#   "<type> <short m_id> processed in <ms>ms, <rate>req/s"
# When process_packet returns nothing (nil/false), the type is 'Unknown' and
# the message id is left out.
def read_line
  json = @protocol.read_line

  # Use the monotonic clock for the elapsed time: unlike Time.now it is not
  # affected by wall-clock adjustments made while the packet is processed.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  message = process_packet json
  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

  ms = (duration * 1000).round(4)
  # A non-positive duration has no meaningful rate; report infinity instead
  # of dividing by zero.
  per_second = duration.positive? ? (1.0 / duration).round : Float::INFINITY

  if message
    type = message.type
    # NOTE(review): the helper name was lost when this listing was extracted;
    # restored as Logger.shorten_message_id - confirm against the Logger class.
    m_id = Logger.shorten_message_id(message.m_id)
  else
    type = 'Unknown'
    m_id = nil
  end

  # compact drops the nil m_id so no double space appears in the log line.
  str = [type, m_id, "processed in #{ms}ms, #{per_second}req/s"].compact.join(' ')
  log str, level: :statistics
end
#run_reader ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/rsmp/proxy/modules/tasks.rb', line 15
# Body of the reader task: keeps calling read_line until the loop is ended
# by an exception.
#
# @stream and @protocol are created on first use and reused afterwards.
# Restart is logged and re-raised. End of stream, task stop and socket errors
# are logged as warnings and end the method. Any other StandardError is
# handed to distribute_error.
def run_reader
  @stream ||= IO::Stream::Buffered.new(@socket)
  # rsmp messages are json terminated with a form-feed
  @protocol ||= RSMP::Protocol.new(@stream)

  loop { read_line }
rescue Restart
  log 'Closing connection', level: :warning
  raise
rescue EOFError, Async::Stop
  # EOFError is listed before IOError because it is an IOError subclass.
  log 'Connection closed', level: :warning
rescue IOError => e
  log "IOError: #{e}", level: :warning
rescue Errno::ECONNRESET
  log 'Connection reset by peer', level: :warning
rescue Errno::EPIPE
  log 'Broken pipe', level: :warning
rescue StandardError => e
  distribute_error e, level: :internal
end
#run_timer(task, interval) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/rsmp/proxy/modules/tasks.rb', line 71
# Body of the timer task: calls timer(Clock.now) once per interval, forever.
#
# Ticks are scheduled against a fixed timeline (start + n * interval) rather
# than "interval after the previous tick finished", so processing time does
# not accumulate as drift.
#
# task     - object responding to #sleep(seconds); used to wait between ticks.
# interval - number of seconds between ticks.
#
# Schema and connection errors raised by a tick are logged as warnings and
# any other StandardError is passed to distribute_error; in all those cases
# the loop continues with the next tick.
def run_timer(task, interval)
  # The schedule is kept on the monotonic clock so that wall-clock
  # adjustments cannot stretch or collapse the wait between ticks.
  next_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    timer(Clock.now)
  rescue RSMP::Schema::Error => e
    log "Timer: Schema error: #{e}", level: :warning
  rescue EOFError => e
    log "Timer: Connection closed: #{e}", level: :warning
  rescue IOError
    log 'Timer: IOError', level: :warning
  rescue Errno::ECONNRESET
    log 'Timer: Connection reset by peer', level: :warning
  rescue Errno::EPIPE
    log 'Timer: Broken pipe', level: :warning
  rescue StandardError => e
    distribute_error e, level: :internal
  ensure
    # Runs after every tick, whether or not it raised.
    next_time += interval
    remaining = next_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # If the tick overran its slot, the remaining time is negative; never
    # pass a negative duration to sleep.
    task.sleep [remaining, 0].max
  end
end
#start_reader ⇒ Object
Runs an async task that reads from @socket.
8 9 10 11 12 13 |
# File 'lib/rsmp/proxy/modules/tasks.rb', line 8
# Spawns a child task of @task, annotated 'reader', that executes run_reader.
# The value returned by @task.async is kept in @reader.
def start_reader
  @reader = @task.async do |reader_task|
    reader_task.annotate('reader')
    run_reader
  end
end
#start_timer ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/rsmp/proxy/modules/tasks.rb', line 58
# Starts the periodic timer task unless one is already stored in @timer.
#
# The interval is read from @site_settings['intervals']['timer'] and falls
# back to 1 second when that setting (or the whole 'intervals' section) is
# missing. @latest_watchdog_received is set to the current Clock time before
# the task is spawned.
#
# Returns nil when a timer already exists, otherwise the value returned by
# @task.async.
def start_timer
  return if @timer

  name = 'timer'
  # dig tolerates a missing 'intervals' section, so the default still applies
  # instead of raising NoMethodError on nil.
  interval = @site_settings.dig('intervals', 'timer') || 1
  log "Starting #{name} with interval #{interval} seconds", level: :debug
  @latest_watchdog_received = Clock.now
  @timer = @task.async do |task|
    task.annotate name
    run_timer task, interval
  end
end
#timer(now) ⇒ Object
97 98 99 100 101 |
# File 'lib/rsmp/proxy/modules/tasks.rb', line 97
# One timer tick. Passes the given time to, in order, watchdog_send_timer,
# check_ack_timeout and check_watchdog_timeout.
#
# now - the time of this tick, as supplied by the caller.
def timer(now)
  watchdog_send_timer(now)
  check_ack_timeout(now)
  check_watchdog_timeout(now)
end