Class: Faye::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb

Defined Under Namespace

Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager

Constant Summary collapse

DEFAULT_OPTIONS =

Default configuration options

{
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  subscription_ttl: 3600,  # Subscription keys TTL (1 hour, matches message_ttl), provides safety net if GC fails
  namespace: 'faye',
  gc_interval: 60,  # Automatic garbage collection interval (seconds), set to 0 or false to disable
  cleanup_batch_size: 50  # Number of items per batch during cleanup (min: 1, max: 1000, prevents blocking)
}.freeze
VERSION =
'1.0.14'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Redis

Returns a new instance of Redis.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/faye/redis.rb', line 42

def initialize(server, options = {})
  @server = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # Initialize components
  @connection = Connection.new(@options)
  @client_registry = ClientRegistry.new(@connection, @options)
  @subscription_manager = SubscriptionManager.new(@connection, @options)
  @message_queue = MessageQueue.new(@connection, @options)
  @pubsub_coordinator = PubSubCoordinator.new(@connection, @options)

  # Set up message routing
  setup_message_routing

  # Start automatic garbage collection timer
  start_gc_timer
end

Instance Attribute Details

#client_registryObject (readonly)

Returns the value of attribute client_registry.



34
35
36
# File 'lib/faye/redis.rb', line 34

def client_registry
  @client_registry
end

#connectionObject (readonly)

Returns the value of attribute connection.



34
35
36
# File 'lib/faye/redis.rb', line 34

def connection
  @connection
end

#message_queueObject (readonly)

Returns the value of attribute message_queue.



34
35
36
# File 'lib/faye/redis.rb', line 34

def message_queue
  @message_queue
end

#optionsObject (readonly)

Returns the value of attribute options.



34
35
36
# File 'lib/faye/redis.rb', line 34

def options
  @options
end

#pubsub_coordinatorObject (readonly)

Returns the value of attribute pubsub_coordinator.



34
35
36
# File 'lib/faye/redis.rb', line 34

def pubsub_coordinator
  @pubsub_coordinator
end

#serverObject (readonly)

Returns the value of attribute server.



34
35
36
# File 'lib/faye/redis.rb', line 34

def server
  @server
end

#subscription_managerObject (readonly)

Returns the value of attribute subscription_manager.



34
35
36
# File 'lib/faye/redis.rb', line 34

def subscription_manager
  @subscription_manager
end

Class Method Details

.create(server, options) ⇒ Object

Factory method to create a new Redis engine instance



38
39
40
# File 'lib/faye/redis.rb', line 38

def self.create(server, options)
  new(server, options)
end

Instance Method Details

#cleanup_expired(&callback) ⇒ Object

Clean up expired clients and their associated data



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/faye/redis.rb', line 193

def cleanup_expired(&callback)
  # Clean up stale local message IDs first
  cleanup_stale_message_ids

  @client_registry.cleanup_expired do |expired_count|
    @logger.info("Cleaned up #{expired_count} expired clients") if expired_count > 0

    # Always clean up orphaned subscription data (even if no expired clients)
    # This handles cases where subscriptions were orphaned due to crashes
    # and removes empty channel Sets and unused patterns
    # Uses batched processing to avoid blocking the connection pool
    @client_registry.all do |active_clients|
      @subscription_manager.cleanup_orphaned_data(active_clients) do
        callback.call(expired_count) if callback
      end
    end
  end
end

#client_exists(client_id, &callback) ⇒ Object

Check if a client exists



86
87
88
# File 'lib/faye/redis.rb', line 86

def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end

#create_client(&callback) ⇒ Object

Create a new client



62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/faye/redis.rb', line 62

def create_client(&callback)
  # Ensure GC timer is started (lazy initialization)
  ensure_gc_timer_started

  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    if success
      callback.call(client_id)
    else
      callback.call(nil)
    end
  end
end

#destroy_client(client_id, &callback) ⇒ Object

Destroy a client



77
78
79
80
81
82
83
# File 'lib/faye/redis.rb', line 77

def destroy_client(client_id, &callback)
  @subscription_manager.unsubscribe_all(client_id) do
    @message_queue.clear(client_id) do
      @client_registry.destroy(client_id, &callback)
    end
  end
end

#disconnectObject

Disconnect the engine



184
185
186
187
188
189
190
# File 'lib/faye/redis.rb', line 184

def disconnect
  # Stop GC timer if running
  stop_gc_timer

  @pubsub_coordinator.disconnect
  @connection.disconnect
end

#empty_queue(client_id) ⇒ Object

Empty a client’s message queue



179
180
181
# File 'lib/faye/redis.rb', line 179

def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end

#ping(client_id) ⇒ Object

Ping a client to keep it alive Also refreshes subscription TTLs to keep them alive while client is connected



92
93
94
95
# File 'lib/faye/redis.rb', line 92

def ping(client_id)
  @client_registry.ping(client_id)
  @subscription_manager.refresh_client_subscriptions_ttl(client_id)
end

#publish(message, channels, &callback) ⇒ Object

Publish a message to channels



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/faye/redis.rb', line 108

def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)

  begin
    # Ensure message has an ID for deduplication
    message = message.dup unless message.frozen?
    message['id'] ||= generate_message_id

    # Track this message as locally published with timestamp
    if @local_message_ids
      timestamp = Time.now.to_i
      if @local_message_ids_mutex
        @local_message_ids_mutex.synchronize { @local_message_ids[message['id']] = timestamp }
      else
        @local_message_ids[message['id']] = timestamp
      end
    end

    total_channels = channels.size
    completed_channels = 0
    callback_called = false
    all_success = true

    channels.each do |channel|
      # Get subscribers and process in parallel
      @subscription_manager.get_subscribers(channel) do |client_ids|
        # Track operations for this channel
        pending_ops = 2  # pubsub + enqueue
        channel_success = true
        ops_completed = 0

        complete_channel = lambda do
          ops_completed += 1
          if ops_completed == pending_ops
            # This channel is complete
            all_success &&= channel_success
            completed_channels += 1

            # Call final callback when all channels are done
            if completed_channels == total_channels && !callback_called && callback
              callback_called = true
              EventMachine.next_tick { callback.call(all_success) }
            end
          end
        end

        # Publish to pub/sub
        @pubsub_coordinator.publish(channel, message) do |published|
          channel_success &&= published
          complete_channel.call
        end

        # Enqueue for all subscribed clients
        if client_ids.any?
          enqueue_messages_batch(client_ids, message) do |enqueued|
            channel_success &&= enqueued
            complete_channel.call
          end
        else
          # No clients, but still need to complete
          complete_channel.call
        end
      end
    end
  rescue => e
    log_error("Failed to publish message to channels #{channels}: #{e.message}")
    EventMachine.next_tick { callback.call(false) } if callback && !callback_called
  end
end

#subscribe(client_id, channel, &callback) ⇒ Object

Subscribe a client to a channel



98
99
100
# File 'lib/faye/redis.rb', line 98

def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end

#unsubscribe(client_id, channel, &callback) ⇒ Object

Unsubscribe a client from a channel



103
104
105
# File 'lib/faye/redis.rb', line 103

def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end