Class: GraphQL::Subscriptions::AnyCableSubscriptions

Inherits:
GraphQL::Subscriptions show all
Extended by:
Forwardable
Defined in:
lib/graphql/subscriptions/anycable_subscriptions.rb

Constant Summary collapse

SUBSCRIPTION_PREFIX =

HASH: Stores subscription data: query, context, …

"graphql-subscription:"
FINGERPRINTS_PREFIX =

ZSET: To get fingerprints by topic

"graphql-fingerprints:"
SUBSCRIPTIONS_PREFIX =

SET: To get subscriptions by fingerprint

"graphql-subscriptions:"
CHANNEL_PREFIX =

SET: Auxiliary structure for whole channel’s subscriptions cleanup

"graphql-channel:"
EVENT_PREFIX =

For backward compatibility:

"graphql-event:"
SUBSCRIPTION_EVENTS_PREFIX =
"graphql-subscription-events:"

Instance Method Summary collapse

Constructor Details

#initialize(serializer: Serialize, **rest) ⇒ AnyCableSubscriptions

Returns a new instance of AnyCableSubscriptions.

Parameters:

  • serializer (#dump(obj), #load(string)) (defaults to: Serialize)

    Used for serializing messages before handing them to `.broadcast(msg)`



68
69
70
71
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 68

# Stores the serializer on the instance and hands every keyword option
# (including +serializer+) to the parent initializer.
#
# @param serializer [#dump, #load] kept in +@serializer+; used by
#   +write_subscription+ (dump) and +read_subscription+ (load)
# @param rest [Hash] remaining keyword options, forwarded untouched
def initialize(serializer: Serialize, **rest)
  @serializer = serializer
  # Spelled-out equivalent of a bare `super`: same keywords, same values.
  super(serializer: serializer, **rest)
end

Instance Method Details

#delete_channel_subscriptions(channel_or_id) ⇒ Object

The channel was closed, forget about it and its subscriptions



225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 225

# Removes every subscription registered for a closed channel, then drops
# the channel's own bookkeeping set.
#
# @param channel_or_id [String, Object] a channel id, or (for backward
#   compatibility) an object from which +read_subscription_id+ can
#   extract the id
# @return [nil] when no channel id could be resolved; otherwise whatever
#   +redis.del+ returns
def delete_channel_subscriptions(channel_or_id)
  channel_id =
    if channel_or_id.is_a?(String)
      channel_or_id
    else
      # For backward compatibility: a non-String argument is resolved to an id
      read_subscription_id(channel_or_id)
    end

  # The id is missing when the disconnect happens before #execute
  return unless channel_id

  channel_key = CHANNEL_PREFIX + channel_id
  redis.smembers(channel_key).each { |sid| delete_subscription(sid) }
  redis.del(channel_key)
end

#delete_legacy_subscription(subscription_id) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 212

# Removes the bookkeeping for a subscription stored in the legacy layout:
# the subscription id is removed from each event set it is listed in, and
# the subscription's own set of event topics is deleted.
#
# Does nothing unless +config.handle_legacy_subscriptions+ is truthy.
#
# @param subscription_id [String]
# @return [nil] when legacy handling is disabled; otherwise whatever
#   +redis.pipelined+ returns
def delete_legacy_subscription(subscription_id)
  return unless config.handle_legacy_subscriptions

  events_key = SUBSCRIPTION_EVENTS_PREFIX + subscription_id
  event_topics = redis.smembers(events_key)

  # Commands must be issued on the yielded pipeline object, the same way
  # the other pipelined blocks in this class do it; calling them on
  # `redis` itself does not queue them into the pipeline.
  redis.pipelined do |pipeline|
    event_topics.each do |event_topic|
      pipeline.srem(EVENT_PREFIX + event_topic, subscription_id)
    end
    pipeline.del(events_key)
  end
end

#delete_subscription(subscription_id) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 190

# Forgets a single subscription: detaches it from every fingerprint it was
# registered under, decrements the per-topic fingerprint counters, deletes
# the subscription hash, prunes fingerprints whose counter reached zero and
# finally delegates to +delete_legacy_subscription+.
#
# @param subscription_id [String]
# @return [Object] whatever +delete_legacy_subscription+ returns
def delete_subscription(subscription_id)
  subscription_key = SUBSCRIPTION_PREFIX + subscription_id
  raw_events = redis.hget(subscription_key, :events)
  # The :events field holds a JSON object mapping topic => fingerprint
  topic_fingerprints = raw_events ? JSON.parse(raw_events) : {}

  # fingerprints key => object returned by zincrby inside the pipeline;
  # its #value is read only after the pipeline block has finished
  pending_scores = {}
  redis.pipelined do |pipe|
    topic_fingerprints.each do |topic, fingerprint|
      fingerprints_key = FINGERPRINTS_PREFIX + topic
      pipe.srem(SUBSCRIPTIONS_PREFIX + fingerprint, subscription_id)
      pending_scores[fingerprints_key] = pipe.zincrby(fingerprints_key, -1, fingerprint)
    end
    # Delete the subscription itself
    pipe.del(subscription_key)
  end

  # Clean up fingerprints that don't have any subscriptions left
  redis.pipelined do |pipe|
    pending_scores.each do |fingerprints_key, pending|
      next unless pending.value.zero?

      pipe.zremrangebyscore(fingerprints_key, '-inf', '0')
    end
  end

  delete_legacy_subscription(subscription_id)
end

#deliver(stream_key, result) ⇒ Object

This subscription was re-evaluated. Send it to the specific stream where this client was waiting.

Parameters:

  • stream_key (String)
  • result (#to_h)

    result to send to clients



132
133
134
135
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 132

# Broadcasts a re-evaluated subscription result to the given stream.
#
# The broadcast message is a JSON object with two members: +result+ (the
# result converted with +#to_h+) and +more+ (always +true+).
#
# @param stream_key [String] stream to broadcast to
# @param result [#to_h] result to send to clients
# @return [Object] whatever +anycable.broadcast+ returns
def deliver(stream_key, result)
  envelope = { result: result.to_h, more: true }
  message = envelope.to_json
  anycable.broadcast(stream_key, message)
end

#execute(subscription_id, event, object) ⇒ Object

Disable this method as there is no fingerprint (it can be retrieved from subscription though)

Raises:

  • (NotImplementedError)


124
125
126
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 124

# Deliberately disabled: this entry point does not receive a fingerprint,
# so callers are directed to +execute_all+ instead.
#
# @param subscription_id [Object] ignored
# @param event [Object] ignored
# @param object [Object] ignored
# @raise [NotImplementedError] always
def execute(subscription_id, event, object)
  message = "Use execute_all method instead of execute to get actual event fingerprint"
  raise NotImplementedError, message
end

#execute_all(event, object) ⇒ Object

An event was triggered. Re-evaluate all subscribed queries and push the data over AnyCable.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 75

# Handles a triggered event: looks up every fingerprint registered for the
# event's topic, fetches the subscription ids of each fingerprint in one
# pipeline, and runs +execute_grouped+ once per fingerprint.
#
# Legacy subscriptions are processed first when
# +config.handle_legacy_subscriptions+ is truthy.
#
# @param event [#topic]
# @param object [Object] passed through to +execute_grouped+ / +execute_legacy+
# @return [Hash, nil] fingerprint => number of subscription ids, or +nil+
#   when the topic has no fingerprints
def execute_all(event, object)
  execute_legacy(event, object) if config.handle_legacy_subscriptions

  topic_fingerprints = redis.zrange(FINGERPRINTS_PREFIX + event.topic, 0, -1)
  return if topic_fingerprints.empty?

  # One SMEMBERS per fingerprint; replies come back in the same order
  member_lists = redis.pipelined do |pipe|
    topic_fingerprints.each { |fp| pipe.smembers(SUBSCRIPTIONS_PREFIX + fp) }
  end
  ids_by_fingerprint = topic_fingerprints.zip(member_lists).to_h

  ids_by_fingerprint.each do |fp, ids|
    execute_grouped(fp, ids, event, object)
  end

  # Call to +trigger+ returns this. Convenient for playing in console
  ids_by_fingerprint.transform_values(&:size)
end

#execute_grouped(fingerprint, subscription_ids, event, object) ⇒ Object

The fingerprint has told us that this response should be shared by all subscribers, so just run it once, then deliver the result to every subscriber



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 99

# Evaluates the update once for a group of subscriptions sharing a
# fingerprint and delivers that single result to the fingerprint's stream.
#
# The first id whose subscription key still exists in redis is used to
# compute the update.
#
# @param fingerprint [String]
# @param subscription_ids [Array<String>]
# @param event [Object] passed to +execute_update+
# @param object [Object] passed to +execute_update+
# @return [Object, nil] whatever +deliver+ returns, or +nil+ when nothing
#   was delivered
def execute_grouped(fingerprint, subscription_ids, event, object)
  return if subscription_ids.empty?

  # All subscriptions may have expired without having been cleaned up yet
  live_id = subscription_ids.find do |candidate|
    redis.exists?(SUBSCRIPTION_PREFIX + candidate)
  end
  result = live_id && execute_update(live_id, event, object)

  # Having calculated the result _once_, send the same payload to all subscribers
  deliver(SUBSCRIPTIONS_PREFIX + fingerprint, result) if result
end

#execute_legacy(event, object) ⇒ Object

For migration from pre-1.0 graphql-anycable gem



113
114
115
116
117
118
119
120
121
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 113

# Processes subscriptions stored in the legacy layout (kept for migration
# from the pre-1.0 graphql-anycable gem): every subscription id listed for
# the event's topic is re-evaluated and delivered on its own stream.
#
# @param event [#topic]
# @param object [Object] passed to +execute_update+
# @return [Array<String>] the subscription ids read from redis
def execute_legacy(event, object)
  legacy_ids = redis.smembers(EVENT_PREFIX + event.topic)

  legacy_ids.each do |legacy_id|
    stream = SUBSCRIPTION_PREFIX + legacy_id
    # Skip ids whose subscription key no longer exists
    next unless redis.exists?(stream)

    update = execute_update(legacy_id, event, object)
    deliver(stream, update) if update
  end
end

#read_subscription(subscription_id) ⇒ Object

Return the query from “storage” (in redis)



177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 177

# Loads the stored query data of a subscription from redis.
#
# @param subscription_id [String]
# @return [Hash, nil] hash with +:query_string+, +:variables+ (parsed from
#   JSON), +:context+ (restored with +@serializer.load+) and
#   +:operation_name+ (+nil+ when stored as a blank string); +nil+ when
#   every requested field is missing
def read_subscription(subscription_id)
  stored = redis.mapped_hmget(
    "#{SUBSCRIPTION_PREFIX}#{subscription_id}",
    :query_string, :variables, :context, :operation_name
  )
  # Redis returns a hash with all nils for a missing key
  return if stored.values.all?(&:nil?)

  stored[:context] = @serializer.load(stored[:context])
  stored[:variables] = JSON.parse(stored[:variables])
  stored[:operation_name] = nil if stored[:operation_name].strip == ""
  stored
end

#write_subscription(query, events) ⇒ Object

Save query to “storage” (in redis)



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 138

# Persists a subscription in redis and attaches the channel to the streams
# of every event fingerprint.
#
# Steps, in order:
# 1. take +:subscription_id+ (or a freshly built id) and +:channel+ out of
#    the query context;
# 2. record the channel-unique id via +write_subscription_id+;
# 3. subscribe the channel to each event's fingerprint stream;
# 4. inside one +redis.multi+ block, register the subscription under its
#    channel, store its data, bump the per-topic fingerprint counters and
#    add it to each fingerprint's set, then optionally set expirations.
#
# @param query [#context, #query_string, #provided_variables, #operation_name]
# @param events [Array<#topic, #fingerprint>]
# @raise [GraphQL::AnyCable::ChannelConfigurationError] when the context
#   has no +:channel+
# @return [Object] whatever +redis.multi+ returns
def write_subscription(query, events)
  context = query.context.to_h
  subscription_id = context.delete(:subscription_id) || build_id
  channel = context.delete(:channel)

  raise GraphQL::AnyCable::ChannelConfigurationError unless channel

  channel_uniq_id =
    if config.use_client_provided_uniq_id?
      channel.params["channelId"]
    else
      subscription_id
    end

  # Store subscription_id in the channel state to cleanup on disconnect
  write_subscription_id(channel, channel_uniq_id)

  events.each { |event| channel.stream_from(SUBSCRIPTIONS_PREFIX + event.fingerprint) }

  # topic => fingerprint, stored as JSON in the :events field
  fingerprint_by_topic = events.each_with_object({}) do |event, mapping|
    mapping[event.topic] = event.fingerprint
  end

  data = {
    query_string: query.query_string,
    variables: query.provided_variables.to_json,
    context: @serializer.dump(context.to_h),
    operation_name: query.operation_name.to_s,
    events: fingerprint_by_topic.to_json,
  }

  redis.multi do |tx|
    channel_key = CHANNEL_PREFIX + channel_uniq_id
    subscription_key = SUBSCRIPTION_PREFIX + subscription_id

    tx.sadd(channel_key, [subscription_id])
    tx.mapped_hmset(subscription_key, data)
    events.each do |event|
      tx.zincrby(FINGERPRINTS_PREFIX + event.topic, 1, event.fingerprint)
      tx.sadd(SUBSCRIPTIONS_PREFIX + event.fingerprint, [subscription_id])
    end

    # Expiration is applied only when a TTL is configured
    ttl = config.subscription_expiration_seconds
    if ttl
      tx.expire(channel_key, ttl)
      tx.expire(subscription_key, ttl)
    end
  end
end