GraphQL::Streaming Build Status Gem Version

Experimental tools for GraphQL over a long-lived connection, including subscription, @defer and @stream. (demo)

Installation

# Requires an experimental branch of graphql:
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "defer-directive"
gem "graphql-streaming"

Usage

Defer & Stream

This gem supports queries with @defer and @stream.

  • Use GraphQL::Execution::DeferredExecution:
  # only available in the defer-directive branch:
  MySchema.query_execution_strategy = GraphQL::Execution::DeferredExecution
  MySchema.subscription_execution_strategy = GraphQL::Execution::DeferredExecution

HTTP Streaming Transport

StreamCollector uses stream.write(patch) to send chunked responses to the client. For example, you can use it with ActionController::Live.

Create a collector and include it in the query as context[:collector]:

class ChunkedGraphqlsController < ApplicationController
  include ActionController::Live

  def create
    # ...

    # initialize the collector with `response.stream`
    context = {
      collector: StreamCollector.new(response.stream)
    }

    Schema.execute(query_string, variables: variables, context: context)

    # close the stream when the query is done:
    response.stream.close
  end
end

From JavaScript, use StreamingGraphQLClient to fetch data:

//= require graphql-streaming/streaming_graphql_client

onResponse = function(response) {
  // Handle response.errors / response.data
}

StreamingGraphQLClient.fetch(
  "/graphql/",
  `query getPost($postId: Int!){
    post(id: $postId) {
      title
      comments @stream {
        body
      }
    }
  }`,
  {postId: 1},
  onResponse,
)

The onResponse function will be called with GraphQL response after each patch is added.

ActionCable Transport

You can use Rails 5's ActionCable to send and receive GraphQL.

In your channel, implement an action called #fetch(data) for executing GraphQL queries. It should use stream_graphql_query using data["query_id"] from the client. For example:

class GraphqlChannel < ApplicationCable::Channel
  include GraphQL::Streaming::ActionCableChannel

  def fetch(data)
    query_string = data["query"]
    variables = JSON.parse(data["variables"] || "{}")
    context = {
      # ...
    }

    # Get the query ID, which is added by the GraphQLChannel client
    query_id = data["query_id"]

    # Make the query within a `stream_graphql_query` block
    stream_graphql_query(query_id: query_id) do |stream_ctx|
      # the block provides a subscriber and collector,
      # merge them into your context:
      merged_ctx = context.merge(stream_ctx)
      # don't forget to prevent stale data
      merged_ctx[:current_user].reload
      MySchema.execute(query_string, variables: variables, context: merged_ctx)
    end
  end
end

Then, create a GraphQLChannel to make requests. GraphQLChannel.subscription contains defaults for App.cable.subscription.create:

//= require graphql-streaming/graphql_channel
App.graphqlChannel = App.cable.subscriptions.create("GraphqlChannel", GraphQLChannel.subscription)

// OPTIONAL forward log messages to console.log:
// GraphQLChannel.log = console.log.bind(console)

And you can provide overrides if you want:

// Trigger `graphql-channel:ready` when the channel is connected
App.graphqlChannel = App.cable.subscriptions.create(
  "GraphqlChannel",
  Object.assign(GraphQLChannel.subscription, {
    connected: function() {
      $(document).trigger("graphql-channel:ready")
    },
  })
)

Send queries with graphqlChannel.fetch:

var queryString = "{ ... }"
var queryVariables = { /* ... */ }
var onResponse = function(response) { /* handle response.errors & response.data */}
App.graphqlChannel.fetch(queryString, queryVariables, onResponse)

The onResponse handler will be called with the whole response each time a patch is received.

Subscription

ActionCableSubscriber uses ActionCable as a backend for GraphQL subscriptions. There are three parts:

  • Send a subscriber along with your query
  • Define a Subscription type which registers subscriptions during resolve
  • Make triggers from application code

Subscriber

The subscriber rides along with your query (as context[:subscriber]). It listens for triggers from the application, and when they happen, it re-evaluates the query and pushes an update over its channel.

class GraphqlChannel < ApplicationCable::Channel
  include GraphQL::Streaming::ActionCableChannel

  def fetch(data)
    # ...
    query_id = data["query_id"]
    stream_graphql_query(query_id: query_id) do |stream_ctx|
      stream_ctx[:subscriber] # => #<ActionCableSubscriber ... >
      # ...
    end
  end
end

Subscription Type

SubscriptionType is a plain GraphQL::ObjectType, but its fields are special. They correspond to application triggers. When a trigger is fired, any subscriber who is listening to the corresponding field will re-evaluate its query.

Define subscription fields with subscription:

SubscriptionType = GraphQL::ObjectType.define do
  name "Subscription"
  subscription :post, PostType do
    argument :id, !types.Int
    resolve -> (obj, args, ctx) {
      Post.find(args[:id])
    }
  end
end

MySchema = GraphQL::Schema.define(subscription: SubscriptionType ...)

Triggers

From your application, you can trigger events on subscription fields. For example, to tell clients that a Post with a given ID changed:

class Post
  def after_commit
    GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: id})
  end
end

The arguments passed to .trigger will be tested against field arguments. Any subscribers who requested a matching query will be updated. For example:

subscription {
  post(id: 1) {
    ... postFields
  }
}

would be updated by

GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: 1})

Unsubscribing

A client can unsubscribe from future patches with .clear. For example:

// Subscribe to data from the server
var queryHandle = App.graphqlChannel.fetch(queryString, queryVariables, onResponse)
// Unsubscribe from server pushes
App.graphqlChannel.clear(queryHandle)

No further patches will be sent to the client.

Development

  • bundle exec rake test to run the tests
  • Sadly, no JS tests! See the demo repo for poke-around testing

TODO

  • What happens to subscriptions when you redeploy or ActionCable loses its connection? Need to handle reconnecting in some way.
  • Handle errors in subscriber block
  • For a streamed / defered query, we need a way to know when it's done
  • Tests for JS?
  • Other platforms (Pusher, HTTP/2)?
  • Request features from ActionCable
    • Public alternative to @channel.send(:transmit, payload)?
    • Some way to stop certain streams (see monkey patches in action_cable_channel.rb)

License

The gem is available as open source under the terms of the MIT License.