Class: Rubcask::Server::Threaded

Inherits:
AbstractServer show all
Includes:
Protocol
Defined in:
lib/rubcask/server/threaded.rb

Overview

Thread-based server supporting Rubcask protocol If you are running on CRuby you should consider using Server::Async as it is generally more performant

Constant Summary

Constants included from Protocol

Protocol::ERROR, Protocol::NIL, Protocol::OK, Protocol::PING, Protocol::PONG, Protocol::SEPARATOR

Constants inherited from AbstractServer

AbstractServer::BLOCK_SIZE, AbstractServer::MAX_READ_SIZE

Instance Attribute Summary

Attributes inherited from AbstractServer

#dir

Instance Method Summary collapse

Methods included from Protocol

create_call_message, encode_message, error_message, nil_message, ok_message, ping_message, pong_message

Constructor Details

#initialize(dir, config: Server::Config.new) ⇒ Threaded

Returns a new instance of Threaded.



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/rubcask/server/threaded.rb', line 19

def initialize(dir, config: Server::Config.new)
  @dir = dir
  @config = config
  @hostname = config.hostname
  @port = config.port
  @logger = Logger.new($stdout)
  @logger.level = Logger::INFO
  @threads = ThreadGroup.new
  @connected = false
  @status = :stopped
  @listeners = []
end

Instance Method Details

#connectself

Creates sockets

Returns:

  • (self)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rubcask/server/threaded.rb', line 34

def connect
  return if @connected
  @connected = true
  @listeners = Socket.tcp_server_sockets(@hostname, @port)
  if @config.keepalive
    @listeners.each do |s|
      s.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
    end
  end
  @listeners.each do |s|
    address = s.connect_address
    logger.info "Listening on #{address.ip_address}:#{address.ip_port}"
  end
  self
end

#setup_shutdown_pipeself

Prepares an IO pipe that is used in shutdown process Call if you need to shutdown the server from a different thread

Returns:

  • (self)


95
96
97
98
# File 'lib/rubcask/server/threaded.rb', line 95

def setup_shutdown_pipe
  @shutdown_pipe ||= IO.pipe
  self
end

#shutdownObject

Note:

You probably want to use it in a signal trap

Shuts down the server



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/rubcask/server/threaded.rb', line 80

def shutdown
  if @status == :running
    @status = :shutdown
  end
  begin
    @shutdown_pipe[1].write_nonblock("\0")
    @shutdown_pipe[1].close
  rescue
    # We might have race with cleanup shutdown pipe
  end
end

#startObject

Note:

It blocks the current thread

Starts the server



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rubcask/server/threaded.rb', line 52

def start
  connect

  setup_shutdown_pipe

  @status = :running

  Thread.handle_interrupt(Exception => :never) do
    Thread.handle_interrupt(Exception => :immediate) do
      accept_loop
    end
  ensure
    cleanup_shutdown_pipe
    @status = :shutdown
    cleanup_listeners
    @threads.list.each(&:kill)
    @threads.list.each do |t|
      t.join
    rescue
    end
    @status = :stopped
    @connected = false
    logger.info "Closed server"
  end
end