Class: Plushie::Test::SessionPool
- Inherits: Object (hierarchy: Plushie::Test::SessionPool < Object)
- Defined in: lib/plushie/test/session_pool.rb
Overview
Shared renderer process for concurrent test sessions.
Owns a single +plushie --mock --max-sessions N+ process and multiplexes messages from multiple test sessions over it. Each session gets a unique session ID (e.g. "test_1", "test_2"); responses are demultiplexed by their +session+ field and forwarded to the owning session's queue.
See the Elixir SDK's SessionPool for reference.
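The routing described above can be sketched in isolation. This is a minimal illustration, not the library's code: `DemuxSketch`, `dispatch`, and `next_message` are hypothetical names, and the sketch assumes decoded messages are hashes with a symbol `:session` key, which this page does not confirm.

```ruby
# Hypothetical stand-in for the pool's routing logic; not the library's code.
class DemuxSketch
  def initialize
    @sessions = {}          # session_id -> Thread::Queue
    @mutex = Mutex.new
  end

  def register(session_id)
    @mutex.synchronize { @sessions[session_id] = Thread::Queue.new }
  end

  # Called once per decoded message arriving from the shared process.
  # Assumes the message carries its owner under the :session key.
  def dispatch(msg)
    queue = @mutex.synchronize { @sessions[msg[:session]] }
    queue&.push(msg)        # messages for unknown sessions are dropped here
  end

  def next_message(session_id)
    queue = @mutex.synchronize { @sessions.fetch(session_id) }
    queue.pop
  end
end

demux = DemuxSketch.new
demux.register("test_1")
demux.register("test_2")

# Interleaved responses, as they might arrive on one shared connection.
demux.dispatch({session: "test_2", type: "b"})
demux.dispatch({session: "test_1", type: "a"})
demux.dispatch({session: "test_3", type: "stray"})

first = demux.next_message("test_1")
second = demux.next_message("test_2")
```

Each session only ever sees its own messages, regardless of the order in which they arrived on the shared connection.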
Instance Attribute Summary
- #format ⇒ :msgpack, :json readonly
Instance Method Summary
- #initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil) ⇒ SessionPool (constructor)
  A new instance of SessionPool.
- #read_message(session_id, timeout: 10) ⇒ Object
  Read the next message for a session (blocking).
- #register ⇒ String
  Register a new session.
- #send_and_wait(msg, session_id, response_type, timeout: 10) ⇒ Hash
  Send a message and wait for a specific response type.
- #send_message(msg, session_id) ⇒ Object
  Send a message for a session (fire-and-forget).
- #start ⇒ Object
  Start the renderer process.
- #started? ⇒ Boolean
  True if the pool is started.
- #stop ⇒ Object
  Stop the pool and close the renderer.
- #unregister(session_id) ⇒ Object
  Unregister a session.
Constructor Details
#initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil) ⇒ SessionPool
Returns a new instance of SessionPool.
# File 'lib/plushie/test/session_pool.rb', line 24
def initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil)
  @mode = mode
  @format = format
  @max_sessions = max_sessions
  @binary = binary
  @connection = nil
  @sessions = {} # session_id -> Thread::Queue
  @counter = 0
  @mutex = Mutex.new
  @started = false
end
Instance Attribute Details
#format ⇒ :msgpack, :json (readonly)
# File 'lib/plushie/test/session_pool.rb', line 18
def format
  @format
end
Instance Method Details
#read_message(session_id, timeout: 10) ⇒ Object
Read the next message for a session (blocking).
# File 'lib/plushie/test/session_pool.rb', line 107
def read_message(session_id, timeout: 10)
  queue = @mutex.synchronize { @sessions[session_id] }
  raise "Unknown session: #{session_id}" unless queue
  msg = nil
  Timeout.timeout(timeout) { msg = queue.pop }
  msg
end
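Because the listing wraps `Queue#pop` in `Timeout.timeout`, a read on a session with no pending messages ends in `Timeout::Error` rather than blocking forever. The sketch below reproduces that pattern on a bare `Thread::Queue`, outside the pool; the short timeouts and the sample message are illustrative only.

```ruby
require "timeout"

queue = Thread::Queue.new

# Empty queue: the blocking pop is interrupted once the timeout elapses.
timed_out =
  begin
    Timeout.timeout(0.05) { queue.pop }
    false
  rescue Timeout::Error
    true
  end

# A message pushed from another thread is returned as soon as it arrives.
producer = Thread.new do
  sleep 0.01
  queue.push({type: "hello"})
end
msg = Timeout.timeout(1) { queue.pop }
producer.join
```

Callers of `read_message` should therefore be prepared to rescue `Timeout::Error`, and a `RuntimeError` for a session ID that was never registered or has already been removed.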
#register ⇒ String
Register a new session. Returns a unique session ID.
# File 'lib/plushie/test/session_pool.rb', line 53
def register
  @mutex.synchronize do
    if @sessions.size >= @max_sessions
      raise "Session pool full (#{@max_sessions} sessions). " \
        "Increase max_sessions or check for leaked sessions."
    end
    @counter += 1
    session_id = "test_#{@counter}"
    @sessions[session_id] = Thread::Queue.new
    session_id
  end
end
#send_and_wait(msg, session_id, response_type, timeout: 10) ⇒ Hash
Send a message and wait for a specific response type.
# File 'lib/plushie/test/session_pool.rb', line 97
def send_and_wait(msg, session_id, response_type, timeout: 10)
  send_message(msg, session_id)
  wait_for_response(session_id, response_type, timeout: timeout)
end
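The `wait_for_response` helper is not listed on this page, so its exact behavior is unknown here. One plausible shape, offered purely as an assumption, is a loop that pops from the session's queue until a message of the requested type appears, bounded by a timeout. In the sketch below `wait_for_type` is a hypothetical name, and the symbol `:type` key and the discarding of non-matching messages are guesses.

```ruby
require "timeout"

# Hypothetical helper; the real wait_for_response is not shown on this page.
# Assumes messages are hashes with a :type key (an assumption).
def wait_for_type(queue, response_type, timeout: 10)
  Timeout.timeout(timeout) do
    loop do
      msg = queue.pop
      # Compare as strings so :reset_response matches "reset_response".
      return msg if msg[:type].to_s == response_type.to_s
      # Non-matching messages are discarded in this sketch.
    end
  end
end

queue = Thread::Queue.new
queue.push({type: "event", id: 1})
queue.push({type: "reset_response", id: 2})

reply = wait_for_type(queue, :reset_response, timeout: 1)
```

Whether the real helper drops, buffers, or re-queues intervening messages matters for tests that interleave events with request/response pairs, so check the source before relying on either behavior.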
#send_message(msg, session_id) ⇒ Object
Send a message for a session (fire-and-forget). Injects the session field automatically.
# File 'lib/plushie/test/session_pool.rb', line 85
def send_message(msg, session_id)
  encoded = Protocol::Encode.encode(msg.merge(session: session_id), @format)
  @connection.send_encoded(encoded)
end
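The session injection relies on plain `Hash#merge`, whose behavior can be checked without a renderer. The example below uses illustrative message hashes only; it shows that the caller's hash is left untouched, that an existing symbol `:session` key is overwritten, and that a string `"session"` key is not.

```ruby
msg = {type: "reset", id: "reset_test_1"}

# merge returns a new hash; the caller's message is not mutated.
tagged = msg.merge(session: "test_1")

# A symbol :session key supplied by the caller is replaced by the pool's ID.
overridden = {type: "reset", session: "other"}.merge(session: "test_1")

# A string "session" key is a different key, so both entries survive.
mixed = {"session" => "other"}.merge(session: "test_1")
```

Passing messages with symbol keys avoids ending up with two competing session entries in the encoded payload.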
#start ⇒ Object
Start the renderer process.
# File 'lib/plushie/test/session_pool.rb', line 37
def start
  @connection = Connection.spawn(
    format: @format,
    binary: @binary,
    mode: @mode,
    max_sessions: @max_sessions,
    settings: {},
    on_message: method(:dispatch_message)
  )
  @started = true
end
#started? ⇒ Boolean
Returns true if the pool is started.
# File 'lib/plushie/test/session_pool.rb', line 124
def started?
  @started
end
#stop ⇒ Object
Stop the pool and close the renderer.
# File 'lib/plushie/test/session_pool.rb', line 117
def stop
  @connection&.close
  @mutex.synchronize { @sessions.clear }
  @started = false
end
#unregister(session_id) ⇒ Object
Unregister a session. Sends Reset to the renderer.
# File 'lib/plushie/test/session_pool.rb', line 69
def unregister(session_id)
  send_message({type: "reset", id: "reset_#{session_id}"}, session_id)
  # Wait for reset_response (with timeout)
  begin
    wait_for_response(session_id, :reset_response, timeout: 5)
  rescue Timeout::Error
    # Timeout on reset is not fatal -- the session is still removed
  end
  @mutex.synchronize { @sessions.delete(session_id) }
end
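Since `register` raises once `max_sessions` slots are in use, a test that fails before calling `unregister` leaks a slot. A common way to avoid this is to pair the two calls with `ensure`. The sketch below is an assumption about how a caller might do that: `with_session` is a hypothetical helper, not part of Plushie, and `FakePool` is a stand-in that only mimics the `register`/`unregister` interface so the example runs without a renderer.

```ruby
# Hypothetical helper, not part of Plushie: pairs register with unregister.
def with_session(pool)
  session_id = pool.register
  yield session_id
ensure
  # Runs on success and on failure; skipped only if register itself raised.
  pool.unregister(session_id) if session_id
end

# Minimal stand-in exposing the same two methods, for illustration only.
class FakePool
  attr_reader :active

  def initialize
    @active = []
    @counter = 0
  end

  def register
    @counter += 1
    "test_#{@counter}".tap { |id| @active << id }
  end

  def unregister(session_id)
    @active.delete(session_id)
  end
end

pool = FakePool.new
seen = nil
begin
  with_session(pool) do |id|
    seen = id
    raise "simulated test failure"
  end
rescue RuntimeError
  # The failure propagates, but the session slot has already been released.
end
```

With a real pool, the same helper would accept a started `SessionPool` in place of `FakePool`.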