Class: FastCI::WebSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/fast_ci.rb

Constant Summary collapse

# Event names that callers may register a handler for via #on.
SUPPORTED_EVENTS = %i[enq_request deq].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(run_key) ⇒ WebSocket

Returns a new instance of WebSocket.



114
115
116
117
118
# File 'lib/fast_ci.rb', line 114

# Sets up the initial state of the socket wrapper; no connection is opened here.
#
# @param run_key [Object] identifier of the run, exposed through #run_key
def initialize(run_key)
  # Initial value 1; presumably the starting message reference used by
  # send_msg's `ref` helper (not visible here) -- confirm.
  @ref = 1
  # Handlers registered through #on, keyed by event name.
  @on = {}
  @run_key = run_key
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



110
111
112
# File 'lib/fast_ci.rb', line 110

# Reader for @connection. #connect_to_ws assigns this attribute (through
# `self.connection=`) with the client connection it is handed once the
# WebSocket connect succeeds; before that the value is nil.
def connection
  @connection
end

#node_index ⇒ Object (readonly)

Returns the value of attribute node_index.



109
110
111
# File 'lib/fast_ci.rb', line 109

# Read-only accessor for @node_index.
# NOTE(review): nothing in the visible methods assigns @node_index --
# presumably set by one of the join handlers; confirm against the full class.
def node_index
  @node_index
end

#run_key ⇒ Object

Returns the value of attribute run_key.



110
111
112
# File 'lib/fast_ci.rb', line 110

# Reader for @run_key, the value passed to the constructor.
def run_key
  @run_key
end

#task ⇒ Object

Returns the value of attribute task.



110
111
112
# File 'lib/fast_ci.rb', line 110

# Reader for @task. #connect_to_ws assigns this attribute (through
# `self.task=`) with the task yielded by its `Async` block; #await uses it to
# schedule replies and to stop processing after an error.
def task
  @task
end

Instance Method Details

#await(retry_count = 0) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/fast_ci.rb', line 181

# Joins the channel and processes incoming messages until the server sends a
# "deq" response with no tests, the connection stops yielding messages, or an
# error occurs.
#
# The work runs inside the block handed to #connect_to_ws. Messages whose ref
# is 0 are heartbeat replies (see #send_heartbeat) and are skipped. The event
# name is taken from the payload's response when present, otherwise from the
# message itself. Any StandardError raised while reading or handling a message
# is logged through FastCI.error and the current task is stopped.
#
# @param retry_count [Integer] not used by this method body; kept so existing
#   callers that pass it keep working
# @return [Object] whatever #connect_to_ws returns
def await(retry_count = 0)
  connect_to_ws do
    send_msg("phx_join")

    begin
      while (message = connection.read)
        FastCI.debug("ws#msg_received: #{message.inspect}")

        # Heartbeat
        next if message[:ref] == 0

        response = message.dig(:payload, :response)
        event = response&.dig(:event) || message[:event]

        case event
        when "phx_error"
          raise("Unexpected server error")
        when "join"
          handle_join(response)
        when "failed_join"
          handle_join_fail(response)
        when "deq_request"
          handle_deq_request(response)
        when "deq"
          tests = response[:tests]
          # An empty batch means there is nothing left to run: stop reading.
          break unless tests.any?

          result = @on[:deq].call(tests)
          task.async do
            send_msg("deq", result)
          end
        else
          # Include the event and the whole message: the response alone is
          # nil (and the error unreadable) when the event came from the
          # top-level message.
          raise("Unhandled event #{event.inspect}: #{message.inspect}")
        end
      end
    rescue => e
      FastCI.error("Unexpected error: #{e.message}\n\t#{e.backtrace.join("\n\t")}")
      task&.stop
    end
  end
end

#connect_to_ws ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fast_ci.rb', line 137

# Opens the WebSocket connection inside an `Async` block and runs the caller's
# block while it is open.
#
# Sequence: before_start_connection, connect to `endpoint`, store the
# connection, after_start_connection, store the async task, yield to the
# caller, then `leave`. If the connect raises
# Async::WebSocket::ProtocolError, a message chosen from the error text is
# logged through FastCI.error and the process exits with status 2.
#
# @yield runs once the connection and task have been stored
# @return [Object] the value of the `Async` call
def connect_to_ws
  Async do |async_task|
    before_start_connection

    begin
      Async::WebSocket::Client.connect(endpoint) do |socket|
        self.connection = socket
        after_start_connection
        self.task = async_task
        yield

        leave
      end
    rescue Async::WebSocket::ProtocolError => error
      failure =
        case error.message
        when "Failed to negotiate connection: 401"
          "Failed to connect: Invalid secret key!"
        when "Failed to negotiate connection: 403"
          "Failed to connect: Invalid or disabled run key!"
        else
          "Failed to connect: General connection error!"
        end

      # Every negotiation failure is fatal and uses the same exit status.
      FastCI.error(failure)
      exit 2
    end
  end
end

#get_connection ⇒ Object



133
134
135
# File 'lib/fast_ci.rb', line 133

# Returns the same value as the #connection reader; it adds no behavior of
# its own.
def get_connection
  connection
end

#on(event, &block) ⇒ Object



120
121
122
123
124
125
# File 'lib/fast_ci.rb', line 120

# Registers the handler block for one event.
#
# @param event [Symbol] must be one of SUPPORTED_EVENTS
# @yield the handler stored for the event
# @raise [EventNotSupportedError] if the event is not in SUPPORTED_EVENTS
# @raise [EventAlreadyDefinedError] if a handler is already stored for the event
# @return [Proc, nil] the stored block
def on(event, &block)
  unless SUPPORTED_EVENTS.include?(event)
    raise EventNotSupportedError, event
  end

  # Each event accepts a single handler; a second registration is an error.
  if @on[event]
    raise EventAlreadyDefinedError, event
  end

  @on[event] = block
end

#send_heartbeat ⇒ Object



176
177
178
179
# File 'lib/fast_ci.rb', line 176

# Logs and sends a single heartbeat message.
#
# The heartbeat overrides send_msg's default topic and ref: it goes to the
# "phoenix" topic with ref 0, which is the ref #await uses to recognise and
# skip the reply.
#
# @return [Object] whatever #send_msg returns
def send_heartbeat
  heartbeat_topic = "phoenix"
  heartbeat_ref = 0

  FastCI.debug("Sending heartbeat")
  send_msg("heartbeat", {}, heartbeat_topic, heartbeat_ref)
end

#send_msg(event, payload = {}, custom_topic = nil, custom_ref = nil) ⇒ Object



127
128
129
130
131
# File 'lib/fast_ci.rb', line 127

# Logs, writes, and flushes one message on the current connection.
#
# @param event [String] event name placed in the message
# @param payload [Hash] message payload
# @param custom_topic [Object, nil] used instead of `topic` when given
# @param custom_ref [Object, nil] used instead of `ref` when given
#   (0 counts as given, since 0 is truthy in Ruby)
# @return [Object] the result of flushing the connection
def send_msg(event, payload = {}, custom_topic = nil, custom_ref = nil)
  FastCI.debug("ws#send_msg: #{event} -> #{payload.inspect}")

  frame = {
    topic: custom_topic || topic,
    event: event,
    payload: payload,
    ref: custom_ref || ref
  }

  connection.write(frame)
  connection.flush
end

#start_heartbeat ⇒ Object



166
167
168
169
170
171
172
173
174
# File 'lib/fast_ci.rb', line 166

# Starts an `Async` block that sends a heartbeat after every interval until
# the connection reports it is closed.
#
# Each cycle sleeps first, then checks the connection, so no heartbeat is
# sent immediately and none is sent on a connection that closed during the
# wait.
#
# @param interval [Numeric] seconds to sleep between heartbeats; defaults to
#   30, the value that used to be hard-coded
# @return [Object] the value of the `Async` call
def start_heartbeat(interval = 30)
  Async do
    loop do
      sleep interval
      break if connection.closed?

      send_heartbeat
    end
  end
end