GraphQL::Streaming
Experimental tools for GraphQL over a long-lived connection, including subscription
, @defer
and @stream
. (demo)
Installation
# Requires an experimental branch of graphql:
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "defer-directive"
gem "graphql-streaming"
Usage
Defer & Stream
This gem supports queries with @defer
and @stream
.
- Use
GraphQL::Execution::DeferredExecution
:
# only available in the defer-directive branch:
MySchema.query_execution_strategy = GraphQL::Execution::DeferredExecution
MySchema.subscription_execution_strategy = GraphQL::Execution::DeferredExecution
- Choose a transport (HTTP Streaming or ActionCable) and get its client (both built-in clients depend on
Object.assign
)
HTTP Streaming Transport
StreamCollector
uses stream.write(patch)
to send chunked responses to the client. For example, you can use it with ActionController::Live
.
Create a collector
and include it in the query as context[:collector]
:
class ChunkedGraphqlsController < ApplicationController
include ActionController::Live
def create
# ...
# initialize the collector with `response.stream`
context = {
collector: StreamCollector.new(response.stream)
}
Schema.execute(query_string, variables: variables, context: context)
# close the stream when the query is done:
response.stream.close
end
end
From JavaScript, use StreamingGraphQLClient
to fetch data:
//= require graphql-streaming/streaming_graphql_client
onResponse = function(response) {
// Handle response.errors / response.data
}
StreamingGraphQLClient.fetch(
"/graphql/",
`query getPost($postId: Int!){
post(id: $postId) {
title
comments @stream {
body
}
}
}`,
{postId: 1},
onResponse,
)
The onResponse
function will be called with GraphQL response after each patch is added.
ActionCable Transport
You can use Rails 5's ActionCable
to send and receive GraphQL.
In your channel, implement an action called #fetch(data)
for executing GraphQL queries. It should use stream_graphql_query
using data["query_id"]
from the client. For example:
class GraphqlChannel < ApplicationCable::Channel
include GraphQL::Streaming::ActionCableChannel
def fetch(data)
query_string = data["query"]
variables = JSON.parse(data["variables"] || "{}")
context = {
# ...
}
# Get the query ID, which is added by the GraphQLChannel client
query_id = data["query_id"]
# Make the query within a `stream_graphql_query` block
stream_graphql_query(query_id: query_id) do |stream_ctx|
# the block provides a subscriber and collector,
# merge them into your context:
merged_ctx = context.merge(stream_ctx)
# don't forget to prevent stale data
merged_ctx[:current_user].reload
MySchema.execute(query_string, variables: variables, context: merged_ctx)
end
end
end
Then, create a GraphQLChannel
to make requests. GraphQLChannel.subscription
contains defaults for App.cable.subscription.create
:
//= require graphql-streaming/graphql_channel
App.graphqlChannel = App.cable.subscriptions.create("GraphqlChannel", GraphQLChannel.subscription)
// OPTIONAL forward log messages to console.log:
// GraphQLChannel.log = console.log.bind(console)
And you can provide overrides if you want:
// Trigger `graphql-channel:ready` when the channel is connected
App.graphqlChannel = App.cable.subscriptions.create(
"GraphqlChannel",
Object.assign(GraphQLChannel.subscription, {
connected: function() {
$(document).trigger("graphql-channel:ready")
},
})
)
Send queries with graphqlChannel.fetch
:
var queryString = "{ ... }"
var queryVariables = { /* ... */ }
var onResponse = function(response) { /* handle response.errors & response.data */}
App.graphqlChannel.fetch(queryString, queryVariables, onResponse)
The onResponse
handler will be called with the whole response each time a patch is received.
Subscription
ActionCableSubscriber
uses ActionCable
as a backend for GraphQL subscriptions. There are three parts:
- Send a subscriber along with your query
- Define a Subscription type which registers subscriptions during resolve
- Make triggers from application code
Subscriber
The subscriber rides along with your query (as context[:subscriber]
). It listens for triggers from the application, and when they happen, it re-evaluates the query and pushes an update over its channel.
class GraphqlChannel < ApplicationCable::Channel
include GraphQL::Streaming::ActionCableChannel
def fetch(data)
# ...
query_id = data["query_id"]
stream_graphql_query(query_id: query_id) do |stream_ctx|
stream_ctx[:subscriber] # => #<ActionCableSubscriber ... >
# ...
end
end
end
Subscription Type
SubscriptionType
is a plain GraphQL::ObjectType
, but its fields are special. They correspond to application triggers. When a trigger is fired, any subscriber who is listening to the corresponding field will re-evaluate its query.
Define subscription fields with subscription
:
SubscriptionType = GraphQL::ObjectType.define do
name "Subscription"
subscription :post, PostType do
argument :id, !types.Int
resolve -> (obj, args, ctx) {
Post.find(args[:id])
}
end
end
MySchema = GraphQL::Schema.define(subscription: SubscriptionType ...)
Triggers
From your application, you can trigger events on subscription fields. For example, to tell clients that a Post with a given ID changed:
class Post
def after_commit
GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: id})
end
end
The arguments passed to .trigger
will be tested against field arguments. Any subscribers who requested a matching query will be updated. For example:
subscription {
post(id: 1) {
... postFields
}
}
would be updated by
GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: 1})
Unsubscribing
A client can unsubscribe from future patches with .clear
. For example:
// Subscribe to data from the server
var queryHandle = App.graphqlChannel.fetch(queryString, queryVariables, onResponse)
// Unsubscribe from server pushes
App.graphqlChannel.clear(queryHandle)
No further patches will be sent to the client.
Development
bundle exec rake test
to run the tests- Sadly, no JS tests! See the demo repo for poke-around testing
TODO
- What happens to subscriptions when you redeploy or ActionCable loses its connection? Need to handle reconnecting in some way.
- Handle errors in subscriber block
- For a streamed / defered query, we need a way to know when it's done
- Tests for JS?
- Other platforms (Pusher, HTTP/2)?
- Request features from ActionCable
- Public alternative to
@channel.send(:transmit, payload)
? - Some way to stop certain streams (see monkey patches in action_cable_channel.rb)
- Public alternative to
License
The gem is available as open source under the terms of the MIT License.