Class: Anyt::Client
- Inherits:
-
Object
- Object
- Anyt::Client
- Defined in:
- lib/anyt/client.rb
Overview
Synchronous websocket client Based on github.com/rails/rails/blob/v5.0.1/actioncable/test/client_test.rb
Defined Under Namespace
Classes: TimeoutError
Constant Summary collapse
- WAIT_WHEN_EXPECTING_EVENT =
5
- WAIT_WHEN_NOT_EXPECTING_EVENT =
0.5
Instance Method Summary collapse
- #close(allow_messages: false) ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(ignore: [], url: Anyt.config.target_url, qs: "", cookies: "", headers: {}, protocol: "actioncable-v1-json", timeout_multiplier: Anyt.config.timeout_multiplier) ⇒ Client
constructor
A new instance of Client.
-
#receive(timeout: WAIT_WHEN_EXPECTING_EVENT) ⇒ Object
rubocop: enable Metrics/BlockLength rubocop: enable Metrics/AbcSize rubocop: enable Metrics/MethodLength.
- #send(message) ⇒ Object
- #wait_for_close ⇒ Object
Constructor Details
#initialize(ignore: [], url: Anyt.config.target_url, qs: "", cookies: "", headers: {}, protocol: "actioncable-v1-json", timeout_multiplier: Anyt.config.timeout_multiplier) ⇒ Client
Returns a new instance of Client.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/anyt/client.rb', line 15 def initialize( ignore: [], url: Anyt.config.target_url, qs: "", cookies: "", headers: {}, protocol: "actioncable-v1-json", timeout_multiplier: Anyt.config.timeout_multiplier ) = @ignore_message_types = ignore = @messages = Queue.new closed = @closed = Concurrent::Event.new = @has_messages = Concurrent::Semaphore.new(0) @timeout_multiplier = timeout_multiplier headers = headers.merge("cookie" => ) headers["Sec-WebSocket-Protocol"] = protocol open = Concurrent::Promise.new @ws = WebSocket::Client::Simple.connect( url + "?#{qs}", headers: headers ) do |ws| ws.on(:error) do |event| event = RuntimeError.new(event.) unless event.is_a?(Exception) if open.pending? open.fail(event) else << event .release end end ws.on(:open) do |_event| open.set(true) end ws.on(:message) do |event| next if event.type == :ping if event.type == :close closed.set else = JSON.parse(event.data) next if .include?(["type"]) AnyCable.logger.debug "Message received: #{}" << .release end end ws.on(:close) do |_event| closed.set end end open.wait!(WAIT_WHEN_EXPECTING_EVENT * @timeout_multiplier) end |
Instance Method Details
#close(allow_messages: false) ⇒ Object
95 96 97 98 99 100 101 102 |
# File 'lib/anyt/client.rb', line 95 def close(allow_messages: false) sleep WAIT_WHEN_NOT_EXPECTING_EVENT * @timeout_multiplier raise "#{@messages.size} messages unprocessed" unless || @messages.empty? @ws.close wait_for_close end |
#closed? ⇒ Boolean
108 109 110 |
# File 'lib/anyt/client.rb', line 108 def closed? @closed.set? end |
#receive(timeout: WAIT_WHEN_EXPECTING_EVENT) ⇒ Object
rubocop: enable Metrics/BlockLength rubocop: enable Metrics/AbcSize rubocop: enable Metrics/MethodLength
79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/anyt/client.rb', line 79 def receive(timeout: WAIT_WHEN_EXPECTING_EVENT) timeout *= @timeout_multiplier raise TimeoutError, "Timed out to receive message" unless @has_messages.try_acquire(1, timeout) msg = @messages.pop(true) raise msg if msg.is_a?(Exception) msg end |
#send(message) ⇒ Object
91 92 93 |
# File 'lib/anyt/client.rb', line 91 def send() @ws.send(JSON.generate()) end |
#wait_for_close ⇒ Object
104 105 106 |
# File 'lib/anyt/client.rb', line 104 def wait_for_close @closed.wait(WAIT_WHEN_EXPECTING_EVENT * @timeout_multiplier) end |