Graphql::Streaming
Experimental tools for GraphQL over a long-lived connection, including subscription, @defer and @stream. (demo)
Installation
# Requires an experimental branch of graphql:
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "defer-directive"
gem "graphql-streaming"
Usage
Defer & Stream
This gem supports queries with @defer and @stream.
- Use
GraphQL::Execution::DeferredExecution:
# only available in the defer-directive branch:
MySchema.query_execution_strategy = GraphQL::Execution::DeferredExecution
MySchema.subscription_execution_strategy = GraphQL::Execution::DeferredExecution
- Choose a transport (HTTP Streaming or ActionCable) and get its client (both built-in clients depend on
Object.assign)
HTTP Streaming Transport
StreamCollector uses stream.write(patch) to send chunked responses to the client. For example, you can use it with ActionController::Live.
Create a collector and include it in the query as context[:collector]:
class ChunkedGraphqlsController < ApplicationController
include ActionController::Live
def create
# ...
# initialize the collector with `response.stream`
context = {
collector: StreamCollector.new(response.stream)
}
Schema.execute(query_string, variables: variables, context: context)
# close the stream when the query is done:
response.stream.close
end
end
From JavaScript, use StreamingGraphQLClient to fetch data:
//= require graphql-streaming/streaming_graphql_client
onResponse = function(response) {
// Handle response.errors / response.data
}
StreamingGraphQLClient.fetch(
"/graphql/",
`query getPost($postId: Int!){
post(id: $postId) {
title
comments @stream {
body
}
}
}`,
{postId: 1},
onResponse,
)
The onResponse function will be called with GraphQL response after each patch is added.
ActionCable Transport
You can use Rails 5's ActionCable to send and receive GraphQL.
In your channel, implement an action called #fetch(data) for executing GraphQL queries. It should add an ActionCableCollector as context[:collector], using data["query_id"] from the client. For example:
class GraphqlChannel < ApplicationCable::Channel
# ...
def fetch(data)
query_string = data["query"]
variables = ensure_hash(data["variables"] || {})
context = {}
# Get the query ID, which is added by the GraphQLChannel client
query_id = data["query_id"]
# Get a broadcaster, which is the target for patches
broadcaster = ActionCable.server.broadcaster_for(channel_name)
# The collector passes patches on to the broadcaster
context[:collector] = GraphQL::Streaming::ActionCableCollector.new(query_id, broadcaster)
# Run the query
Schema.execute(query_string, variables: variables, context: context)
# Tell the client to stop listening for patches
context[:collector].close
end
end
Then, create a GraphQLChannel to make requests. GraphQLChannel.subscription contains defaults for App.cable.subscription.create:
//= require graphql-streaming/graphql_channel
App.graphqlChannel = App.cable.subscriptions.create("GraphqlChannel", GraphQLChannel.subscription)
// OPTIONAL forward log messages to console.log:
// GraphQLChannel.log = console.log.bind(console)
And you can provide overrides if you want:
// Trigger `graphql-channel:ready` when the channel is connected
App.graphqlChannel = App.cable.subscriptions.create(
"GraphqlChannel",
Object.assign(GraphQLChannel.subscription, {
connected: function() {
$(document).trigger("graphql-channel:ready")
},
})
)
Send queries with graphqlChannel.fetch:
var queryString = "{ ... }"
var queryVariables = { /* ... */ }
var onResponse = function(response) { /* handle response.errors & response.data */}
App.graphqlChannel.fetch(queryString, queryVariables, onResponse)
The onResponse handler will be called with the whole response each time a patch is received.
Subscription
ActionCableSubscriber uses ActionCable as a backend for GraphQL subscriptions. There are three parts:
- Send a subscriber along with your query
- Define a Subscription type which registers subscriptions during resolve
- Make triggers from application code
Subscriber
The subscriber rides along with your query (as context[:subscriber]). It listens for triggers from the application, and when they happen, it re-evaluates the query and pushes an update over its channel.
# Get the query ID from the client
query_id = data["query_id"]
context = { current_user: user_from_session(session) }
context[:subscriber] = GraphQL::Streaming::ActionCableSubscriber.new(self, query_id) do
# This block is called when a matching trigger occurs
context[:current_user].reload
Schema.execute(query_string, variables: variables, context: context)
end
Schema.execute(query_string, variables: variables, context: context)
# If there are no outstanding subscriptions,
# tell the client to stop listening for patches
if !context[:subscriber].subscribed?
context[:collector].close
end
Subscription Type
SubscriptionType is a plain GraphQL::ObjectType, but its fields are special. They correspond to application triggers. When a trigger is fired, any subscriber who is listening to the corresponding field will re-evaluate its query.
Define subscription fields with subscription:
SubscriptionType = GraphQL::ObjectType.define do
name "Subscription"
subscription :post, PostType do
argument :id, !types.Int
resolve -> (obj, args, ctx) {
Post.find(args[:id])
}
end
end
MySchema = GraphQL::Schema.new(subscription: SubscriptionType ...)
Triggers
From your application, you can trigger events on subscription fields. For example, to tell clients that a Post with a given ID changed:
class Post
def after_commit
GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: id})
end
end
The arguments passed to .trigger will be tested against field arguments. Any subscribers who requested a matching query will be updated. For example:
subscription {
post(id: 1) {
... postFields
}
}
would be updated by
GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: 1})
Development
bundle exec rake testto run the tests- Sadly, no JS tests! See the demo repo for poke-around testing
TODO
- What happens to subscriptions when you redeploy or ActionCable loses its connection? Need to handle reconnecting in some way.
- Handle errors in subscriber block
- Improve middleware so you don't have to manually close
ActionCableCollectors - Tests for JS?
- Other platforms (Pusher, HTTP/2)?
- Public alternative to
@channel.send(:transmit, payload)?
License
The gem is available as open source under the terms of the MIT License.