diff --git a/Guardfile b/Guardfile index 0145708f5cb..52493fa8fa0 100644 --- a/Guardfile +++ b/Guardfile @@ -20,17 +20,20 @@ guard :minitest do to_run << matching_spec end - # Find a `# test_via:` macro to automatically run another test - body = File.read(m[0]) - test_via_match = body.match(/test_via: (.*)/) - if test_via_match - test_via_path = test_via_match[1] - companion_file = Pathname.new(m[0] + "/../" + test_via_path) - .cleanpath - .to_s - .sub(/.rb/, "_spec.rb") - .sub("lib/", "spec/") - to_run << companion_file + # If the file was deleted, it won't exist anymore + if File.exist?(m[0]) + # Find a `# test_via:` macro to automatically run another test + body = File.read(m[0]) + test_via_match = body.match(/test_via: (.*)/) + if test_via_match + test_via_path = test_via_match[1] + companion_file = Pathname.new(m[0] + "/../" + test_via_path) + .cleanpath + .to_s + .sub(/.rb/, "_spec.rb") + .sub("lib/", "spec/") + to_run << companion_file + end end # 0+ files diff --git a/guides/guides.html b/guides/guides.html index 2dea73dc1c6..5d4e57d2544 100644 --- a/guides/guides.html +++ b/guides/guides.html @@ -6,6 +6,7 @@ - name: Types - name: Fields - name: Relay + - name: Subscriptions - name: GraphQL Pro - name: GraphQL Pro - OperationStore - name: Other diff --git a/guides/subscriptions/action_cable_implementation.md b/guides/subscriptions/action_cable_implementation.md new file mode 100644 index 00000000000..e4830843991 --- /dev/null +++ b/guides/subscriptions/action_cable_implementation.md @@ -0,0 +1,15 @@ +--- +layout: guide +search: true +section: Subscriptions +title: Action Cable Implementation +desc: GraphQL subscriptions over ActionCable +index: 4 +experimental: true +--- + +[ActionCable](http://guides.rubyonrails.org/action_cable_overview.html) is a great platform for delivering GraphQL subscriptions on Rails 5+. It handles message passing (via `broadcast`) and transport (via `transmit` over a websocket). + +To get started, see examples in the API docs: {{ "GraphQL::Subscriptions::ActionCableSubscriptions" | api_doc }}. + +A client is available [in graphql-ruby-client](https://github.com/rmosolgo/graphql-ruby-client). diff --git a/guides/subscriptions/implementation.md b/guides/subscriptions/implementation.md new file mode 100644 index 00000000000..751e695c7e5 --- /dev/null +++ b/guides/subscriptions/implementation.md @@ -0,0 +1,24 @@ +--- +layout: guide +search: true +section: Subscriptions +title: Implementation +desc: Subscription execution and delivery +index: 3 +experimental: true +--- + +The {{ "GraphQL::Subscriptions" | api_doc }} plugin is a base class for implementing subscriptions. + +Each method corresponds to a step in the subscription lifecycle. See the API docs for method-by-method documentation: {{ "GraphQL::Subscriptions" | api_doc }}. + +Also, see the {% internal_link "ActionCable implementation guide", "subscriptions/action_cable_implementation" %} or {{ "GraphQL::Subscriptions::ActionCableSubscriptions" | api_doc }} docs for an example implementation. + +## Considerations + +Every Ruby application is different, so consider these points when implementing subscriptions: + +- Is your application single-process or multiprocess? Single-process applications can store state in memory while multiprocess applications need a message broker to keep all processes up-to-date. +- What components of your application can be used for persistence and message passing? +- How will you deliver push updates to subscribed clients? (For example, websockets, ActionCable, Pusher, webhooks, or something else?) +- How will you handle [thundering herd](https://en.wikipedia.org/wiki/Thundering_herd_problem)s? When an event is triggered, how will you manage database access to update clients without swamping your system? diff --git a/guides/subscriptions/overview.md b/guides/subscriptions/overview.md new file mode 100644 index 00000000000..0c65a353cd8 --- /dev/null +++ b/guides/subscriptions/overview.md @@ -0,0 +1,37 @@ +--- +layout: guide +search: true +section: Subscriptions +title: Overview +desc: Introduction to Subscriptions in GraphQL-Ruby +index: 0 +experimental: true +--- + +_Subscriptions_ allow GraphQL clients to observe specific events and receive updates from the server when those events occur. This supports live updates, such as websocket pushes. Subscriptions introduce several new concepts: + +- The __Subscription type__ is the entry point for subscription queries +- __Triggers__ begin the update process +- The __Implementation__ provides application-specific methods for executing & delivering updates. + +### Subscription Type + +`subscription` is an entry point to your GraphQL schema, like `query` or `mutation`. It is defined by your `SubscriptionType`, a root-level `ObjectType`. + +Read more in the {% internal_link "Subscription Type guide", "subscriptions/subscription_type" %}. + +### Triggers + +After an event occurs in our application, _triggers_ begin the update process by sending a name and payload to GraphQL. + +Read more in the {% internal_link "Triggers guide","subscriptions/triggers" %}. + +### Implementation + +Besides the GraphQL component, your application must provide some subscription-related plumbing, for example: + +- __state management__: How does your application keep track of who is subscribed to what? +- __transport__: How does your application deliver payloads to clients? +- __queueing__: How does your application distribute the work of re-running subscription queries? + +Read more in the {% internal_link "Implementation guide", "subscriptions/implementation" %} or check out the {% internal_link "ActionCable implementation", "subscriptions/action_cable_implementation" %}. diff --git a/guides/subscriptions/subscription_type.md b/guides/subscriptions/subscription_type.md new file mode 100644 index 00000000000..42794effd8f --- /dev/null +++ b/guides/subscriptions/subscription_type.md @@ -0,0 +1,61 @@ +--- +layout: guide +search: true +section: Subscriptions +title: Subscription Type +desc: The root type for subscriptions +index: 1 +experimental: true +--- + +`Subscription` is the entry point for all subscriptions in a GraphQL system. Each field corresponds to an event which may be subscribed to: + +```graphql +type Subscription { + # Triggered whenever a post is added + postWasPublished: Post + # Triggered whenever a comment is added; + # to watch a certain post, provide a `postId` + commentWasPublished(postId: ID): Comment +} +``` + +This type is the root for `subscription` operations, for example: + +```graphql +subscription { + postWasPublished { + # This data will be delivered whenever `postWasPublished` + # is triggered by the server: + title + author { + name + } + } +} +``` + +To add subscriptions to your system, define an `ObjectType` named `Subscription`: + +```ruby +# app/graphql/types/subscription_type.rb +Types::SubscriptionType = GraphQL::ObjectType.define do + name "Subscription" + field :postWasPublished, !Types::PostType, "A post was published to the blog" + # ... +end +``` + +Then, add it as the subscription root with `subscription(...)`: + +```ruby +# app/graphql/my_schema.rb +MySchema = GraphQL::Schema.define do + query(Types::QueryType) + # ... + # Add Subscription to + subscription(Types::SubscriptionType) +end +``` + +See {% internal_link "Implementing Subscriptions","subscriptions/implementation" %} for more about actually delivering updates. diff --git a/guides/subscriptions/triggers.md b/guides/subscriptions/triggers.md new file mode 100644 index 00000000000..ed2b828d6c1 --- /dev/null +++ b/guides/subscriptions/triggers.md @@ -0,0 +1,25 @@ +--- +layout: guide +search: true +section: Subscriptions +title: Triggers +desc: Sending updates from your application to GraphQL +index: 2 +experimental: true +--- + +From your application, you can push updates to GraphQL clients with `.trigger`. + +Events are triggered _by name_, and the name must match fields on your {% internal_link "Subscription Type","subscriptions/subscription_type" %} + +```ruby +# Update the system with the new blog post: +MySchema.subscriptions.trigger("postAdded", {}, new_post) +``` + +The arguments are: + +- `name`, which corresponds to the field on subscription type +- `arguments`, which corresponds to the arguments on subscription type (for example, if you subscribe to comments on a certain post, the arguments would be `{postId: comment.post_id}`.) +- `object`, which will be the root object of the subscription update +- `scope:` (not shown) for implicitly scoping the clients who will receive updates. diff --git a/lib/graphql.rb b/lib/graphql.rb index c1645012a3c..948d9de123f 100644 --- a/lib/graphql.rb +++ b/lib/graphql.rb @@ -109,5 +109,6 @@ def self.scan_with_ragel(graphql_string) require "graphql/compatibility" require "graphql/function" require "graphql/filter" +require "graphql/subscriptions" require "graphql/parse_error" require "graphql/tracing" diff --git a/lib/graphql/execution/execute.rb b/lib/graphql/execution/execute.rb index 089155ac50b..3c942ac45b1 100644 --- a/lib/graphql/execution/execute.rb +++ b/lib/graphql/execution/execute.rb @@ -21,7 +21,7 @@ class PropagateNull def execute(ast_operation, root_type, query) result = resolve_root_selection(query) lazy_resolve_root_selection(result, {query: query}) - GraphQL::Execution::Flatten.call(result) + GraphQL::Execution::Flatten.call(query.context) end # @api private @@ -178,7 +178,7 @@ def resolve_value(value, field_type, field_ctx) nil end elsif value.is_a?(Skip) - value + field_ctx.value = value else case field_type.kind when GraphQL::TypeKinds::SCALAR, GraphQL::TypeKinds::ENUM diff --git a/lib/graphql/execution/flatten.rb b/lib/graphql/execution/flatten.rb index 313f26184bd..f7f75fbfb2a 100644 --- a/lib/graphql/execution/flatten.rb +++ b/lib/graphql/execution/flatten.rb @@ -25,6 +25,8 @@ def flatten(obj) when Query::Context::SharedMethods if obj.invalid_null? nil + elsif obj.skipped? && obj.value.empty? + nil else flatten(obj.value) end diff --git a/lib/graphql/execution/multiplex.rb b/lib/graphql/execution/multiplex.rb index 1570d9b0de0..5c5bd637057 100644 --- a/lib/graphql/execution/multiplex.rb +++ b/lib/graphql/execution/multiplex.rb @@ -109,12 +109,12 @@ def finish_query(data_result, query) if !query.valid? { "errors" => query.static_errors.map(&:to_h) } else - {} + data_result end else # Use `context.value` which was assigned during execution result = { - "data" => Execution::Flatten.call(query.context.value) + "data" => Execution::Flatten.call(query.context) } if query.context.errors.any? diff --git a/lib/graphql/field.rb b/lib/graphql/field.rb index 122bd0aa590..e5e064a8cbb 100644 --- a/lib/graphql/field.rb +++ b/lib/graphql/field.rb @@ -129,6 +129,7 @@ class Field :edge_class, :relay_node_field, :relay_nodes_field, + :subscription_scope, argument: GraphQL::Define::AssignArgument ensure_defined( @@ -136,7 +137,7 @@ class Field :mutation, :arguments, :complexity, :function, :resolve, :resolve=, :lazy_resolve, :lazy_resolve=, :lazy_resolve_proc, :resolve_proc, :type, :type=, :name=, :property=, :hash_key=, - :relay_node_field, :relay_nodes_field, :edges?, :edge_class + :relay_node_field, :relay_nodes_field, :edges?, :edge_class, :subscription_scope ) # @return [Boolean] True if this is the Relay find-by-id field @@ -180,6 +181,9 @@ class Field attr_writer :connection + # @return [nil, String] Prefix for subscription names from this field + attr_accessor :subscription_scope + # @return [Boolean] def connection? @connection diff --git a/lib/graphql/internal_representation/node.rb b/lib/graphql/internal_representation/node.rb index e0d4479f51a..889c26d20f4 100644 --- a/lib/graphql/internal_representation/node.rb +++ b/lib/graphql/internal_representation/node.rb @@ -145,6 +145,22 @@ def deep_merge_node(new_parent, scope: nil, merge_self: true) # @return [GraphQL::Query] attr_reader :query + def subscription_topic + @subscription_topic ||= begin + scope = if definition.subscription_scope + @query.context[definition.subscription_scope] + else + nil + end + Subscriptions::Event.serialize( + definition_name, + @query.arguments_for(self, definition), + definition, + scope: scope + ) + end + end + protected attr_writer :owner_type, :parent diff --git a/lib/graphql/query.rb b/lib/graphql/query.rb index f13ba6b7eb7..26df1ac1d35 100644 --- a/lib/graphql/query.rb +++ b/lib/graphql/query.rb @@ -49,6 +49,12 @@ def selected_operation_name selected_operation.name end + # @return [String, nil] the triggered event, if this query is a subscription update + attr_reader :subscription_topic + + # @return [String, nil] + attr_reader :operation_name + # Prepare query `query_string` on `schema` # @param schema [GraphQL::Schema] # @param query_string [String] @@ -60,10 +66,11 @@ def selected_operation_name # @param max_complexity [Numeric] the maximum field complexity for this query (falls back to schema-level value) # @param except [<#call(schema_member, context)>] If provided, objects will be hidden from the schema when `.call(schema_member, context)` returns truthy # @param only [<#call(schema_member, context)>] If provided, objects will be hidden from the schema when `.call(schema_member, context)` returns false - def initialize(schema, query_string = nil, query: nil, document: nil, context: nil, variables: {}, validate: true, operation_name: nil, root_value: nil, max_depth: nil, max_complexity: nil, except: nil, only: nil) + def initialize(schema, query_string = nil, query: nil, document: nil, context: nil, variables: {}, validate: true, subscription_topic: nil, operation_name: nil, root_value: nil, max_depth: nil, max_complexity: nil, except: nil, only: nil) @schema = schema @filter = schema.default_filter.merge(except: except, only: only) @context = Context.new(query: self, object: root_value, values: context) + @subscription_topic = subscription_topic @root_value = root_value @fragments = nil @operations = nil @@ -95,7 +102,6 @@ def initialize(schema, query_string = nil, query: nil, document: nil, context: n @mutation = false @operation_name = operation_name @prepared_ast = false - @validation_pipeline = nil @max_depth = max_depth || schema.max_depth @max_complexity = max_complexity || schema.max_complexity @@ -104,6 +110,10 @@ def initialize(schema, query_string = nil, query: nil, document: nil, context: n @executed = false end + def subscription_update? + @subscription_topic && subscription? + end + # @api private def result_values=(result_hash) if @executed @@ -229,6 +239,10 @@ def merge_filters(only: nil, except: nil) nil end + def subscription? + with_prepared_ast { @subscription } + end + private def find_operation(operations, operation_name) @@ -278,6 +292,7 @@ def prepare_ast # with no operations returns an empty hash @ast_variables = [] @mutation = false + @subscription = false operation_name_error = nil if @operations.any? @selected_operation = find_operation(@operations, @operation_name) @@ -290,6 +305,7 @@ def prepare_ast @ast_variables = @selected_operation.variables @mutation = @selected_operation.operation_type == "mutation" @query = @selected_operation.operation_type == "query" + @subscription = @selected_operation.operation_type == "subscription" end end diff --git a/lib/graphql/query/context.rb b/lib/graphql/query/context.rb index b5d70a3fd71..4a23935dd93 100644 --- a/lib/graphql/query/context.rb +++ b/lib/graphql/query/context.rb @@ -13,6 +13,13 @@ module SharedMethods # @return [Hash, Array, String, Integer, Float, Boolean, nil] The resolved value for this field attr_reader :value + # @return [Boolean] were any fields of this selection skipped? + attr_reader :skipped + alias :skipped? :skipped + + # @api private + attr_writer :skipped + # Return this value to tell the runtime # to exclude this field from the response altogether def skip @@ -59,6 +66,7 @@ def add_error(error) include SharedMethods extend GraphQL::Delegate + attr_reader :execution_strategy # `strategy` is required by GraphQL::Batch alias_method :strategy, :execution_strategy @@ -120,6 +128,7 @@ def initialize(query:, values: , object:) # @!method []=(key, value) # Reassign `key` to the hash passed to {Schema#execute} as `context:` + # @return [GraphQL::Schema::Warden] def warden @warden ||= @query.warden @@ -206,6 +215,7 @@ def value=(new_value) @parent.received_null_child end when GraphQL::Execution::Execute::SKIP + @parent.skipped = true @parent.delete(self) else @value = new_value diff --git a/lib/graphql/query/literal_input.rb b/lib/graphql/query/literal_input.rb index 3beff11afbe..a52d94c3b0e 100644 --- a/lib/graphql/query/literal_input.rb +++ b/lib/graphql/query/literal_input.rb @@ -14,9 +14,22 @@ def self.coerce(type, ast_node, variables) else case type when GraphQL::ScalarType - type.coerce_input(ast_node, variables.context) + # TODO smell + # This gets used for plain values during subscriber.trigger + if variables + type.coerce_input(ast_node, variables.context) + else + type.coerce_isolated_input(ast_node) + end when GraphQL::EnumType - type.coerce_input(ast_node.name, variables.context) + # TODO smell + # This gets used for plain values sometimes + v = ast_node.is_a?(GraphQL::Language::Nodes::Enum) ? ast_node.name : ast_node + if variables + type.coerce_input(v, variables.context) + else + type.coerce_isolated_input(v) + end when GraphQL::NonNullType LiteralInput.coerce(type.of_type, ast_node, variables) when GraphQL::ListType @@ -26,7 +39,9 @@ def self.coerce(type, ast_node, variables) [LiteralInput.coerce(type.of_type, ast_node, variables)] end when GraphQL::InputObjectType - from_arguments(ast_node.arguments, type.arguments, variables) + # TODO smell: handling AST vs handling plain Ruby + next_args = ast_node.is_a?(Hash) ? ast_node : ast_node.arguments + from_arguments(next_args, type.arguments, variables) end end end @@ -43,7 +58,14 @@ def self.from_arguments(ast_arguments, argument_defns, variables) # Variables is nil when making .defaults_for context = variables ? variables.context : nil values_hash = {} - indexed_arguments = ast_arguments.each_with_object({}) { |a, memo| memo[a.name] = a } + indexed_arguments = case ast_arguments + when Hash + ast_arguments + when Array + ast_arguments.each_with_object({}) { |a, memo| memo[a.name] = a } + else + raise ArgumentError, "Unexpected ast_arguments: #{ast_arguments}" + end argument_defns.each do |arg_name, arg_defn| ast_arg = indexed_arguments[arg_name] @@ -51,12 +73,17 @@ def self.from_arguments(ast_arguments, argument_defns, variables) # If the value is a variable, # only add a value if the variable is actually present. # Otherwise, coerce the value in the AST, prepare the value and add it. - if ast_arg - value_is_a_variable = ast_arg.value.is_a?(GraphQL::Language::Nodes::VariableIdentifier) + # + # TODO: since indexed_arguments can come from a plain Ruby hash, + # have to check for `false` or `nil` as hash values. This is getting smelly :S + if indexed_arguments.key?(arg_name) + arg_value = ast_arg.is_a?(GraphQL::Language::Nodes::Argument) ? ast_arg.value : ast_arg + + value_is_a_variable = arg_value.is_a?(GraphQL::Language::Nodes::VariableIdentifier) - if (!value_is_a_variable || (value_is_a_variable && variables.key?(ast_arg.value.name))) + if (!value_is_a_variable || (value_is_a_variable && variables.key?(arg_value.name))) - value = coerce(arg_defn.type, ast_arg.value, variables) + value = coerce(arg_defn.type, arg_value, variables) value = arg_defn.prepare(value, context) if value.is_a?(GraphQL::ExecutionError) @@ -64,7 +91,7 @@ def self.from_arguments(ast_arguments, argument_defns, variables) raise value end - values_hash[ast_arg.name] = value + values_hash[arg_name] = value end end diff --git a/lib/graphql/query/result.rb b/lib/graphql/query/result.rb index 083c049f919..d26f96c8e19 100644 --- a/lib/graphql/query/result.rb +++ b/lib/graphql/query/result.rb @@ -19,7 +19,7 @@ def initialize(query:, values:) # @return [Hash] The resulting hash of "data" and/or "errors" attr_reader :to_h - def_delegators :@query, :context, :mutation?, :query? + def_delegators :@query, :context, :mutation?, :query?, :subscription? def_delegators :@to_h, :[], :keys, :values, :to_json, :as_json diff --git a/lib/graphql/schema.rb b/lib/graphql/schema.rb index d61347bab58..25576057a8f 100644 --- a/lib/graphql/schema.rb +++ b/lib/graphql/schema.rb @@ -81,6 +81,10 @@ class Schema :cursor_encoder, :raise_definition_error + # Single, long-lived instance of the provided subscriptions class, if there is one. + # @return [GraphQL::Subscriptions] + attr_accessor :subscriptions + # @return [MiddlewareChain] MiddlewareChain which is applied to fields during execution attr_accessor :middleware diff --git a/lib/graphql/subscriptions.rb b/lib/graphql/subscriptions.rb new file mode 100644 index 00000000000..a0b5a76bcc2 --- /dev/null +++ b/lib/graphql/subscriptions.rb @@ -0,0 +1,129 @@ +# frozen_string_literal: true +require "graphql/subscriptions/event" +require "graphql/subscriptions/instrumentation" +if defined?(ActionCable) + require "graphql/subscriptions/action_cable_subscriptions" +end + +module GraphQL + class Subscriptions + def self.use(defn, options = {}) + schema = defn.target + options[:schema] = schema + schema.subscriptions = self.new(options) + instrumentation = Subscriptions::Instrumentation.new(schema: schema) + defn.instrument(:field, instrumentation) + defn.instrument(:query, instrumentation) + nil + end + + def initialize(kwargs) + @schema = kwargs[:schema] + end + + # Fetch subscriptions matching this field + arguments pair + # And pass them off to the queue. + # @param event_name [String] + # @param args [Hash] + # @param object [Object] + # @param scope [Symbol, String] + # @return [void] + def trigger(event_name, args, object, scope: nil) + field = @schema.get_field("Subscription", event_name) + if !field + raise "No subscription matching trigger: #{event_name}" + end + + event = Subscriptions::Event.new( + name: event_name, + arguments: args, + field: field, + scope: scope, + ) + execute_all(event, object) + end + + def initialize(schema:, **rest) + @schema = schema + end + + # `event` was triggered on `object`, and `subscription_id` was subscribed, + # so it should be updated. + # + # Load `subscription_id`'s GraphQL data, re-evaluate the query, and deliver the result. + # + # This is where a queue may be inserted to push updates in the background. + # + # @param subscription_id [String] + # @param event [GraphQL::Subscriptions::Event] The event which was triggered + # @param object [Object] The value for the subscription field + # @return [void] + def execute(subscription_id, event, object) + # Lookup the saved data for this subscription + query_data = read_subscription(subscription_id) + # Fetch the required keys from the saved data + query_string = query_data.fetch(:query_string) + variables = query_data.fetch(:variables) + context = query_data.fetch(:context) + operation_name = query_data.fetch(:operation_name) + # Re-evaluate the saved query + result = @schema.execute( + { + query: query_string, + context: context, + subscription_topic: event.topic, + operation_name: operation_name, + variables: variables, + root_value: object, + } + ) + deliver(subscription_id, result) + end + + # Event `event` occurred on `object`, + # Update all subscribers. + # @param event [Subscriptions::Event] + # @param object [Object] + # @return [void] + def execute_all(event, object) + each_subscription_id(event) do |subscription_id| + execute(subscription_id, event, object) + end + end + + # Get each `subscription_id` subscribed to `event.topic` and yield them + # @param event [GraphQL::Subscriptions::Event] + # @yieldparam subscription_id [String] + # @return [void] + def each_subscription_id(event) + raise NotImplementedError + end + + # The system wants to send an update to this subscription. + # Read its data and return it. + # @param subscription_id [String] + # @return [Hash] Containing required keys + def read_subscription(subscription_id) + raise NotImplementedError + end + + # A subscription query was re-evaluated, returning `result`. + # The result should be send to `subscription_id`. + # @param subscription_id [String] + # @param result [Hash] + # @param context [GraphQL::Query::Context] + # @return [void] + def deliver(subscription_id, result, context) + raise NotImplementedError + end + + # `query` was executed and found subscriptions to `events`. + # Update the database to reflect this new state. + # @param query [GraphQL::Query] + # @param events [Array] + # @return [void] + def write_subscription(query, events) + raise NotImplementedError + end + end +end diff --git a/lib/graphql/subscriptions/action_cable_subscriptions.rb b/lib/graphql/subscriptions/action_cable_subscriptions.rb new file mode 100644 index 00000000000..7343121c70d --- /dev/null +++ b/lib/graphql/subscriptions/action_cable_subscriptions.rb @@ -0,0 +1,122 @@ +# frozen_string_literal: true +require "securerandom" + +module GraphQL + class Subscriptions + # A subscriptions implementation that sends data + # as ActionCable broadcastings. + # + # Experimental, some things to keep in mind: + # + # - No queueing system; ActiveJob should be added + # - Take care to reload context when re-delivering the subscription. (see {Query#subscription_update?}) + # + # @example Adding ActionCableSubscriptions to your schema + # MySchema = GraphQL::Schema.define do + # # ... + # use GraphQL::Subscriptions::ActionCableSubscriptions + # end + # + # @example Implementing a channel for GraphQL Subscriptions + # class GraphqlChannel < ApplicationCable::Channel + # def subscribed + # @subscription_ids = [] + # end + # + # def execute(data) + # query = data["query"] + # variables = ensure_hash(data["variables"]) + # operation_name = data["operationName"] + # context = { + # current_user: current_user, + # # Make sure the channel is in the context + # channel: self, + # } + # + # result = MySchema.execute({ + # query: query, + # context: context, + # variables: variables, + # operation_name: operation_name + # }) + # + # payload = { + # result: result.to_h, + # more: result.subscription?, + # } + # + # # Track the subscription here so we can remove it + # # on unsubscribe. + # if result.context[:subscription_id] + # @subscription_ids << context[:subscription_id] + # end + # + # transmit(payload) + # end + # + # def unsubscribed + # @subscription_ids.each { |sid| + # CardsSchema.subscriptions.delete_subscription(sid) + # } + # end + # end + # + class ActionCableSubscriptions < GraphQL::Subscriptions + SUBSCRIPTION_PREFIX = "graphql-subscription:" + EVENT_PREFIX = "graphql-event:" + def initialize(**rest) + # A per-process map of subscriptions to deliver. + # This is provided by Rails, so let's use it + @subscriptions = Concurrent::Map.new + super + end + + # An event was triggered; Push the data over ActionCable. + # Subscribers will re-evaluate locally. + # TODO: this method name is a smell + def execute_all(event, object) + ActionCable.server.broadcast(EVENT_PREFIX + event.topic, object) + end + + # This subscription was re-evaluated. + # Send it to the specific stream where this client was waiting. + def deliver(subscription_id, result) + payload = { result: result.to_h, more: true } + ActionCable.server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload) + end + + # A query was run where these events were subscribed to. + # Store them in memory in _this_ ActionCable frontend. + # It will receive notifications when events come in + # and re-evaluate the query locally. + def write_subscription(query, events) + channel = query.context[:channel] + subscription_id = query.context[:subscription_id] ||= SecureRandom.uuid + channel.stream_from(SUBSCRIPTION_PREFIX + subscription_id) + @subscriptions[subscription_id] = query + events.each do |event| + channel.stream_from(EVENT_PREFIX + event.topic, coder: ActiveSupport::JSON) do |message| + execute(subscription_id, event, message) + nil + end + end + end + + # Return the query from "storage" (in memory) + def read_subscription(subscription_id) + query = @subscriptions[subscription_id] + { + query_string: query.query_string, + variables: query.provided_variables, + context: query.context.to_h, + operation_name: query.operation_name, + } + end + + # The channel was closed, forget about it. + def delete_subscription(subscription_id) + @subscriptions.delete(subscription_id) + end + end + end +end diff --git a/lib/graphql/subscriptions/event.rb b/lib/graphql/subscriptions/event.rb new file mode 100644 index 00000000000..c51b7220fee --- /dev/null +++ b/lib/graphql/subscriptions/event.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true +module GraphQL + class Subscriptions + # This thing can be: + # - Subscribed to by `subscription { ... }` + # - Triggered by `MySchema.subscriber.trigger(name, arguments, obj)` + # + # An array of `Event`s are passed to `store.register(query, events)`. + class Event + # @return [String] Corresponds to the Subscription root field name + attr_reader :name + + # @return [GraphQL::Query::Arguments] + attr_reader :arguments + + # @return [GraphQL::Query::Context] + attr_reader :context + + # @return [String] An opaque string which identifies this event, derived from `name` and `arguments` + attr_reader :topic + + def initialize(name:, arguments:, field: nil, context: nil, scope: nil) + @name = name + @arguments = arguments + @context = context + field ||= context.field + scope_val = scope || (context && field.subscription_scope && context[field.subscription_scope]) + + @topic = self.class.serialize(name, arguments, field, scope: scope_val) + end + + # @return [String] an identifier for this unit of subscription + def self.serialize(name, arguments, field, scope:) + normalized_args = case arguments + when GraphQL::Query::Arguments + arguments + when Hash + GraphQL::Query::LiteralInput.from_arguments( + arguments, + field.arguments, + nil, + ) + else + raise ArgumentError, "Unexpected arguments: #{arguments}, must be Hash or GraphQL::Arguments" + end + + sorted_h = normalized_args.to_h.sort.to_h + JSON.dump([scope, name, sorted_h]) + end + end + end +end diff --git a/lib/graphql/subscriptions/instrumentation.rb b/lib/graphql/subscriptions/instrumentation.rb new file mode 100644 index 00000000000..4d0ae85548f --- /dev/null +++ b/lib/graphql/subscriptions/instrumentation.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true +# test_via: ../subscriptions.rb +module GraphQL + class Subscriptions + # Wrap the root fields of the subscription type with special logic for: + # - Registering the subscription during the first execution + # - Evaluating the triggered portion(s) of the subscription during later execution + class Instrumentation + def initialize(schema:) + @schema = schema + end + + def instrument(type, field) + if type == @schema.subscription + # This is a root field of `subscription` + subscribing_resolve_proc = SubscriptionRegistrationResolve.new(field.resolve_proc) + field.redefine(resolve: subscribing_resolve_proc) + else + field + end + end + + # If needed, prepare to gather events which this query subscribes to + def before_query(query) + if query.subscription? && !query.subscription_update? + query.context.namespace(:subscriptions)[:events] = [] + end + end + + # After checking the root fields, pass the gathered events to the store + def after_query(query) + events = query.context.namespace(:subscriptions)[:events] + if events && events.any? + @schema.subscriptions.write_subscription(query, events) + end + end + + private + + class SubscriptionRegistrationResolve + def initialize(inner_proc) + @inner_proc = inner_proc + end + + # Wrap the proc with subscription registration logic + def call(obj, args, ctx) + events = ctx.namespace(:subscriptions)[:events] + if events + # This is the first execution, so gather an Event + # for the backend to register: + events << Subscriptions::Event.new( + name: ctx.field.name, + arguments: args, + context: ctx, + ) + ctx.skip + elsif ctx.irep_node.subscription_topic == ctx.query.subscription_topic + # The root object is _already_ the subscription update: + @inner_proc.call(obj, args, ctx) + else + # This is a subscription update, but this event wasn't triggered. + ctx.skip + end + end + end + end + end +end diff --git a/spec/graphql/subscriptions_spec.rb b/spec/graphql/subscriptions_spec.rb new file mode 100644 index 00000000000..3bf867ad508 --- /dev/null +++ b/spec/graphql/subscriptions_spec.rb @@ -0,0 +1,331 @@ +# frozen_string_literal: true +require "spec_helper" + +class InMemoryBackend + class Subscriptions < GraphQL::Subscriptions + attr_reader :deliveries, :pushes, :extra + + def initialize(schema:, extra:) + super + @extra = extra + @queries = {} + @subscriptions = Hash.new { |h, k| h[k] = [] } + @deliveries = Hash.new { |h, k| h[k] = [] } + @pushes = [] + end + + def write_subscription(query, events) + @queries[query.context[:socket]] = query + events.each do |ev| + # The `context` is functioning as subscription data. + # IRL you'd have some other model that persisted the subscription + @subscriptions[ev.topic] << ev.context + end + end + + def each_subscription_id(event) + @subscriptions[event.topic].each do |ctx| + yield(ctx[:socket]) + end + end + + def read_subscription(channel) + query = @queries[channel] + { + query_string: query.query_string, + operation_name: query.operation_name, + variables: query.provided_variables, + context: { me: query.context[:me] }, + transport: :socket, + } + end + + def delete_subscription(channel) + query = @queries.delete(channel) + if query + @subscriptions.each do |key, contexts| + contexts.delete(query.context) + end + end + end + + def deliver(channel, result) + @deliveries[channel] << result + end + + def execute(channel, event, object) + @pushes << channel + super + end + + # Just for testing: + def reset + @queries.clear + @subscriptions.clear + @deliveries.clear + @pushes.clear + end + + def size + @subscriptions.size + end + + def subscriptions + @subscriptions + end + end + # Just a random stateful object for tracking what happens: + class Payload + attr_reader :str + + def initialize + @str = "Update" + @counter = 0 + end + + def int + @counter += 1 + end + end + + SchemaDefinition = <<-GRAPHQL + type Subscription { + payload(id: ID!): Payload! + event(stream: StreamInput): Payload + myEvent(type: PayloadType): Payload + } + + type Payload { + str: String! + int: Int! + } + + input StreamInput { + userId: ID! + type: PayloadType = ONE + } + + # Arbitrary "kinds" of payloads which may be + # subscribed to separately + enum PayloadType { + ONE + TWO + } + + type Query { + dummy: Int + } + GRAPHQL + + Resolvers = { + "Subscription" => { + "payload" => ->(o,a,c) { o }, + "myEvent" => ->(o,a,c) { o }, + "event" => ->(o,a,c) { o }, + }, + } + Schema = GraphQL::Schema.from_definition(SchemaDefinition, default_resolve: Resolvers).redefine do + use InMemoryBackend::Subscriptions, + extra: 123 + end + + # TODO don't hack this (no way to add metadata from IDL parser right now) + Schema.get_field("Subscription", "myEvent").subscription_scope = :me +end + +describe GraphQL::Subscriptions do + before do + schema.subscriptions.reset + end + + let(:root_object) { + OpenStruct.new( + payload: InMemoryBackend::Payload.new, + ) + } + + let(:schema) { InMemoryBackend::Schema } + let(:implementation) { schema.subscriptions } + let(:deliveries) { implementation.deliveries } + describe "pushing updates" do + it "sends updated data" do + query_str = <<-GRAPHQL + subscription ($id: ID!){ + firstPayload: payload(id: $id) { str, int } + otherPayload: payload(id: "900") { int } + } + GRAPHQL + + # Initial subscriptions + res_1 = schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) + res_2 = schema.execute(query_str, context: { socket: "2" }, variables: { "id" => "200" }, root_value: root_object) + + # Initial response is nil, no broadcasts yet + assert_equal(nil, res_1["data"]) + assert_equal(nil, res_2["data"]) + assert_equal [], deliveries["1"] + assert_equal [], deliveries["2"] + + # Application stuff happens. + # The application signals graphql via `subscriptions.trigger`: + schema.subscriptions.trigger("payload", {"id" => "100"}, root_object.payload) + schema.subscriptions.trigger("payload", {"id" => "200"}, root_object.payload) + schema.subscriptions.trigger("payload", {"id" => "100"}, root_object.payload) + schema.subscriptions.trigger("payload", {"id" => "300"}, nil) + + # Let's see what GraphQL sent over the wire: + assert_equal({"str" => "Update", "int" => 1}, deliveries["1"][0]["data"]["firstPayload"]) + assert_equal({"str" => "Update", "int" => 2}, deliveries["2"][0]["data"]["firstPayload"]) + assert_equal({"str" => "Update", "int" => 3}, deliveries["1"][1]["data"]["firstPayload"]) + end + end + + describe "subscribing" do + it "doesn't call the subscriptions for invalid queries" do + query_str = <<-GRAPHQL + subscription ($id: ID){ + payload(id: $id) { str, int } + } + GRAPHQL + + res = schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) + assert_equal true, res.key?("errors") + assert_equal 0, implementation.size + end + end + + describe "trigger" do + it "uses the provided queue" do + query_str = <<-GRAPHQL + subscription ($id: ID!){ + payload(id: $id) { str, int } + } + GRAPHQL + + schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) + schema.subscriptions.trigger("payload", { "id" => "8"}, root_object.payload) + assert_equal ["1"], implementation.pushes + end + + it "pushes errors" do + query_str = <<-GRAPHQL + subscription ($id: ID!){ + payload(id: $id) { str, int } + } + GRAPHQL + + schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) + schema.subscriptions.trigger("payload", { "id" => "8"}, OpenStruct.new(str: nil, int: nil)) + delivery = deliveries["1"].first + assert_equal nil, delivery.fetch("data") + assert_equal 1, delivery["errors"].length + end + + it "coerces args" do + query_str = <<-GRAPHQL + subscription($type: PayloadType) { + e1: event(stream: { userId: "3", type: $type }) { int } + } + GRAPHQL + + # Subscribe with explicit `TYPE` + schema.execute(query_str, context: { socket: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + # Subscribe with default `TYPE` + schema.execute(query_str, context: { socket: "2" }, root_value: root_object) + # Subscribe with non-matching `TYPE` + schema.execute(query_str, context: { socket: "3" }, variables: { "type" => "TWO" }, root_value: root_object) + # Subscribe with explicit null + schema.execute(query_str, context: { socket: "4" }, variables: { "type" => nil }, root_value: root_object) + + # Trigger the subscription with coerceable args, different orders: + schema.subscriptions.trigger("event", { "stream" => {"userId" => 3, "type" => "ONE"} }, OpenStruct.new(str: "", int: 1)) + schema.subscriptions.trigger("event", { "stream" => {"type" => "ONE", "userId" => "3"} }, OpenStruct.new(str: "", int: 2)) + # This is a non-trigger + schema.subscriptions.trigger("event", { "stream" => {"userId" => "3", "type" => "TWO"} }, OpenStruct.new(str: "", int: 3)) + # These get default value of ONE + schema.subscriptions.trigger("event", { "stream" => {"userId" => "3"} }, OpenStruct.new(str: "", int: 4)) + # Trigger with null updates subscriptionss to null + schema.subscriptions.trigger("event", { "stream" => {"userId" => 3, "type" => nil} }, OpenStruct.new(str: "", int: 5)) + + assert_equal [1,2,4], deliveries["1"].map { |d| d["data"]["e1"]["int"] } + + # Same as socket_1 + assert_equal [1,2,4], deliveries["2"].map { |d| d["data"]["e1"]["int"] } + + # Received the "non-trigger" + assert_equal [3], deliveries["3"].map { |d| d["data"]["e1"]["int"] } + + # Received the trigger with null + assert_equal [5], deliveries["4"].map { |d| d["data"]["e1"]["int"] } + end + + it "allows context-scoped subscriptions" do + query_str = <<-GRAPHQL + subscription($type: PayloadType) { + myEvent(type: $type) { int } + } + GRAPHQL + + # Subscriptions for user 1 + schema.execute(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + schema.execute(query_str, context: { socket: "2", me: "1" }, variables: { "type" => "TWO" }, root_value: root_object) + # Subscription for user 2 + schema.execute(query_str, context: { socket: "3", me: "2" }, variables: { "type" => "ONE" }, root_value: root_object) + + schema.subscriptions.trigger("myEvent", { "type" => "ONE" }, OpenStruct.new(str: "", int: 1), scope: "1") + schema.subscriptions.trigger("myEvent", { "type" => "TWO" }, OpenStruct.new(str: "", int: 2), scope: "1") + schema.subscriptions.trigger("myEvent", { "type" => "ONE" }, OpenStruct.new(str: "", int: 3), scope: "2") + + # Delivered to user 1 + assert_equal [1], deliveries["1"].map { |d| d["data"]["myEvent"]["int"] } + assert_equal [2], deliveries["2"].map { |d| d["data"]["myEvent"]["int"] } + # Delivered to user 2 + assert_equal [3], deliveries["3"].map { |d| d["data"]["myEvent"]["int"] } + end + + describe "errors" do + class ErrorPayload + def int + raise "Boom!" + end + + def str + raise GraphQL::ExecutionError.new("This is handled") + end + end + + it "lets unhandled errors crash "do + query_str = <<-GRAPHQL + subscription($type: PayloadType) { + myEvent(type: $type) { int } + } + GRAPHQL + + schema.execute(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + err = assert_raises(RuntimeError) { + schema.subscriptions.trigger("myEvent", { "type" => "ONE" }, ErrorPayload.new, scope: "1") + } + assert_equal "Boom!", err.message + end + end + + it "sends query errors to the subscriptions" do + query_str = <<-GRAPHQL + subscription($type: PayloadType) { + myEvent(type: $type) { str } + } + GRAPHQL + + schema.execute(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + schema.subscriptions.trigger("myEvent", { "type" => "ONE" }, ErrorPayload.new, scope: "1") + res = deliveries["1"].first + assert_equal "This is handled", res["errors"][0]["message"] + end + end + + describe "implementation" do + it "is initialized with keywords" do + assert_equal 123, schema.subscriptions.extra + end + end +end