Class: GraphQL::Subscriptions::AnyCableSubscriptions
- Inherits:
-
GraphQL::Subscriptions
- Object
- GraphQL::Subscriptions
- GraphQL::Subscriptions::AnyCableSubscriptions
- Defined in:
- lib/graphql/subscriptions/anycable_subscriptions.rb
Constant Summary collapse
- SUBSCRIPTION_PREFIX =
"graphql-subscription:"
- EVENT_PREFIX =
"graphql-event:"
- CHANNEL_PREFIX =
"graphql-channel:"
Instance Method Summary collapse
- #delete_channel_subscriptions(channel_id) ⇒ Object
-
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
-
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated.
-
#execute_all(event, object) ⇒ Object
An event was triggered.
-
#initialize(serializer: Serialize, **rest) ⇒ AnyCableSubscriptions
constructor
A new instance of AnyCableSubscriptions.
-
#read_subscription(subscription_id) ⇒ Object
Return the query from “storage” (in redis).
-
#write_subscription(query, events) ⇒ Object
Save query to “storage” (in redis).
Constructor Details
#initialize(serializer: Serialize, **rest) ⇒ AnyCableSubscriptions
Returns a new instance of AnyCableSubscriptions.
60 61 62 63 |
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 60
# Build a new AnyCableSubscriptions instance.
#
# @param serializer [#load, #dump] used to dump the query context when a
#   subscription is written and to load it back when it is read
# @param rest [Hash] remaining keyword options for the superclass constructor
def initialize(serializer: Serialize, **rest)
  @serializer = serializer

  # Bare `super` re-sends this method's own arguments to the superclass.
  super
end
Instance Method Details
#delete_channel_subscriptions(channel_id) ⇒ Object
127 128 129 130 131 132 |
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 127
# Delete every subscription recorded for a channel, then the channel's own set.
#
# @param channel_id [String] appended to CHANNEL_PREFIX to form the redis set key
# @return [Object] whatever `redis.del` returns for the channel key
def delete_channel_subscriptions(channel_id)
  channel_key = CHANNEL_PREFIX + channel_id

  redis.smembers(channel_key).each { |member_id| delete_subscription(member_id) }
  redis.del(channel_key)
end
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
117 118 119 120 121 122 123 124 125 |
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 117
# Forget a single subscription: detach it from every event topic it was
# registered under, then delete its stored hash.
#
# @param subscription_id [String] appended to SUBSCRIPTION_PREFIX to form the redis key
# @return [Object] whatever `redis.del` returns for the subscription key
def delete_subscription(subscription_id)
  subscription_key = SUBSCRIPTION_PREFIX + subscription_id

  # The :events field holds a JSON array of topics (see write_subscription);
  # it is absent when the subscription hash does not exist.
  topics_json = redis.hget(subscription_key, :events)
  if topics_json
    JSON.parse(topics_json).each do |topic|
      redis.srem(EVENT_PREFIX + topic, subscription_id)
    end
  end

  redis.del(subscription_key)
end
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated. Send it to the specific stream where this client was waiting.
75 76 77 78 |
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 75
# Broadcast a re-evaluated result to the stream named after the subscription.
#
# @param subscription_id [String] appended to SUBSCRIPTION_PREFIX to form the stream name
# @param result [#to_h] the execution result; sent as the `result` key of the message
# @return [Object] whatever `anycable.broadcast` returns
def deliver(subscription_id, result)
  message = { result: result.to_h, more: true }

  anycable.broadcast(
    SUBSCRIPTION_PREFIX + subscription_id,
    message.to_json
  )
end
#execute_all(event, object) ⇒ Object
An event was triggered. Re-evaluate all subscribed queries and push the data over AnyCable.
67 68 69 70 71 |
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 67
# Run `execute` for every subscription id stored under the event's topic.
#
# @param event [#topic] the triggered event; its topic selects the redis set
# @param object [Object] passed through unchanged to `execute`
# @return [Array] the subscription ids read from redis
def execute_all(event, object)
  topic_key = EVENT_PREFIX + event.topic
  subscriber_ids = redis.smembers(topic_key)

  subscriber_ids.each { |subscriber_id| execute(subscriber_id, event, object) }
end
#read_subscription(subscription_id) ⇒ Object
Return the query from “storage” (in redis)
106 107 108 109 110 111 112 113 114 |
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 106
# Return the query from "storage" (in redis).
#
# @param subscription_id [String] interpolated after SUBSCRIPTION_PREFIX to form the redis key
# @return [Hash, nil] a hash with :query_string, :variables (parsed from JSON),
#   :context (loaded through the serializer) and :operation_name, or nil when
#   nothing is stored for this id
def read_subscription(subscription_id)
  subscription = redis.mapped_hmget(
    "#{SUBSCRIPTION_PREFIX}#{subscription_id}",
    :query_string, :variables, :context, :operation_name
  )

  # A missing hash comes back as nil for every requested field. Report that as
  # "not found" instead of feeding nil to the serializer and to JSON.parse
  # (which raises TypeError on nil).
  # NOTE(review): assumes callers treat a nil return as "subscription gone" — confirm.
  return if subscription.values.all?(&:nil?)

  subscription[:context] = @serializer.load(subscription[:context])
  subscription[:variables] = JSON.parse(subscription[:variables])
  subscription
end
#write_subscription(query, events) ⇒ Object
Save query to “storage” (in redis)
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 81
# Save query to "storage" (in redis) and start streaming to the channel.
#
# Reads the channel out of the query context (removing it so it is not passed
# to the serializer), subscribes the channel to the subscription's stream, and
# records the subscription under its channel and under each event topic.
#
# @param query [#context, #query_string, #provided_variables, #operation_name]
#   the subscription query; its context must contain a :channel entry
# @param events [Array<#topic>] events this subscription should be registered under
# @return [Object] whatever `redis.multi` returns
def write_subscription(query, events)
  context = query.context.to_h
  subscription_id = context[:subscription_id] ||= build_id
  channel = context.delete(:channel)
  stream = context[:action_cable_stream] ||= SUBSCRIPTION_PREFIX + subscription_id
  channel.stream_from(stream)

  data = {
    query_string: query.query_string,
    variables: query.provided_variables.to_json,
    context: @serializer.dump(context.to_h),
    operation_name: query.operation_name,
    events: events.map(&:topic).to_json,
  }

  # Queue the commands on the object yielded by `multi`, not on the outer
  # client: newer redis-rb releases only include commands sent to the yielded
  # transaction, while older releases yield the client itself, so this form
  # works with both.
  redis.multi do |transaction|
    transaction.sadd(CHANNEL_PREFIX + channel.params["channelId"], subscription_id)
    transaction.mapped_hmset(SUBSCRIPTION_PREFIX + subscription_id, data)
    events.each do |event|
      transaction.sadd(EVENT_PREFIX + event.topic, subscription_id)
    end
  end
end