Class: Plushie::Test::SessionPool

Inherits:
Object
Defined in:
lib/plushie/test/session_pool.rb

Overview

Shared renderer process for concurrent test sessions.

Owns a single +plushie --mock --max-sessions N+ process and multiplexes messages from multiple test sessions over it. Each session gets a unique session ID (e.g. "test_1", "test_2"); responses are demultiplexed by their +session+ field and forwarded to the owning session's queue.
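
The routing step described above can be sketched with the standard library alone. This is a minimal illustration, not Plushie's implementation: the TinyDemux class and its open and dispatch methods are hypothetical names, and it assumes decoded messages are Hashes carrying a :session key.

```ruby
# Hypothetical stand-in for the pool's routing step; not Plushie's actual code.
class TinyDemux
  def initialize
    @sessions = {}        # session_id -> Thread::Queue
    @mutex = Mutex.new
  end

  # Create a queue for a session ID and return it.
  def open(session_id)
    @mutex.synchronize { @sessions[session_id] = Thread::Queue.new }
  end

  # Route one decoded message to the queue of the session it names.
  # Returns true if delivered, false if no session matched.
  def dispatch(msg)
    queue = @mutex.synchronize { @sessions[msg[:session]] }
    return false unless queue

    queue << msg
    true
  end
end

demux = TinyDemux.new
q1 = demux.open("test_1")
q2 = demux.open("test_2")

# Messages arrive interleaved on one connection; each lands in its own queue.
demux.dispatch({session: "test_2", type: :reset_response})
demux.dispatch({session: "test_1", type: :tree})
dropped = demux.dispatch({session: "test_9", type: :tree})  # no such session
```

Because every session reads only from its own queue, concurrent tests never see each other's responses even though they share one renderer process.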

See the Elixir SDK's SessionPool for reference.

Constructor Details

#initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil) ⇒ SessionPool

Returns a new instance of SessionPool.

Parameters:

  • mode (:mock, :headless, :windowed) (defaults to: :mock)

    renderer mode

  • format (:msgpack, :json) (defaults to: :msgpack)

    wire format

  • max_sessions (Integer) (defaults to: 8)

    max concurrent sessions

  • binary (String, nil) (defaults to: nil)

    path to renderer binary



# File 'lib/plushie/test/session_pool.rb', line 24

def initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil)
  @mode = mode
  @format = format
  @max_sessions = max_sessions
  @binary = binary
  @connection = nil
  @sessions = {}  # session_id -> Thread::Queue
  @counter = 0
  @mutex = Mutex.new
  @started = false
end

Instance Attribute Details

#format ⇒ :msgpack, :json (readonly)

Returns:

  • (:msgpack, :json)


# File 'lib/plushie/test/session_pool.rb', line 18

def format
  @format
end

Instance Method Details

#read_message(session_id, timeout: 10) ⇒ Object

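Read the next message for a session, blocking until one arrives. Raises Timeout::Error if nothing arrives within the timeout, and RuntimeError if the session ID is not registered.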

Parameters:

  • session_id (String)
  • timeout (Numeric) (defaults to: 10)

    max wait time in seconds

Returns:

  • (Object)

    the decoded message



# File 'lib/plushie/test/session_pool.rb', line 107

def read_message(session_id, timeout: 10)
  queue = @mutex.synchronize { @sessions[session_id] }
  raise "Unknown session: #{session_id}" unless queue

  msg = nil
  Timeout.timeout(timeout) { msg = queue.pop }
  msg
end

#register ⇒ String

Register a new session. Returns a unique session ID.

Returns:

  • (String)

    session ID

Raises:

  • (RuntimeError)

    if pool is full



# File 'lib/plushie/test/session_pool.rb', line 53

def register
  @mutex.synchronize do
    if @sessions.size >= @max_sessions
      raise "Session pool full (#{@max_sessions} sessions). " \
        "Increase max_sessions or check for leaked sessions."
    end
    @counter += 1
    session_id = "test_#{@counter}"
    @sessions[session_id] = Thread::Queue.new
    session_id
  end
end

#send_and_wait(msg, session_id, response_type, timeout: 10) ⇒ Hash

Send a message and wait for a specific response type.

Parameters:

  • msg (Hash)
  • session_id (String)
  • response_type (Symbol)

    expected response type

  • timeout (Numeric) (defaults to: 10)

    max wait time in seconds

Returns:

  • (Hash)

    the response



# File 'lib/plushie/test/session_pool.rb', line 97

def send_and_wait(msg, session_id, response_type, timeout: 10)
  send_message(msg, session_id)
  wait_for_response(session_id, response_type, timeout: timeout)
end
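
The wait_for_response method called above is not shown on this page. As a rough sketch of what such a wait could look like, the hypothetical wait_for_type helper below pops messages from a queue until one of the expected type arrives. It assumes each decoded message is a Hash with a Symbol under :type and that non-matching messages can be discarded; the real method may behave differently.

```ruby
require "timeout"

# Hypothetical helper: pop messages until one of the expected type arrives.
# Assumes each message is a Hash with a :type key; other messages are discarded.
def wait_for_type(queue, expected_type, timeout: 10)
  Timeout.timeout(timeout) do
    loop do
      msg = queue.pop
      return msg if msg[:type] == expected_type
    end
  end
end

queue = Thread::Queue.new
queue << {type: :event, id: "e1"}                      # skipped
queue << {type: :reset_response, id: "reset_test_1"}   # matched
reply = wait_for_type(queue, :reset_response, timeout: 1)

# An empty queue makes the wait raise Timeout::Error once the limit passes.
timed_out = begin
  wait_for_type(Thread::Queue.new, :reset_response, timeout: 0.05)
  false
rescue Timeout::Error
  true
end
```

The single timeout covers the whole wait, so a stream of unrelated messages cannot extend it indefinitely.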

#send_message(msg, session_id) ⇒ Object

Send a message for a session (fire-and-forget). Injects the session field automatically.

Parameters:

  • msg (Hash)

    message to send

  • session_id (String)


# File 'lib/plushie/test/session_pool.rb', line 85

def send_message(msg, session_id)
  encoded = Protocol::Encode.encode(msg.merge(session: session_id), @format)
  @connection.send_encoded(encoded)
end
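
The session injection relies on Hash#merge, which returns a new Hash and overwrites any :session key already present. The snippet below illustrates that behavior in isolation; JSON.generate stands in for the Protocol::Encode step, whose output format is not documented here.

```ruby
require "json"

msg = {type: "reset", id: "reset_test_1"}
tagged = msg.merge(session: "test_1")   # new Hash; msg itself is untouched

# A :session key already present in the message is overwritten by the merge.
overridden = {type: "reset", session: "other"}.merge(session: "test_1")

# JSON is used here only as a stand-in for the pool's encoder.
wire = JSON.generate(tagged)
```

Note that the merge uses a Symbol key, so a message that already carries a String "session" key would end up with both keys.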

#start ⇒ Object

Start the renderer process.



# File 'lib/plushie/test/session_pool.rb', line 37

def start
  @connection = Connection.spawn(
    format: @format,
    binary: @binary,
    mode: @mode,
    max_sessions: @max_sessions,
    settings: {},
    on_message: method(:dispatch_message)
  )
  @started = true
end

#started? ⇒ Boolean

Returns true if the pool is started.

Returns:

  • (Boolean)

    true if the pool is started



# File 'lib/plushie/test/session_pool.rb', line 124

def started?
  @started
end

#stop ⇒ Object

Stop the pool: close the renderer connection and clear all registered sessions.



# File 'lib/plushie/test/session_pool.rb', line 117

def stop
  @connection&.close
  @mutex.synchronize { @sessions.clear }
  @started = false
end

#unregister(session_id) ⇒ Object

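Unregister a session. Sends a reset message to the renderer, waits up to 5 seconds for the reset_response, then removes the session. A timeout on the reset is not fatal; the session is still removed.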

Parameters:

  • session_id (String)


# File 'lib/plushie/test/session_pool.rb', line 69

def unregister(session_id)
  send_message({type: "reset", id: "reset_#{session_id}"}, session_id)
  # Wait for reset_response (with timeout)
  begin
    wait_for_response(session_id, :reset_response, timeout: 5)
  rescue Timeout::Error
    # Timeout on reset is not fatal -- the session is still removed
  end
  @mutex.synchronize { @sessions.delete(session_id) }
end
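
The "pool full" error raised by register suggests checking for leaked sessions. One way to avoid leaks is to pair register and unregister in an ensure block, sketched below. The with_session helper is hypothetical and not part of Plushie; FakePool is a stand-in that only mimics the register/unregister interface so the example runs on its own.

```ruby
# Hypothetical helper (not part of Plushie): borrow a session for a block
# and release it even if the block raises.
def with_session(pool)
  session_id = pool.register
  yield session_id
ensure
  pool.unregister(session_id) if session_id
end

# Stand-in pool for demonstration; it only mimics register/unregister.
class FakePool
  attr_reader :live

  def initialize
    @live = []
    @counter = 0
  end

  def register
    @counter += 1
    "test_#{@counter}".tap { |id| @live << id }
  end

  def unregister(session_id)
    @live.delete(session_id)
  end
end

pool = FakePool.new
seen = nil
begin
  with_session(pool) do |id|
    seen = id
    raise "test failure"
  end
rescue RuntimeError
  # The failure propagates, but the session was still released.
end
```

With a real pool the same helper would accept any object that responds to register and unregister(session_id).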