Class: GraphQL::Subscriptions::ActionCableSubscriptions

Inherits:
GraphQL::Subscriptions show all
Defined in:
lib/graphql/subscriptions/action_cable_subscriptions.rb

Overview

A subscriptions implementation that sends data as ActionCable broadcastings.

Some things to keep in mind:

  • No queueing system; ActiveJob should be added
  • Take care to reload context when re-delivering the subscription. (see Query#subscription_update?)
  • Avoid the async ActionCable adapter and use the redis or PostgreSQL adapters instead. Otherwise calling #trigger won't work from background jobs or the Rails console.

Examples:

Adding ActionCableSubscriptions to your schema

class MySchema < GraphQL::Schema
  # ...
  use GraphQL::Subscriptions::ActionCableSubscriptions
end

Implementing a channel for GraphQL Subscriptions

class GraphqlChannel < ApplicationCable::Channel
  def subscribed
    @subscription_ids = []
  end

  def execute(data)
    query = data["query"]
    variables = ensure_hash(data["variables"])
    operation_name = data["operationName"]
    context = {
      # Re-implement whatever context methods you need
      # in this channel or ApplicationCable::Channel
      # current_user: current_user,
      # Make sure the channel is in the context
      channel: self,
    }

    result = MySchema.execute({
      query: query,
      context: context,
      variables: variables,
      operation_name: operation_name
    })

    payload = {
      result: result.to_h,
      more: result.subscription?,
    }

    # Track the subscription here so we can remove it
    # on unsubscribe.
    if result.context[:subscription_id]
      @subscription_ids << result.context[:subscription_id]
    end

    transmit(payload)
  end

  def unsubscribed
    @subscription_ids.each { |sid|
      MySchema.subscriptions.delete_subscription(sid)
    }
  end

  private

    def ensure_hash(ambiguous_param)
      case ambiguous_param
      when String
        if ambiguous_param.present?
          ensure_hash(JSON.parse(ambiguous_param))
        else
          {}
        end
      when Hash, ActionController::Parameters
        ambiguous_param
      when nil
        {}
      else
        raise ArgumentError, "Unexpected parameter: #{ambiguous_param}"
      end
    end
end

Constant Summary collapse

SUBSCRIPTION_PREFIX =
"graphql-subscription:"
EVENT_PREFIX =
"graphql-event:"

Instance Attribute Summary

Attributes inherited from GraphQL::Subscriptions

#default_broadcastable

Instance Method Summary collapse

Methods inherited from GraphQL::Subscriptions

#broadcastable?, #build_id, #each_subscription_id, #execute, #execute_update, #normalize_name, #trigger, use

Constructor Details

#initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions

Returns a new instance of ActionCableSubscriptions.

Parameters:

  • serializer (<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`) (defaults to: Serialize)

    erializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to .broadcast(msg)

  • namespace (string) (defaults to: '')

    Used to namespace events and subscriptions (default: '')


90
91
92
93
94
95
96
97
98
99
100
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 90

def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } }
  @action_cable = action_cable
  @action_cable_coder = action_cable_coder
  @serializer = serializer
  @transmit_ns = namespace
  super
end

Instance Method Details

#delete_subscription(subscription_id) ⇒ Object

The channel was closed, forget about it.


188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 188

def delete_subscription(subscription_id)
  query = @subscriptions.delete(subscription_id)
  # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel,
  # see https://github.com/rmosolgo/graphql-ruby/issues/2478
  if query
    events = query.context.namespace(:subscriptions)[:events]
    events.each do |event|
      ev_by_fingerprint = @events[event.topic]
      ev_for_fingerprint = ev_by_fingerprint[event.fingerprint]
      ev_for_fingerprint.delete(event)
      if ev_for_fingerprint.empty?
        ev_by_fingerprint.delete(event.fingerprint)
      end
    end
  end
end

#deliver(subscription_id, result) ⇒ Object

This subscription was re-evaluated. Send it to the specific stream where this client was waiting.


112
113
114
115
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 112

def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload)
end

#execute_all(event, object) ⇒ Object

An event was triggered; Push the data over ActionCable. Subscribers will re-evaluate locally.


104
105
106
107
108
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 104

def execute_all(event, object)
  stream = stream_event_name(event)
  message = @serializer.dump(object)
  @action_cable.server.broadcast(stream, message)
end

#read_subscription(subscription_id) ⇒ Object

Return the query from "storage" (in memory)


170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 170

def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  if query.nil?
    # This can happen when a subscription is triggered from an unsubscribed channel,
    # see https://github.com/rmosolgo/graphql-ruby/issues/2478.
    # (This `nil` is handled by `#execute_update`)
    nil
  else
    {
      query_string: query.query_string,
      variables: query.provided_variables,
      context: query.context.to_h,
      operation_name: query.operation_name,
    }
  end
end

#setup_stream(channel, initial_event) ⇒ Object

Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.

But the problem is, any channel could close at any time, so each channel has to be ready to take over the primary position.

To make sure there's always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.


146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 146

def setup_stream(channel, initial_event)
  topic = initial_event.topic
  channel.stream_from(stream_event_name(initial_event), coder: @action_cable_coder) do |message|
    object = @serializer.load(message)
    events_by_fingerprint = @events[topic]
    events_by_fingerprint.each do |_fingerprint, events|
      if events.any? && events.first == initial_event
        # The fingerprint has told us that this response should be shared by all subscribers,
        # so just run it once, then deliver the result to every subscriber
        first_event = events.first
        first_subscription_id = first_event.context.fetch(:subscription_id)
        result = execute_update(first_subscription_id, first_event, object)
        # Having calculated the result _once_, send the same payload to all subscribers
        events.each do |event|
          subscription_id = event.context.fetch(:subscription_id)
          deliver(subscription_id, result)
        end
      end
    end
    nil
  end
end

#write_subscription(query, events) ⇒ Object

A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.


121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 121

def write_subscription(query, events)
  channel = query.context.fetch(:channel)
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = stream_subscription_name(subscription_id)
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end