Class: RubyRabbitmqJanus::Janus::Concurrencies::KeepaliveThread

Inherits:
Thread
  • Object
show all
Defined in:
lib/rrj/janus/processus/keepalive/keepalive_thread.rb

Overview

Thread object that manages keepalives with a Janus instance.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(instance, rabbit, &block) ⇒ KeepaliveThread

Returns a new instance of KeepaliveThread.



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 20

# Set up the keepalive thread state, record this thread's id on +instance+
# and pass +block+ on to the superclass constructor.
#
# @param instance object handed to KeepaliveMessage.new; its #set is called
#   with this thread's id
# @param rabbit stored in @rabbit
# @param block forwarded to super
# @raise [Errors::Janus::KeepaliveThread::Initializer] when any step raises
#   a StandardError
def initialize(instance, rabbit, &block)
  @session = nil
  @publisher = nil
  @rabbit = rabbit
  @timer = KeepaliveTimer.new
  @message = KeepaliveMessage.new(instance)
  thread_id = __id__
  instance.set(thread: thread_id)
  Tools::Log.instance.info("Keepalive thread id is #{thread_id}")
  super(&block)
rescue StandardError
  raise Errors::Janus::KeepaliveThread::Initializer
end

Instance Attribute Details

#session ⇒ Integer (readonly)

Returns the number of the session linked to the Janus instance.

Returns:

  • (Integer)

    Number of the session linked to the Janus instance



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 17

# Thread subclass that keeps one Janus session alive: it creates a session,
# sends keepalive messages from a timer loop and destroys the session when
# the thread is killed or is no longer attached to its Janus instance.
class KeepaliveThread < Thread
  # timer   - the KeepaliveTimer built in #initialize
  # session - value returned by the last session request, nil until one ran
  attr_reader :timer, :session

  # Prepare thread state, record this thread's id on +instance+ and pass
  # +block+ on to Thread#initialize.
  #
  # @param instance object handed to KeepaliveMessage.new; its #set is
  #   called with this thread's id
  # @param rabbit object stored in @rabbit (used for #channel and #close)
  # @param block forwarded to Thread#initialize
  # @raise [Errors::Janus::KeepaliveThread::Initializer] on any StandardError
  def initialize(instance, rabbit, &block)
    @publisher = @session = nil
    @rabbit = rabbit
    @timer = KeepaliveTimer.new
    @message = KeepaliveMessage.new(instance)
    instance.set(thread: __id__)
    Tools::Log.instance.info "Keepalive thread id is #{__id__}"
    super(&block)
  rescue
    raise Errors::Janus::KeepaliveThread::Initializer
  end

  # Initialize a transaction with Janus Instance.
  # Create a session and save response
  #
  # @raise [Errors::Janus::KeepaliveThread::InitializeJanusSession]
  def initialize_janus_session
    # @publisher must be assigned first: response_session publishes
    # through it.
    @publisher = publisher
    @session = response_session
  rescue
    raise Errors::Janus::KeepaliveThread::InitializeJanusSession
  end

  # Restart session: request a new session, send one keepalive and store
  # the new session on the model. Only logs an error when no model is found.
  # :reek:TooManyStatements
  #
  # @raise [Errors::Janus::KeepaliveThread::RestartSession]
  def restart_session
    Tools::Log.instance.warn 'Restart session ...'
    janus = find_model
    if janus.present?
      send_messages_restart
      janus.set(session: @session)
    else
      Tools::Log.instance.error 'Janus Instance Model is gone, giving up'
    end
  rescue
    raise Errors::Janus::KeepaliveThread::RestartSession
  end

  # Start a timer for TTL. Each timer iteration either sends a keepalive or,
  # when this thread is no longer attached to its model, stops the timer,
  # cleans up and exits.
  # rubocop:disable Metrics/AbcSize
  # :reek:TooManyStatements
  #
  # @raise [Errors::Janus::KeepaliveThread::Start]
  def start
    @timer.loop_keepalive do
      if detached?(find_model)
        Tools::Log.instance.info \
          "Thread #{__id__} no longer attached to Janus Instance, " \
          'exiting...'
        @timer.stop_timer
        cleanup
        exit
      else
        Tools::Log.instance.info "Thread #{__id__} " \
          'sending keepalive to instance ' \
          "#{@message.instance} with TTL #{@timer.time_to_live}"
        response_keepalive
      end
    end
  rescue
    raise Errors::Janus::KeepaliveThread::Start
  end
  # rubocop:enable Metrics/AbcSize

  # Kill session and disable instance
  #
  # @raise [Errors::Janus::KeepaliveThread::Kill]
  def kill
    cleanup
    super
  rescue
    raise Errors::Janus::KeepaliveThread::Kill
  end

  # Forget the session and message; if this thread is still attached to its
  # model, disable the model and remove its thread and session fields.
  # :reek:TooManyStatements
  #
  # @raise [Errors::Janus::KeepaliveThread::InstanceIsDown]
  def instance_is_down
    # Look the model up before @message/@session are cleared, because
    # find_model reads both.
    janus = find_model
    @session = @message = nil
    if detached?(janus)
      Tools::Log.instance.error \
        "Thread [#{__id__}] no longer attached to Janus Instance " \
        '(should be dead).'
    else
      # Symbol-array literal; the bare `I[...]` form was a method call that
      # always raised.
      janus.set(enable: false).unset(%i[thread session])
      Tools::Log.instance.fatal \
        "Janus Instance [#{janus.instance}] is down, " \
        "thread [#{__id__}] will die."
    end
  rescue
    raise Errors::Janus::KeepaliveThread::InstanceIsDown
  end

  private

  # NOTE(review): @instance is never assigned in this class, so this reader
  # returns nil - confirm whether it is still needed.
  attr_reader :instance

  # Request a fresh session, then send one keepalive on it.
  def send_messages_restart
    @session = response_session
    response_keepalive
  end

  # Send a destroy message when both a session and a message are present,
  # and always close @rabbit, even if the destroy exchange raises.
  def cleanup
    response_destroy if @session.present? && @message.present?
  ensure
    @rabbit.close
  end

  # Find the model by the message's instance, or by session when the message
  # is gone. Returns nil (after logging a warning) on any StandardError.
  def find_model
    if @message.present?
      Models::JanusInstance.find(@message.instance)
    else
      Tools::Log.instance.warn \
        "Lookup Janus Instance model by session [#{@session}]"
      Models::JanusInstance.find_by_session(@session)
    end
  rescue StandardError => e
    Tools::Log.instance.warn \
      "find_model: rescuing from error #{e.message}"
    nil
  end

  # True when +janus+ is blank or records a different thread id than ours.
  # :reek:FeatureEnvy
  def detached?(janus)
    janus.blank? || janus.thread != __id__
  end

  # Build a keepalive publisher on @rabbit's channel.
  def publisher
    Rabbit::Publisher::PublishKeepalive.new(@rabbit.channel)
  end

  # Publish the session request and return the parsed response.
  def response_session
    @message.response_session(publish(@message.session))
  end

  # Publish a keepalive for @session and return the parsed acknowledgement.
  def response_keepalive
    keepalive = @message.keepalive(@session)
    @message.response_acknowledgement(publish(keepalive))
  end

  # Publish a destroy request for @session and return the parsed response.
  def response_destroy
    destroy = @message.destroy(@session)
    @message.response_destroy(publish(destroy))
  end

  # Send +message+ through the current publisher.
  def publish(message)
    @publisher.publish(message)
  end
end

#timer ⇒ Object (readonly)

Returns the value of attribute timer.



18
19
20
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 18

# Reader for the @timer instance variable.
#
# @return the current value of @timer
def timer
  @timer
end

Instance Method Details

#initialize_janus_session ⇒ Object

Initialize a transaction with Janus Instance. Create a session and save response



34
35
36
37
38
39
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 34

# Open a transaction with the Janus instance: build the publisher, then
# request a session and keep the response in @session.
#
# @return the value stored in @session
# @raise [Errors::Janus::KeepaliveThread::InitializeJanusSession] when
#   either step raises a StandardError
def initialize_janus_session
  begin
    # Keep this order: the session request is sent after @publisher is set.
    @publisher = publisher
    @session = response_session
  rescue StandardError
    raise Errors::Janus::KeepaliveThread::InitializeJanusSession
  end
end

#instance_is_down ⇒ Object

:reek:TooManyStatements



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 89

# Forget the session and message; if this thread is still attached to its
# model, disable the model and remove its thread and session fields.
# :reek:TooManyStatements
#
# @raise [Errors::Janus::KeepaliveThread::InstanceIsDown] when any step
#   raises a StandardError
def instance_is_down
  # Fetch the model before the instance variables are cleared.
  janus = find_model
  @session = @message = nil
  if detached?(janus)
    Tools::Log.instance.error \
      "Thread [#{__id__}] no longer attached to Janus Instance " \
      '(should be dead).'
  else
    # Symbol-array literal; the bare `I[...]` form was a method call that
    # always raised.
    janus.set(enable: false).unset(%i[thread session])
    Tools::Log.instance.fatal \
      "Janus Instance [#{janus.instance}] is down, " \
      "thread [#{__id__}] will die."
  end
rescue
  raise Errors::Janus::KeepaliveThread::InstanceIsDown
end

#kill ⇒ Object

Kill session and disable instance



81
82
83
84
85
86
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 81

# Run cleanup, then delegate to the superclass implementation of kill.
#
# @raise [Errors::Janus::KeepaliveThread::Kill] when cleanup or the
#   superclass call raises a StandardError
def kill
  begin
    cleanup
    super
  rescue StandardError
    raise Errors::Janus::KeepaliveThread::Kill
  end
end

#restart_session ⇒ Object

Restart session :reek:TooManyStatements



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 43

# Restart session: when the model can still be found, request a new
# session, send a keepalive and store the new session on the model.
# When no model is found, only an error is logged.
# :reek:TooManyStatements
#
# @raise [Errors::Janus::KeepaliveThread::RestartSession] when any step
#   raises a StandardError
def restart_session
  Tools::Log.instance.warn('Restart session ...')
  model = find_model
  unless model.present?
    return Tools::Log.instance.error(
      'Janus Instance Model is gone, giving up'
    )
  end

  send_messages_restart
  model.set(session: @session)
rescue StandardError
  raise Errors::Janus::KeepaliveThread::RestartSession
end

#start ⇒ Object

Start a timer for TTL rubocop:disable Metrics/AbcSize :reek:TooManyStatements



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rrj/janus/processus/keepalive/keepalive_thread.rb', line 59

# Start a timer for TTL. On each timer iteration a keepalive is sent while
# this thread is still attached to its model; otherwise the timer is
# stopped, cleanup runs and the thread exits.
# :reek:TooManyStatements
#
# @raise [Errors::Janus::KeepaliveThread::Start] when any step raises a
#   StandardError
def start
  @timer.loop_keepalive do
    unless detached?(find_model)
      Tools::Log.instance.info(
        "Thread #{__id__} sending keepalive to instance " \
        "#{@message.instance} with TTL #{@timer.time_to_live}"
      )
      # Hand the acknowledgement back as the block's value.
      next response_keepalive
    end

    Tools::Log.instance.info(
      "Thread #{__id__} no longer attached to Janus Instance, exiting..."
    )
    @timer.stop_timer
    cleanup
    exit
  end
rescue StandardError
  raise Errors::Janus::KeepaliveThread::Start
end