Module: RSMP::Proxy::Modules::Tasks

Included in:
RSMP::Proxy
Defined in:
lib/rsmp/proxy/modules/tasks.rb

Overview

Reader and timer task management. Handles async tasks for reading from the socket and running periodic timers.

Instance Method Summary collapse

Instance Method Details

#read_line ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rsmp/proxy/modules/tasks.rb', line 36

# Reads one message from @protocol, processes it and logs timing statistics.
#
# The processing time is measured with the monotonic clock rather than the
# wall clock, so an adjustment of the system time while a message is being
# processed cannot produce a negative or inflated figure.
#
# The logged line contains the message type, a shortened message id (when a
# message was returned by process_packet) and the processing time/rate.
def read_line
  json = @protocol.read_line
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  message = process_packet json
  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  ms = (duration * 1000).round(4)
  # A zero duration would make 1.0 / duration infinite, which cannot be
  # rounded, so report an unbounded rate directly in that case.
  per_second = duration.positive? ? (1.0 / duration).round : Float::INFINITY
  if message
    type = message.type
    m_id = Logger.shorten_message_id(message.m_id)
  else
    # process_packet returned nothing, so there is no type or id to report
    type = 'Unknown'
    m_id = nil
  end
  # compact drops the nil id so the parts are joined without a double space
  str = [type, m_id, "processed in #{ms}ms, #{per_second}req/s"].compact.join(' ')
  log str, level: :statistics
end

#run_reader ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rsmp/proxy/modules/tasks.rb', line 15

# Reads messages in a loop until the connection ends or an error occurs.
#
# @stream and @protocol are created on first use and reused afterwards.
# A Restart is logged and re-raised to the caller. Every other rescued
# condition is logged as a warning (or passed to distribute_error for
# unexpected errors) and ends the loop without raising.
def run_reader
  @stream ||= IO::Stream::Buffered.new(@socket)
  # rsmp messages are json terminated with a form-feed
  @protocol ||= RSMP::Protocol.new(@stream)
  loop { read_line }
rescue Restart
  log 'Closing connection', level: :warning
  raise
rescue EOFError, Async::Stop
  # EOFError is a kind of IOError, so it must be handled before IOError below
  log 'Connection closed', level: :warning
rescue Errno::EPIPE
  log 'Broken pipe', level: :warning
rescue Errno::ECONNRESET
  log 'Connection reset by peer', level: :warning
rescue IOError => error
  log "IOError: #{error}", level: :warning
rescue StandardError => error
  distribute_error error, level: :internal
end

#run_timer(task, interval) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rsmp/proxy/modules/tasks.rb', line 71

# Runs the periodic timer loop, calling #timer once per interval.
#
# Errors raised by a tick are logged as warnings (or handed to
# distribute_error for unexpected errors), so a failing tick does not end
# the loop.
#
# Ticks are scheduled against the monotonic clock, so changes to the system
# time cannot stall the timer or make it fire in a burst. The timestamp
# passed to #timer still comes from Clock.now.
#
# @param task [#sleep] object used to sleep between ticks
# @param interval [Numeric] number of seconds between ticks
def run_timer(task, interval)
  next_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    begin
      now = Clock.now
      timer(now)
    rescue RSMP::Schema::Error => e
      log "Timer: Schema error: #{e}", level: :warning
    rescue EOFError => e
      log "Timer: Connection closed: #{e}", level: :warning
    rescue IOError
      log 'Timer: IOError', level: :warning
    rescue Errno::ECONNRESET
      log 'Timer: Connection reset by peer', level: :warning
    rescue Errno::EPIPE
      log 'Timer: Broken pipe', level: :warning
    rescue StandardError => e
      distribute_error e, level: :internal
    end
  ensure
    # NOTE(review): because this runs in `ensure`, it also executes while an
    # exception that is not rescued above propagates - confirm that is intended.
    next_time += interval
    duration = next_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # When a tick overruns the interval the remaining time is negative;
    # never ask the task to sleep for a negative duration.
    task.sleep([duration, 0].max)
  end
end

#start_reader ⇒ Object

Runs an async task that reads from @socket.



8
9
10
11
12
13
# File 'lib/rsmp/proxy/modules/tasks.rb', line 8

# Starts the reader as an async child of @task.
#
# The child task is annotated 'reader' and runs #run_reader. The value
# returned by @task.async is stored in @reader and returned.
def start_reader
  @reader = @task.async do |reader_task|
    reader_task.annotate('reader')
    run_reader
  end
end

#start_timer ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/rsmp/proxy/modules/tasks.rb', line 58

# Starts the periodic timer as an async child of @task, unless one has
# already been started (@timer is set).
#
# The interval is read from @site_settings['intervals']['timer'] and falls
# back to 1 second when that key, or the whole 'intervals' section, is
# missing. @latest_watchdog_received is reset to the current Clock time
# before the timer task is created.
def start_timer
  return if @timer

  name = 'timer'
  # dig tolerates a missing 'intervals' section instead of raising NoMethodError
  interval = @site_settings.dig('intervals', 'timer') || 1
  log "Starting #{name} with interval #{interval} seconds", level: :debug
  @latest_watchdog_received = Clock.now
  @timer = @task.async do |task|
    task.annotate 'timer'
    run_timer task, interval
  end
end

#timer(now) ⇒ Object



97
98
99
100
101
# File 'lib/rsmp/proxy/modules/tasks.rb', line 97

# Performs one timer tick by running the periodic checks in order:
# watchdog_send_timer, check_ack_timeout, then check_watchdog_timeout.
#
# @param now [Object] the current time, passed unchanged to each check
# @return [Object] the result of check_watchdog_timeout
def timer(now)
  watchdog_send_timer(now)
  check_ack_timeout(now)
  check_watchdog_timeout(now)
end