Graphql::Streaming

Experimental tools for GraphQL over a long-lived connection, including subscription, @defer and @stream. (demo)

Installation

# Requires an experimental branch of graphql:
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "defer-directive"
gem "graphql-streaming"

Usage

Defer & Stream

This gem supports queries with @defer and @stream.

  • Use GraphQL::Execution::DeferredExecution:
  # only available in the defer-directive branch:
  MySchema.query_execution_strategy = GraphQL::Execution::DeferredExecution
  MySchema.subscription_execution_strategy = GraphQL::Execution::DeferredExecution

HTTP Streaming Transport

StreamCollector uses stream.write(patch) to send chunked responses to the client. For example, you can use it with ActionController::Live.

Create a collector and include it in the query as context[:collector]:

class ChunkedGraphqlsController < ApplicationController
  include ActionController::Live

  def create
    # ...

    # initialize the collector with `response.stream`
    context = {
      collector: StreamCollector.new(response.stream)
    }

    Schema.execute(query_string, variables: variables, context: context)

    # close the stream when the query is done:
    response.stream.close
  end
end

From JavaScript, use StreamingGraphQLClient to fetch data:

//= require graphql-streaming/streaming_graphql_client

onResponse = function(response) {
  // Handle response.errors / response.data
}

StreamingGraphQLClient.fetch(
  "/graphql/",
  `query getPost($postId: Int!){
    post(id: $postId) {
      title
      comments @stream {
        body
      }
    }
  }`,
  {postId: 1},
  onResponse,
)

The onResponse function will be called with GraphQL response after each patch is added.

ActionCable Transport

You can use Rails 5's ActionCable to send and receive GraphQL.

In your channel, implement an action called #fetch(data) for executing GraphQL queries. It should add an ActionCableCollector as context[:collector], using data["query_id"] from the client. For example:

class GraphqlChannel < ApplicationCable::Channel
  # ...

  def fetch(data)
    query_string = data["query"]
    variables = ensure_hash(data["variables"] || {})
    context = {}

    # Get the query ID, which is added by the GraphQLChannel client
    query_id = data["query_id"]
    # Get a broadcaster, which is the target for patches
    broadcaster = ActionCable.server.broadcaster_for(channel_name)
    # The collector passes patches on to the broadcaster
    context[:collector] = GraphQL::Streaming::ActionCableCollector.new(query_id, broadcaster)

    # Run the query
    Schema.execute(query_string, variables: variables, context: context)

    # Tell the client to stop listening for patches
    context[:collector].close
  end
end

Then, create a GraphQLChannel to make requests. GraphQLChannel.subscription contains defaults for App.cable.subscription.create:

//= require graphql-streaming/graphql_channel
App.graphqlChannel = App.cable.subscriptions.create("GraphqlChannel", GraphQLChannel.subscription)

// OPTIONAL forward log messages to console.log:
// GraphQLChannel.log = console.log.bind(console)

And you can provide overrides if you want:

// Trigger `graphql-channel:ready` when the channel is connected
App.graphqlChannel = App.cable.subscriptions.create(
  "GraphqlChannel",
  Object.assign(GraphQLChannel.subscription, {
    connected: function() {
      $(document).trigger("graphql-channel:ready")
    },
  })
)

Send queries with graphqlChannel.fetch:

var queryString = "{ ... }"
var queryVariables = { /* ... */ }
var onResponse = function(response) { /* handle response.errors & response.data */}
App.graphqlChannel.fetch(queryString, queryVariables, onResponse)

The onResponse handler will be called with the whole response each time a patch is received.

Subscription

ActionCableSubscriber uses ActionCable as a backend for GraphQL subscriptions. There are three parts:

  • Send a subscriber along with your query
  • Define a Subscription type which registers subscriptions during resolve
  • Make triggers from application code

Subscriber

The subscriber rides along with your query (as context[:subscriber]). It listens for triggers from the application, and when they happen, it re-evaluates the query and pushes an update over its channel.

# Get the query ID from the client
query_id = data["query_id"]

context = { current_user: user_from_session(session) }

context[:subscriber] = GraphQL::Streaming::ActionCableSubscriber.new(self, query_id) do
  # This block is called when a matching trigger occurs
  context[:current_user].reload
  Schema.execute(query_string, variables: variables, context: context)
end

Schema.execute(query_string, variables: variables, context: context)


# If there are no outstanding subscriptions,
# tell the client to stop listening for patches
if !context[:subscriber].subscribed?
  context[:collector].close
end

Subscription Type

SubscriptionType is a plain GraphQL::ObjectType, but its fields are special. They correspond to application triggers. When a trigger is fired, any subscriber who is listening to the corresponding field will re-evaluate its query.

Define subscription fields with subscription:

SubscriptionType = GraphQL::ObjectType.define do
  name "Subscription"
  subscription :post, PostType do
    argument :id, !types.Int
    resolve -> (obj, args, ctx) {
      Post.find(args[:id])
    }
  end
end

MySchema = GraphQL::Schema.new(subscription: SubscriptionType ...)

Triggers

From your application, you can trigger events on subscription fields. For example, to tell clients that a Post with a given ID changed:

class Post
  def after_commit
    GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: id})
  end
end

The arguments passed to .trigger will be tested against field arguments. Any subscribers who requested a matching query will be updated. For example:

subscription {
  post(id: 1) {
    ... postFields
  }
}

would be updated by

GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: 1})

Development

  • bundle exec rake test to run the tests
  • Sadly, no JS tests! See the demo repo for poke-around testing

TODO

  • What happens to subscriptions when you redeploy or ActionCable loses its connection? Need to handle reconnecting in some way.
  • Handle errors in subscriber block
  • Improve middleware so you don't have to manually close ActionCableCollectors
  • Tests for JS?
  • Other platforms (Pusher, HTTP/2)?
  • Public alternative to @channel.send(:transmit, payload)?

License

The gem is available as open source under the terms of the MIT License.