Class: Vines::Cluster

Inherits:
Object
Includes:
Log
Defined in:
lib/vines/cluster.rb,
lib/vines/cluster/pubsub.rb,
lib/vines/cluster/sessions.rb,
lib/vines/cluster/publisher.rb,
lib/vines/cluster/connection.rb,
lib/vines/cluster/subscriber.rb

Overview

Server instances may be connected to one another in a cluster so they can host a single chat domain, or set of domains, across many servers, transparently to users. A redis database is used for the session routing table, mapping JIDs to their node’s location. Redis pubsub channels are used to communicate amongst nodes.

Using a shared in-memory cache, like redis, rather than synchronizing the cache to each node, allows us to add cluster nodes dynamically, without updating all other nodes’ config files. It also greatly reduces the amount of memory required by the chat server processes.
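
As a rough illustration of the routing table described above, the sketch below maps JIDs to node IDs, with a plain Ruby Hash standing in for redis. The RoutingTable class and its method names are hypothetical and are not part of Vines.

```ruby
# Hypothetical stand-in for the shared redis routing table: a Hash keyed
# by full JID whose values are the IDs of the nodes hosting each session.
class RoutingTable
  def initialize
    @sessions = {}
  end

  # Record that +jid+ is connected to the node identified by +node_id+.
  def save(jid, node_id)
    @sessions[jid] = node_id
  end

  # Return the ID of the node hosting +jid+, or nil if there is no session.
  def node_for(jid)
    @sessions[jid]
  end

  # Forget every session hosted by +node_id+, e.g. after it goes offline.
  def delete_all(node_id)
    @sessions.delete_if { |_jid, node| node == node_id }
  end
end

table = RoutingTable.new
table.save('alice@wonderland.lit/tea', 'node-a')
table.save('hatter@wonderland.lit/party', 'node-b')
```

Because every node would consult the same table, a new node only needs to reach the shared store; no other node's configuration has to change.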

Defined Under Namespace

Classes: Connection, PubSub, Publisher, Sessions, StreamProxy, Subscriber, UserProxy

Methods included from Log

#log

Constructor Details

#initialize(config, &block) ⇒ Cluster

Returns a new instance of Cluster.



# File 'lib/vines/cluster.rb', line 29

def initialize(config, &block)
  @config, @id = config, Kit.uuid
  @connection = Connection.new
  @sessions = Sessions.new(self)
  @publisher = Publisher.new(self)
  @subscriber = Subscriber.new(self)
  @pubsub = PubSub.new(self)
  instance_eval(&block)
end
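
The constructor evaluates its block with instance_eval, so code inside the block runs with the new object as self. The sketch below shows that pattern in isolation. MiniCluster and its host method are hypothetical, SecureRandom.uuid stands in for Kit.uuid, and the nil-block guard is an addition that the constructor above does not have.

```ruby
require 'securerandom'

# Hypothetical class illustrating block-based configuration via instance_eval.
class MiniCluster
  attr_reader :id, :settings

  def initialize(&block)
    @id = SecureRandom.uuid          # assumption: stands in for Kit.uuid
    @settings = {}
    instance_eval(&block) if block   # block runs with self set to this object
  end

  # Hypothetical DSL method, callable without a receiver inside the block.
  def host(name)
    @settings[:host] = name
  end
end

cluster = MiniCluster.new do
  host 'redis.wonderland.lit'
end
```

Inside the block, host resolves to the instance method because instance_eval changes self for the duration of the block.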

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/vines/cluster.rb', line 17

def id
  @id
end

Instance Method Details

#add_pubsub_node(domain, node) ⇒ Object

Create a pubsub topic (a.k.a. node), in the given domain, to which messages may be published. The domain argument will be one of the configured pubsub subdomains in conf/config.rb (e.g. games.wonderland.lit, topics.wonderland.lit, etc).



# File 'lib/vines/cluster.rb', line 144

def add_pubsub_node(domain, node)
  @pubsub.add_node(domain, node)
end

#connect ⇒ Object

Create a new redis connection.



# File 'lib/vines/cluster.rb', line 113

def connect
  @connection.create
end

#connected_resources(jid) ⇒ Object

Return the connected streams for this user, without any proxy streams to remote cluster nodes (locally connected streams only).



# File 'lib/vines/cluster.rb', line 130

def connected_resources(jid)
  @config.router.connected_resources(jid, jid, false)
end

#connection ⇒ Object

Return the shared redis connection for most queries to use.



# File 'lib/vines/cluster.rb', line 108

def connection
  @connection.connect
end

#delete_pubsub_node(domain, node) ⇒ Object

Remove a pubsub topic so messages may no longer be broadcast to it.



# File 'lib/vines/cluster.rb', line 149

def delete_pubsub_node(domain, node)
  @pubsub.delete_node(domain, node)
end

#delete_session(jid) ⇒ Object

Remove this user from the cluster routing table so that no further stanzas may be routed to them. This must be called when the user’s session is terminated, either by logout or stream disconnect.



# File 'lib/vines/cluster.rb', line 75

def delete_session(jid)
  @sessions.delete(jid)
end

#delete_sessions(node) ⇒ Object

Remove all user sessions associated with the given node ID from the routing table. Cluster nodes call this themselves during normal shutdown. However, if a node dies without being properly shut down, the other nodes will clean up its sessions when they detect that the node is offline.



# File 'lib/vines/cluster.rb', line 83

def delete_sessions(node)
  @sessions.delete_all(node)
end

#poke(node, time) ⇒ Object

Notify the session store that this node is still alive. The node broadcasts its current time, so all cluster members’ clocks don’t necessarily need to be in sync.



# File 'lib/vines/cluster.rb', line 90

def poke(node, time)
  @sessions.poke(node, time)
end
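
One way a liveness table fed by poke could work is sketched below. This is an illustration only: the Heartbeats class, the stale method, and the timeout value are assumptions, and the source does not say how Sessions actually detects an offline node.

```ruby
# Hypothetical liveness table: remembers the last time each node reported.
class Heartbeats
  def initialize(timeout)
    @timeout = timeout     # assumption: seconds of silence before a node is stale
    @last_seen = {}
  end

  # Record the time reported by +node+ in its own heartbeat.
  def poke(node, time)
    @last_seen[node] = time
  end

  # Return the IDs of nodes whose last report is older than the timeout,
  # measured against the caller-supplied +now+.
  def stale(now)
    @last_seen.select { |_node, time| now - time > @timeout }.keys
  end
end

beats = Heartbeats.new(3)
beats.poke('node-a', 100)
beats.poke('node-b', 104)
stale = beats.stale(105)
```

A caller could then pass each stale node ID to delete_sessions to clear its entries from the routing table.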

#pubsub_node?(domain, node) ⇒ Boolean

Return true if the pubsub topic exists and messages may be published to it.

Returns:

  • (Boolean)


# File 'lib/vines/cluster.rb', line 172

def pubsub_node?(domain, node)
  @pubsub.node?(domain, node)
end

#pubsub_subscribed?(domain, node, jid) ⇒ Boolean

Return true if the JID is a registered subscriber to the pubsub topic and messages published to it should be routed to the JID.

Returns:

  • (Boolean)


# File 'lib/vines/cluster.rb', line 178

def pubsub_subscribed?(domain, node, jid)
  @pubsub.subscribed?(domain, node, jid)
end

#pubsub_subscribers(domain, node) ⇒ Object

Return a list of JIDs subscribed to the pubsub topic.



# File 'lib/vines/cluster.rb', line 183

def pubsub_subscribers(domain, node)
  @pubsub.subscribers(domain, node)
end
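
The pubsub methods on this class all delegate to the PubSub object. To make their combined semantics concrete, here is a minimal in-memory sketch of a topic registry. TopicRegistry is hypothetical and keeps its state in a Hash of Sets, whereas the real implementation stores it in redis.

```ruby
require 'set'

# Hypothetical in-memory registry of pubsub topics and their subscribers.
class TopicRegistry
  def initialize
    # domain => { node name => Set of subscribed JIDs }
    @nodes = Hash.new { |hash, domain| hash[domain] = {} }
  end

  def add_node(domain, node)
    @nodes[domain][node] ||= Set.new
  end

  def delete_node(domain, node)
    @nodes[domain].delete(node)
  end

  def node?(domain, node)
    @nodes[domain].key?(node)
  end

  # Subscribing to a topic that does not exist is silently ignored here.
  def subscribe(domain, node, jid)
    @nodes[domain][node] << jid if node?(domain, node)
  end

  def unsubscribe(domain, node, jid)
    @nodes[domain][node].delete(jid) if node?(domain, node)
  end

  def unsubscribe_all(domain, jid)
    @nodes[domain].each_value { |jids| jids.delete(jid) }
  end

  def subscribed?(domain, node, jid)
    node?(domain, node) && @nodes[domain][node].include?(jid)
  end

  def subscribers(domain, node)
    node?(domain, node) ? @nodes[domain][node].to_a : []
  end
end

registry = TopicRegistry.new
registry.add_node('games.wonderland.lit', 'croquet')
registry.subscribe('games.wonderland.lit', 'croquet', 'alice@wonderland.lit')
```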

#query(name, *args) ⇒ Object

Turn an asynchronous redis query into a blocking call by pausing the fiber in which this code is running. Return the result of the query from this method, rather than passing it to a callback block.



# File 'lib/vines/cluster.rb', line 120

def query(name, *args)
  fiber, yielding = Fiber.current, true
  req = connection.send(name, *args)
  req.errback  { fiber.resume rescue yielding = false }
  req.callback {|response| fiber.resume(response) }
  Fiber.yield if yielding
end
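
The fiber technique can be tried without redis or EventMachine. In the sketch below, FakeRequest is a hypothetical stand-in for a deferrable request object and blocking_query is a simplified analogue of the method above. It omits the yielding flag that the original uses to handle an errback that fires before the fiber has yielded.

```ruby
require 'fiber' # needed for Fiber.current before Ruby 3.1; harmless afterwards

# Hypothetical stand-in for a deferrable: stores callbacks, fires them later.
class FakeRequest
  def callback(&block)
    @callback = block
  end

  def errback(&block)
    @errback = block
  end

  def succeed(value)
    @callback&.call(value)
  end

  def fail
    @errback&.call
  end
end

# Pause the current fiber until the request completes, then return its result.
def blocking_query(request)
  fiber = Fiber.current
  request.callback { |response| fiber.resume(response) }
  request.errback  { fiber.resume(nil) }
  Fiber.yield # control returns to whoever resumed this fiber
end

pending = FakeRequest.new
result = nil
worker = Fiber.new { result = blocking_query(pending) }
worker.resume            # runs until Fiber.yield; result is still nil
pending.succeed('PONG')  # resumes the fiber; blocking_query returns the value
```

The caller's code reads as if the query were synchronous, while the thread itself is free to do other work between the yield and the resume.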

#remote_sessions(*jids) ⇒ Object

Returns any streams hosted at remote nodes for these JIDs. The streams act like normal EM::Connections, but are actually proxies that route stanzas over redis pubsub channels to remote nodes.



# File 'lib/vines/cluster.rb', line 59

def remote_sessions(*jids)
  @sessions.find(*jids).map do |session|
    StreamProxy.new(self, session)
  end
end
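
The proxy idea can be sketched as an object that exposes a stream-like write method but forwards each stanza to a publisher instead of a socket. ProxyStream, RecordingPublisher, and the :node session key are hypothetical; the actual StreamProxy interface is not shown on this page.

```ruby
# Hypothetical publisher that records what would be sent over pubsub.
class RecordingPublisher
  attr_reader :sent

  def initialize
    @sent = []
  end

  def route(stanza, node)
    @sent << [node, stanza]
  end
end

# Hypothetical proxy: looks like a local stream, forwards to a remote node.
class ProxyStream
  def initialize(publisher, session)
    @publisher = publisher
    @node = session[:node] # assumption: the session records its hosting node
  end

  def write(stanza)
    @publisher.route(stanza, @node)
  end
end

publisher = RecordingPublisher.new
sessions = [{ jid: 'hatter@wonderland.lit/party', node: 'node-b' }]
proxies = sessions.map { |session| ProxyStream.new(publisher, session) }
proxies.each { |proxy| proxy.write('<message/>') }
```

Code that delivers stanzas can then treat local and remote streams uniformly.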

#route(stanza, node) ⇒ Object

Send the stanza to the node hosting the user’s session. The stanza is published to the channel to which the remote node is listening for messages.



# File 'lib/vines/cluster.rb', line 97

def route(stanza, node)
  @publisher.route(stanza, node)
end

#save_session(jid, attrs) ⇒ Object

Persist the user’s session to the shared redis cache so that other cluster nodes can locate the node hosting this user’s connection and route messages to them.



# File 'lib/vines/cluster.rb', line 68

def save_session(jid, attrs)
  @sessions.save(jid, attrs)
end

#start ⇒ Object

Join this node to the cluster by broadcasting its state to the other nodes, subscribing to redis channels, and scheduling periodic heartbeat broadcasts. This method must be called after initialize or this node will not be a cluster member.



# File 'lib/vines/cluster.rb', line 43

def start
  @connection.connect
  @publisher.broadcast(:online)
  @subscriber.subscribe

  EM.add_periodic_timer(1) { heartbeat }

  at_exit do
    @publisher.broadcast(:offline)
    @sessions.delete_all(@id)
  end
end

#storage(domain) ⇒ Object

Return the Storage implementation for this domain or nil if the domain is not hosted here.



# File 'lib/vines/cluster.rb', line 136

def storage(domain)
  @config.storage(domain)
end

#subscribe_pubsub(domain, node, jid) ⇒ Object

Subscribe the JID to the pubsub topic so it will receive any messages published to it.



# File 'lib/vines/cluster.rb', line 155

def subscribe_pubsub(domain, node, jid)
  @pubsub.subscribe(domain, node, jid)
end

#unsubscribe_all_pubsub(domain, jid) ⇒ Object

Unsubscribe the JID from all pubsub topics. This is useful when the JID’s session ends by logout or disconnect.



# File 'lib/vines/cluster.rb', line 167

def unsubscribe_all_pubsub(domain, jid)
  @pubsub.unsubscribe_all(domain, jid)
end

#unsubscribe_pubsub(domain, node, jid) ⇒ Object

Unsubscribe the JID from the pubsub topic, deregistering its interest in receiving any messages published to it.



# File 'lib/vines/cluster.rb', line 161

def unsubscribe_pubsub(domain, node, jid)
  @pubsub.unsubscribe(domain, node, jid)
end

#update_user(jid, node) ⇒ Object

Notify the remote node that the user’s roster has changed and it should reload the user from storage.



# File 'lib/vines/cluster.rb', line 103

def update_user(jid, node)
  @publisher.update_user(jid, node)
end