Class: Faye::Redis
- Inherits:
-
Object
- Object
- Faye::Redis
- Defined in:
- lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb
Defined Under Namespace
Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager
Constant Summary collapse
- DEFAULT_OPTIONS =
Default configuration options
{
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  subscription_ttl: 3600, # Subscription keys TTL (1 hour, matches message_ttl), provides safety net if GC fails
  namespace: 'faye',
  gc_interval: 60, # Automatic garbage collection interval (seconds), set to 0 or false to disable
  cleanup_batch_size: 50 # Number of items per batch during cleanup (min: 1, max: 1000, prevents blocking)
}.freeze
- VERSION =
'1.0.14'
Instance Attribute Summary collapse
-
#client_registry ⇒ Object
readonly
Returns the value of attribute client_registry.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#pubsub_coordinator ⇒ Object
readonly
Returns the value of attribute pubsub_coordinator.
-
#server ⇒ Object
readonly
Returns the value of attribute server.
-
#subscription_manager ⇒ Object
readonly
Returns the value of attribute subscription_manager.
Class Method Summary collapse
-
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance.
Instance Method Summary collapse
-
#cleanup_expired(&callback) ⇒ Object
Clean up expired clients and their associated data.
-
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists.
-
#create_client(&callback) ⇒ Object
Create a new client.
-
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client.
-
#disconnect ⇒ Object
Disconnect the engine.
-
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue.
-
#initialize(server, options = {}) ⇒ Redis
constructor
A new instance of Redis.
-
#ping(client_id) ⇒ Object
Ping a client to keep it alive. Also refreshes subscription TTLs to keep them alive while the client is connected.
-
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels.
-
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel.
-
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel.
Constructor Details
#initialize(server, options = {}) ⇒ Redis
Returns a new instance of Redis.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/faye/redis.rb', line 42

# Builds the engine: merges the caller's options over DEFAULT_OPTIONS, creates the
# logger and the Redis-backed components, then starts the garbage-collection timer.
#
# @param server [Object] stored as-is in @server
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
def initialize(server, options = {})
  @server = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # Initialize components (every component shares the same connection and options)
  @connection = Connection.new(@options)
  @client_registry = ClientRegistry.new(@connection, @options)
  @subscription_manager = SubscriptionManager.new(@connection, @options)
  @message_queue = MessageQueue.new(@connection, @options)
  @pubsub_coordinator = PubSubCoordinator.new(@connection, @options)

  # Set up message routing
  # NOTE(review): no statement follows this comment in the listing; one may have been
  # lost during extraction - confirm against lib/faye/redis.rb.

  # Start automatic garbage collection timer
  start_gc_timer
end
Instance Attribute Details
#client_registry ⇒ Object (readonly)
Returns the value of attribute client_registry.
34 35 36 |
# File 'lib/faye/redis.rb', line 34

# Read-only accessor.
#
# @return [Object] the value of @client_registry
def client_registry
  @client_registry
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
34 35 36 |
# File 'lib/faye/redis.rb', line 34

# Read-only accessor.
#
# @return [Object] the value of @connection
def connection
  @connection
end
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
34 35 36 |
# File 'lib/faye/redis.rb', line 34

# Read-only accessor.
#
# @return [Object] the value of @message_queue
def message_queue
  @message_queue
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
34 35 36 |
# File 'lib/faye/redis.rb', line 34

# Read-only accessor.
#
# @return [Object] the value of @options
def options
  @options
end
#pubsub_coordinator ⇒ Object (readonly)
Returns the value of attribute pubsub_coordinator.
34 35 36 |
# File 'lib/faye/redis.rb', line 34

# Read-only accessor.
#
# @return [Object] the value of @pubsub_coordinator
def pubsub_coordinator
  @pubsub_coordinator
end
#server ⇒ Object (readonly)
Returns the value of attribute server.
34 35 36 |
# File 'lib/faye/redis.rb', line 34

# Read-only accessor.
#
# @return [Object] the value of @server
def server
  @server
end
#subscription_manager ⇒ Object (readonly)
Returns the value of attribute subscription_manager.
34 35 36 |
# File 'lib/faye/redis.rb', line 34

# Read-only accessor.
#
# @return [Object] the value of @subscription_manager
def subscription_manager
  @subscription_manager
end
Class Method Details
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance
38 39 40 |
# File 'lib/faye/redis.rb', line 38

# Factory method to create a new Redis engine instance.
#
# @param server [Object] forwarded unchanged to .new
# @param options [Hash] forwarded unchanged to .new
# @return [Object] whatever .new returns
def self.create(server, options)
  new(server, options)
end
Instance Method Details
#cleanup_expired(&callback) ⇒ Object
Clean up expired clients and their associated data
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/faye/redis.rb', line 193

# Clean up expired clients and their associated data.
#
# Asks the client registry to purge expired clients, then hands the list of
# remaining clients to the subscription manager so it can drop orphaned data.
#
# @yield [expired_count] called, if a block was given, once the orphaned-data cleanup reports back
# @yieldparam expired_count [Integer] the count yielded by @client_registry.cleanup_expired
def cleanup_expired(&callback)
  # Clean up stale local message IDs first
  # NOTE(review): no statement accompanies this comment in the listing; one may have
  # been lost during extraction - confirm against lib/faye/redis.rb.
  @client_registry.cleanup_expired do |expired_count|
    @logger.info("Cleaned up #{expired_count} expired clients") if expired_count > 0

    # Always clean up orphaned subscription data (even if no expired clients)
    # This handles cases where subscriptions were orphaned due to crashes
    # and removes empty channel Sets and unused patterns
    # Uses batched processing to avoid blocking the connection pool
    @client_registry.all do |active_clients|
      @subscription_manager.cleanup_orphaned_data(active_clients) do
        callback.call(expired_count) if callback
      end
    end
  end
end
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists
86 87 88 |
# File 'lib/faye/redis.rb', line 86

# Check if a client exists.
#
# @param client_id [Object] forwarded to @client_registry.exists?
# @yield forwarded unchanged to @client_registry.exists?
# @return [Object] whatever @client_registry.exists? returns
def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end
#create_client(&callback) ⇒ Object
Create a new client
62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/faye/redis.rb', line 62

# Create a new client.
#
# Generates an id, registers it with the client registry and reports the outcome.
#
# @yield [client_id] called with the generated id when the registry reports success,
#   or with nil when it reports failure
def create_client(&callback)
  # Ensure GC timer is started (lazy initialization)
  ensure_gc_timer_started

  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    callback.call(success ? client_id : nil)
  end
end
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client
77 78 79 80 81 82 83 |
# File 'lib/faye/redis.rb', line 77

# Destroy a client.
#
# Runs three steps in sequence, each started from the previous step's callback:
# remove all subscriptions, clear the message queue, then delete the registry entry.
#
# @param client_id [Object] id passed to each of the three steps
# @yield forwarded unchanged to @client_registry.destroy
def destroy_client(client_id, &callback)
  @subscription_manager.unsubscribe_all(client_id) do
    @message_queue.clear(client_id) do
      @client_registry.destroy(client_id, &callback)
    end
  end
end
#disconnect ⇒ Object
Disconnect the engine
184 185 186 187 188 189 190 |
# File 'lib/faye/redis.rb', line 184

# Disconnect the engine.
#
# Stops the GC timer, then disconnects the pub/sub coordinator and finally the connection.
#
# @return [Object] whatever @connection.disconnect returns
def disconnect
  # Stop GC timer if running
  stop_gc_timer

  @pubsub_coordinator.disconnect
  @connection.disconnect
end
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue
179 180 181 |
# File 'lib/faye/redis.rb', line 179

# Empty a client's message queue.
#
# @param client_id [Object] forwarded to @message_queue.dequeue_all
# @return [Object] whatever @message_queue.dequeue_all returns
def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end
#ping(client_id) ⇒ Object
Ping a client to keep it alive. Also refreshes subscription TTLs to keep them alive while the client is connected.
92 93 94 95 |
# File 'lib/faye/redis.rb', line 92

# Ping a client to keep it alive.
# Also refreshes the client's subscription TTLs.
#
# @param client_id [Object] forwarded to both calls
# @return [Object] whatever @subscription_manager.refresh_client_subscriptions_ttl returns
def ping(client_id)
  @client_registry.ping(client_id)
  @subscription_manager.refresh_client_subscriptions_ttl(client_id)
end
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# Publish a message to channels.
#
# As far as the listing shows: `channels` is wrapped in an Array when it is not one already. For each
# channel the subscribers are fetched via @subscription_manager.get_subscribers, the message is
# published through @pubsub_coordinator.publish, and - when there are subscribers - a batch enqueue
# is run. A channel counts as complete once both operations have reported back. After the last channel
# completes, `callback` (if given) is scheduled once via EventMachine.next_tick with the AND of all
# reported results. If an exception is rescued, it is logged through log_error and
# `callback.call(false)` is scheduled unless the callback was already called.
#
# NOTE(review): this listing was flattened onto one line during extraction and has lost several
# identifiers: the first parameter (documented as `message` in the method signature), the local
# that holds `Time.now.to_i`, the right-hand side of `['id'] ||=`, the name of the call taking
# `(client_ids, ...)`, and the accessor in `#{e.}`. The last three cannot be recovered from this
# page - confirm against lib/faye/redis.rb before relying on this listing.
# NOTE(review): when `channels` is an empty Array the `channels.each` body never runs, so `callback`
# is never invoked on that path - confirm whether callers can pass [].
# File 'lib/faye/redis.rb', line 108 def publish(, channels, &callback) channels = [channels] unless channels.is_a?(Array) begin # Ensure message has an ID for deduplication = .dup unless .frozen? ['id'] ||= # Track this message as locally published with timestamp if @local_message_ids = Time.now.to_i if @local_message_ids_mutex @local_message_ids_mutex.synchronize { @local_message_ids[['id']] = } else @local_message_ids[['id']] = end end total_channels = channels.size completed_channels = 0 callback_called = false all_success = true channels.each do |channel| # Get subscribers and process in parallel @subscription_manager.get_subscribers(channel) do |client_ids| # Track operations for this channel pending_ops = 2 # pubsub + enqueue channel_success = true ops_completed = 0 complete_channel = lambda do ops_completed += 1 if ops_completed == pending_ops # This channel is complete all_success &&= channel_success completed_channels += 1 # Call final callback when all channels are done if completed_channels == total_channels && !callback_called && callback callback_called = true EventMachine.next_tick { callback.call(all_success) } end end end # Publish to pub/sub @pubsub_coordinator.publish(channel, ) do |published| channel_success &&= published complete_channel.call end # Enqueue for all subscribed clients if client_ids.any? (client_ids, ) do |enqueued| channel_success &&= enqueued complete_channel.call end else # No clients, but still need to complete complete_channel.call end end end rescue => e log_error("Failed to publish message to channels #{channels}: #{e.}") EventMachine.next_tick { callback.call(false) } if callback && !callback_called end end |
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel
98 99 100 |
# File 'lib/faye/redis.rb', line 98

# Subscribe a client to a channel.
#
# @param client_id [Object] forwarded to @subscription_manager.subscribe
# @param channel [Object] forwarded to @subscription_manager.subscribe
# @yield forwarded unchanged to @subscription_manager.subscribe
# @return [Object] whatever @subscription_manager.subscribe returns
def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel
103 104 105 |
# File 'lib/faye/redis.rb', line 103

# Unsubscribe a client from a channel.
#
# @param client_id [Object] forwarded to @subscription_manager.unsubscribe
# @param channel [Object] forwarded to @subscription_manager.unsubscribe
# @yield forwarded unchanged to @subscription_manager.unsubscribe
# @return [Object] whatever @subscription_manager.unsubscribe returns
def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end