Class: Goat::StateSrvConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
EM::P::LineText2
Defined in:
lib/goat/state-srv.rb

Overview

This class is an ugly mix of synchronous (blocking TCPSocket) and asynchronous (EventMachine) code paths.

Constant Summary collapse

@@connection =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#pusher ⇒ Object

Returns the value of attribute pusher.



117
118
119
# File 'lib/goat/state-srv.rb', line 117

# Reader for the +@pusher+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@pusher+
def pusher; @pusher; end

Class Method Details

.connect(host, port, &dlg) ⇒ Object



73
74
75
76
77
# File 'lib/goat/state-srv.rb', line 73

# Opens an EventMachine connection to the given endpoint, using this class
# as the connection handler.
#
# The host and port are stored in class-level instance variables so that
# they can be reused later without being passed in again.
#
# @param host [String] host to connect to
# @param port [Integer] port to connect to
# @param dlg [Proc] accepted for interface compatibility; not used here
# @return [Object] whatever EM.connect returns
def self.connect(host, port, &dlg)
  @host, @port = host, port
  EM.connect(@host, @port, self)
end

.connected? ⇒ Boolean

Returns:

  • (Boolean)


71
# File 'lib/goat/state-srv.rb', line 71

# Tells whether a connection object is currently registered in the
# +@@connection+ class variable.
#
# @return [Boolean] true when +@@connection+ is not nil
def self.connected?
  !@@connection.nil?
end

.connection ⇒ Object



69
# File 'lib/goat/state-srv.rb', line 69

# Returns the connection object currently stored in +@@connection+.
#
# @return [Object, nil] the registered connection, or nil when none is set
def self.connection
  @@connection
end

.connection=(c) ⇒ Object



70
# File 'lib/goat/state-srv.rb', line 70

# Stores the given object in +@@connection+; passing nil clears it.
#
# @param c [Object, nil] the connection to register
def self.connection=(c)
  @@connection = c
end

.reconnect ⇒ Object



79
80
81
82
# File 'lib/goat/state-srv.rb', line 79

# Re-establishes the connection using the host and port remembered from an
# earlier call to connect.
#
# @raise [RuntimeError] if no host or port has been stored yet
# @return [Object] whatever connect returns
def self.reconnect
  if [@host, @port].any?(&:nil?)
    raise 'Reconnect called before connection made'
  end

  connect(@host, @port)
end

.reconnect_sync ⇒ Object



84
85
86
# File 'lib/goat/state-srv.rb', line 84

# Opens a fresh blocking TCP socket to the stored host and port and keeps it
# in +@sock+ for the synchronous message path.
#
# Any socket previously held in +@sock+ is closed first, so that repeated
# reconnects do not leave old file descriptors open.
#
# @return [TCPSocket] the newly opened socket
def self.reconnect_sync
  stale = @sock
  begin
    stale.close if stale && !stale.closed?
  rescue IOError, SystemCallError
    # Best effort only: the old socket is being discarded, so a failure
    # while closing it must not prevent opening the replacement.
  end

  @sock = TCPSocket.open(@host, @port)
end

.send_message(*args) ⇒ Object



109
110
111
112
113
114
115
# File 'lib/goat/state-srv.rb', line 109

# Forwards a message to the currently registered connection.
#
# @param args [Array] arguments passed straight through to the connection's
#   own send_message
# @raise [NoStateSrvConnectionError] if no connection is registered
# @return [Object] whatever the connection's send_message returns
def self.send_message(*args)
  raise NoStateSrvConnectionError unless connected?

  connection.send_message(*args)
end

.send_message_sync(msg, failed_last_time = false) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/goat/state-srv.rb', line 92

# Sends one message over the blocking socket and waits for a one-line reply.
#
# The message is serialized with to_json and terminated with a newline. If
# the socket turns out to be dead, the socket is re-opened and the message
# is sent again, once.
#
# @param msg [#to_json] the message to send
# @param failed_last_time [Boolean] true when this call is already the retry;
#   a second failure is then re-raised instead of retried again
# @return [String] the raw response line, including its trailing newline
# @raise [Errno::ECONNRESET, Errno::EPIPE, EOFError] if the retry also fails
def self.send_message_sync(msg, failed_last_time=false)
  reconnect_sync unless sync_connection_active?
  @sock.write(msg.to_json + "\n")
  resp = @sock.readline
  Goat.logd("=> #{resp.inspect}") if $verbose
  resp
rescue Errno::ECONNRESET, Errno::EPIPE, EOFError => e
  # Almost certainly the connection was closed and we didn't notice.
  # EPIPE is what a write to a peer-closed socket raises, so it is treated
  # the same way as a reset or an unexpected end of stream.
  raise if failed_last_time

  Goat.logw "Reinitializing sync connection to state-srv (#{e.inspect})"
  reconnect_sync
  send_message_sync(msg, true)
end

.sync_connection_active? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/goat/state-srv.rb', line 88

# Reports whether the blocking socket in +@sock+ exists and is still open.
#
# @return [Boolean, nil] true if the socket is open, false if it has been
#   closed, and the falsy value of +@sock+ itself when no socket is set
def self.sync_connection_active?
  return @sock unless @sock

  !@sock.closed?
end

Instance Method Details

#close ⇒ Object



159
160
161
# File 'lib/goat/state-srv.rb', line 159

# Closes this connection by delegating to close_connection.
#
# @return [Object] whatever close_connection returns
def close; close_connection; end

#connection_completed ⇒ Object



119
120
121
122
123
# File 'lib/goat/state-srv.rb', line 119

# Marks this connection as established and registers it as the class-wide
# current connection.
#
# @return [Object] this connection (the value assigned on the last line)
def connection_completed
  # Remembered so that unbind can tell a lost connection apart from one
  # that never opened.
  @was_connected = true

  Goat.logw("Connected to StateSrv")

  StateSrvConnection.connection = self
end

#message_received(msg) ⇒ Object



137
138
139
140
141
142
143
144
145
# File 'lib/goat/state-srv.rb', line 137

# Dispatches one decoded message received from the state server.
#
# Only the 'update_ack' response type is handled: its 'components' value is
# handed to StateSrvClient.components_update_completed.
#
# @param msg [Hash] decoded message; the payload is read from its 'response' key
# @raise [RuntimeError] if the response type is anything other than 'update_ack'
def message_received(msg)
  response = msg['response']

  # This must be a comparison (==). An assignment here would overwrite the
  # type in the hash and always be truthy, making the error branch unreachable.
  if response['type'] == 'update_ack'
    StateSrvClient.components_update_completed(response['components'])
  else
    raise "Unknown message type: #{response['type']}"
  end
end

#receive_line(line) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
# File 'lib/goat/state-srv.rb', line 125

# Handles one line of input: decodes it as JSON and hands the result to
# message_received. A top-level JSON array is treated as a batch, and each
# element is dispatched separately in order.
#
# NOTE(review): JSON.load is documented as intended for trusted input only;
# if the state server cannot be fully trusted, consider JSON.parse — confirm
# before changing, since the two differ in how they treat blank input and NaN.
#
# @param line [String] one line of JSON text
# @return [Object] the decoded array for a batch, otherwise whatever
#   message_received returns
def receive_line(line)
  decoded = JSON.load(line)
  Goat.logd("=> #{decoded.inspect}") if $verbose

  return message_received(decoded) unless decoded.is_a?(Array)

  decoded.each { |entry| message_received(entry) }
end

#send_message(t, msg, sync = false) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
# File 'lib/goat/state-srv.rb', line 147

# Sends a typed message to the state server.
#
# The type is added under the 'type' key of a copy of the message, so the
# caller's hash is left untouched.
#
# @param t [Object] value stored under the 'type' key
# @param msg [Hash] message body
# @param sync [Boolean] when truthy, send over the class-level blocking
#   socket; otherwise write to this connection with send_data
# @return [Object] the result of send_message_sync or of send_data
def send_message(t, msg, sync=false)
  payload = msg.merge('type' => t)
  Goat.logd(">> #{payload.inspect}") if $verbose

  # TODO better way to do this?
  return self.class.send_message_sync(payload) if sync

  send_data(payload.to_json + "\n")
end

#unbind ⇒ Object



163
164
165
166
167
168
169
170
171
172
# File 'lib/goat/state-srv.rb', line 163

# Cleans up after the connection has gone away: logs what happened, clears
# the class-wide connection, and schedules a reconnect attempt in 5 seconds.
#
# @return [Object] whatever EM.add_timer returns
def unbind
  # @was_connected is only set once the connection completed, so it
  # distinguishes a dropped connection from a failed attempt.
  reason = @was_connected ? "Lost StateSrv connection" : "Couldn't open StateSrv connection"
  Goat.logw(reason)

  StateSrvConnection.connection = nil
  EM.add_timer(5) { self.class.reconnect }
end