Class: LS4::HeartbeatClientService

Inherits:
Service show all
Defined in:
lib/ls4/service/heartbeat.rb

Direct Known Subclasses

HeartbeatMemberService

Instance Method Summary collapse

Methods inherited from Service

init

Methods included from EventBus::SingletonMixin

#ebus_bind!, #ebus_connect, extended

Methods included from EventBus::BusMixin

#ebus_all_slots, #ebus_disconnect!

Methods included from EventBus::DeclarerBase::Methods

#connect, #ebus_all_slots, #ebus_call_log, #ebus_call_slots, #ebus_signal_error, #ebus_signal_log, #ebus_signal_slots

Methods included from EventBus::DeclarerBase

#call_slot, #signal_slot

Constructor Details

#initialize ⇒ HeartbeatClientService

Returns a new instance of HeartbeatClientService.



72
73
74
# File 'lib/ls4/service/heartbeat.rb', line 72

# Creates the service with no heartbeat id assigned yet.
#
# @heartbeat_nid is later sent as the second argument of the :heartbeat call
# made in #do_heartbeat_blocking. Nothing visible in this class assigns it a
# non-nil value (presumably a subclass such as HeartbeatMemberService does --
# TODO confirm).
def initialize
  @heartbeat_nid = nil
end

Instance Method Details

#do_heartbeat_blocking ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/ls4/service/heartbeat.rb', line 127

# Sends one heartbeat through the RPC client in @cs and waits for the reply.
#
# The current sync hash from SyncBus.get_hash is sent together with
# @heartbeat_nid via the :heartbeat call. The reply is decoded into a
# HeartbeatResponse; when it reports sync_needed, SyncBus.try_sync is
# submitted to ProcessBus.
#
# This is a best-effort operation: any StandardError raised along the way --
# including one raised while obtaining the sync hash -- is logged to $log and
# swallowed, so a single failed heartbeat cannot terminate the periodic loop
# started by #run.
#
# @return [nil] always
def do_heartbeat_blocking
  begin
    # Fetch the hash inside the guarded region so a failure here is logged
    # like every other heartbeat error instead of escaping to the caller.
    sync_hash = SyncBus.get_hash
    res = @cs.call(:heartbeat, @heartbeat_nid, sync_hash)
    hbres = HeartbeatResponse.new.from_msgpack(res)
    ProcessBus.submit { SyncBus.try_sync } if hbres.sync_needed
  rescue => e
    $log.error "heartbeat error: #{e}"
    $log.error_backtrace e.backtrace
  end
  nil
end

#heartbeat_blocking! ⇒ Object



144
145
146
# File 'lib/ls4/service/heartbeat.rb', line 144

# Performs a single heartbeat right away by delegating to
# #do_heartbeat_blocking, and returns that method's result.
def heartbeat_blocking!
  do_heartbeat_blocking
end

#run ⇒ Object

def get_cs_session ProcessBus.get_session(ConfigBus.get_cs_address) end

def on_timer do_heartbeat end

def do_heartbeat sync_hash = SyncBus.get_hash get_cs_session.callback(:heartbeat, @heartbeat_nid, sync_hash) do |future| begin hbres = HeartbeatResponse.new.from_msgpack(future.get) ack_heartbeat(hbres) rescue $log.error "heartbeat error: #$!" $log.error_backtrace $!.backtrace end end end

def ack_heartbeat(hbres) if hbres.sync_needed SyncBus.try_sync end end

ebus_connect :ProcessBus, :on_timer

def heartbeat_blocking! do_heartbeat.join end



110
111
112
113
114
115
116
117
118
119
# File 'lib/ls4/service/heartbeat.rb', line 110

# Starts the periodic heartbeat.
#
# Opens a MessagePack-RPC client to the address returned by
# ConfigBus.get_cs_address, clears the stop flag, and spawns a background
# thread that sleeps one second and then calls #do_heartbeat_blocking,
# repeating until @end becomes true (see #shutdown).
#
# @return [Thread] the heartbeat thread, also kept in @thread
def run
  @cs = MessagePack::RPC::Client.new(*ConfigBus.get_cs_address)
  @end = false
  @thread = Thread.new {
    # Sleep first, so the initial heartbeat goes out one second after start.
    until @end
      sleep 1
      do_heartbeat_blocking
    end
  }
end

#shutdown ⇒ Object



121
122
123
124
125
# File 'lib/ls4/service/heartbeat.rb', line 121

# Stops the heartbeat loop started by #run and closes the RPC client.
# Returns whatever @cs.close returns.
def shutdown
  # Ask the loop in #run to exit at its next `while !@end` check.
  @end = true
  # Joining the thread is left disabled, so this method does not wait for the
  # heartbeat thread to finish before the client is closed.
  #@thread.join
  @cs.close
end