Skip to content

Commit cf9f195

Browse files
feat!: Removes EventStream, replacing with AsyncThrowingStream
1 parent d112be0 commit cf9f195

File tree

8 files changed

+87
-229
lines changed

8 files changed

+87
-229
lines changed

MIGRATION.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ The documentation here will be very helpful in the conversion: https://www.swift
1616

1717
This was changed to `ConcurrentFieldExecutionStrategy`, and takes no parameters.
1818

19+
### EventStream removal
20+
21+
The `EventStream` abstraction used to provide pre-concurrency subscription support has been removed. This means that `graphqlSubscribe(...).stream` will now be an `AsyncThrowingStream<GraphQLResult, Error>` type, instead of an `EventStream` type, and that downcasting to `ConcurrentEventStream` is no longer necessary.
22+
1923
## 2 to 3
2024

2125
### TypeReference removal

README.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,14 @@ let schema = try GraphQLSchema(
7575
return eventResult
7676
},
7777
subscribe: { _, _, _, _, _ in // Defines how to construct the event stream
78-
let asyncStream = AsyncThrowingStream<String, Error> { continuation in
78+
return AsyncThrowingStream<String, Error> { continuation in
7979
let timer = Timer.scheduledTimer(
8080
withTimeInterval: 3,
8181
repeats: true,
8282
) {
8383
continuation.yield("world") // Emits "world" every 3 seconds
8484
}
8585
}
86-
return ConcurrentEventStream<String>(asyncStream)
8786
}
8887
)
8988
]
@@ -97,8 +96,6 @@ To execute a subscription use the `graphqlSubscribe` function:
9796
let subscriptionResult = try await graphqlSubscribe(
9897
schema: schema,
9998
)
100-
// Must downcast from EventStream to concrete type to use in 'for await' loop below
101-
let concurrentStream = subscriptionResult.stream! as! ConcurrentEventStream
10299
for try await result in concurrentStream.stream {
103100
print(result)
104101
}
@@ -110,9 +107,6 @@ The code above will print the following JSON every 3 seconds:
110107
{ "hello": "world" }
111108
```
112109

113-
The example above assumes that your environment has access to Swift Concurrency. If that is not the case, try using
114-
[GraphQLRxSwift](https://github.com/GraphQLSwift/GraphQLRxSwift)
115-
116110
## Encoding Results
117111

118112
If you encode a `GraphQLResult` with an ordinary `JSONEncoder`, there are no guarantees that the field order will match the query,

Sources/GraphQL/GraphQL.swift

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,15 @@ public struct GraphQLResult: Equatable, Codable, Sendable, CustomStringConvertib
4444

4545
/// SubscriptionResult wraps the observable and error data returned by the subscribe request.
4646
public struct SubscriptionResult {
47-
public let stream: SubscriptionEventStream?
47+
public let stream: AsyncThrowingStream<GraphQLResult, Error>?
4848
public let errors: [GraphQLError]
4949

50-
public init(stream: SubscriptionEventStream? = nil, errors: [GraphQLError] = []) {
50+
public init(stream: AsyncThrowingStream<GraphQLResult, Error>? = nil, errors: [GraphQLError] = []) {
5151
self.stream = stream
5252
self.errors = errors
5353
}
5454
}
5555

56-
/// SubscriptionObservable represents an event stream of fully resolved GraphQL subscription
57-
/// results. Subscribers can be added to this stream.
58-
public typealias SubscriptionEventStream = EventStream<GraphQLResult>
59-
6056
/// This is the primary entry point function for fulfilling GraphQL operations
6157
/// by parsing, validating, and executing a GraphQL document along side a
6258
/// GraphQL schema.

Sources/GraphQL/Subscription/EventStream.swift

Lines changed: 0 additions & 73 deletions
This file was deleted.

Sources/GraphQL/Subscription/Subscribe.swift

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,41 @@ func subscribe(
4646
)
4747

4848
if let sourceStream = sourceResult.stream {
49-
let subscriptionStream = sourceStream.map { eventPayload -> GraphQLResult in
50-
// For each payload yielded from a subscription, map it over the normal
51-
// GraphQL `execute` function, with `payload` as the rootValue.
52-
// This implements the "MapSourceToResponseEvent" algorithm described in
53-
// the GraphQL specification. The `execute` function provides the
54-
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
55-
// "ExecuteQuery" algorithm, for which `execute` is also used.
56-
try await execute(
57-
queryStrategy: queryStrategy,
58-
mutationStrategy: mutationStrategy,
59-
subscriptionStrategy: subscriptionStrategy,
60-
instrumentation: instrumentation,
61-
schema: schema,
62-
documentAST: documentAST,
63-
rootValue: eventPayload,
64-
context: context,
65-
variableValues: variableValues,
66-
operationName: operationName
67-
)
49+
// We must create a new AsyncSequence because AsyncSequence.map requires a concrete type (which we cannot know),
50+
// and we need the result to be a concrete type.
51+
let subscriptionStream = AsyncThrowingStream<GraphQLResult, Error> { continuation in
52+
let task = Task {
53+
do {
54+
for try await eventPayload in sourceStream {
55+
// For each payload yielded from a subscription, map it over the normal
56+
// GraphQL `execute` function, with `payload` as the rootValue.
57+
// This implements the "MapSourceToResponseEvent" algorithm described in
58+
// the GraphQL specification. The `execute` function provides the
59+
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
60+
// "ExecuteQuery" algorithm, for which `execute` is also used.
61+
let newEvent = try await execute(
62+
queryStrategy: queryStrategy,
63+
mutationStrategy: mutationStrategy,
64+
subscriptionStrategy: subscriptionStrategy,
65+
instrumentation: instrumentation,
66+
schema: schema,
67+
documentAST: documentAST,
68+
rootValue: eventPayload,
69+
context: context,
70+
variableValues: variableValues,
71+
operationName: operationName
72+
)
73+
continuation.yield(newEvent)
74+
}
75+
continuation.finish()
76+
} catch {
77+
continuation.finish(throwing: error)
78+
}
79+
}
80+
81+
continuation.onTermination = { @Sendable reason in
82+
task.cancel()
83+
}
6884
}
6985
return SubscriptionResult(stream: subscriptionStream, errors: sourceResult.errors)
7086
} else {
@@ -245,7 +261,7 @@ func executeSubscription(
245261
return SourceEventStreamResult(errors: context.errors)
246262
} else if let error = resolved as? GraphQLError {
247263
return SourceEventStreamResult(errors: [error])
248-
} else if let stream = resolved as? EventStream<Any> {
264+
} else if let stream = resolved as? any AsyncSequence {
249265
return SourceEventStreamResult(stream: stream)
250266
} else if resolved == nil {
251267
return SourceEventStreamResult(errors: [
@@ -255,7 +271,7 @@ func executeSubscription(
255271
let resolvedObj = resolved as AnyObject
256272
return SourceEventStreamResult(errors: [
257273
GraphQLError(
258-
message: "Subscription field resolver must return EventStream<Any>. Received: '\(resolvedObj)'"
274+
message: "Subscription field resolver must return an AsyncSequence. Received: '\(resolvedObj)'"
259275
),
260276
])
261277
}
@@ -266,10 +282,10 @@ func executeSubscription(
266282
// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers
267283
// for query fields.
268284
struct SourceEventStreamResult {
269-
public let stream: EventStream<Any>?
285+
public let stream: (any AsyncSequence)?
270286
public let errors: [GraphQLError]
271287

272-
public init(stream: EventStream<Any>? = nil, errors: [GraphQLError] = []) {
288+
public init(stream: (any AsyncSequence)? = nil, errors: [GraphQLError] = []) {
273289
self.stream = stream
274290
self.errors = errors
275291
}

Tests/GraphQLTests/SubscriptionTests/SimplePubSub.swift

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ class SimplePubSub<T> {
2020
}
2121
}
2222

23-
func subscribe() -> ConcurrentEventStream<T> {
24-
let asyncStream = AsyncThrowingStream<T, Error> { continuation in
23+
func subscribe() -> AsyncThrowingStream<T, Error> {
24+
return AsyncThrowingStream<T, Error> { continuation in
2525
let subscriber = Subscriber<T>(
2626
callback: { newValue in
2727
continuation.yield(newValue)
@@ -32,7 +32,6 @@ class SimplePubSub<T> {
3232
)
3333
subscribers.append(subscriber)
3434
}
35-
return ConcurrentEventStream<T>(asyncStream)
3635
}
3736
}
3837

Tests/GraphQLTests/SubscriptionTests/SubscriptionSchema.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,14 @@ class EmailDb {
129129
},
130130
subscribe: { _, args, _, _ throws -> Any? in
131131
let priority = args["priority"].int ?? 0
132-
let filtered = self.publisher.subscribe().stream
133-
.filterStream { emailAny throws in
132+
let filtered = self.publisher.subscribe().filter { emailAny throws in
134133
if let email = emailAny as? Email {
135134
return email.priority >= priority
136135
} else {
137136
return true
138137
}
139138
}
140-
return ConcurrentEventStream<Any>(filtered)
139+
return filtered
141140
}
142141
)
143142
}
@@ -146,7 +145,7 @@ class EmailDb {
146145
func subscription(
147146
query: String,
148147
variableValues: [String: Map] = [:]
149-
) async throws -> SubscriptionEventStream {
148+
) async throws -> AsyncThrowingStream<GraphQLResult, Error> {
150149
return try await createSubscription(
151150
schema: defaultSchema(),
152151
query: query,
@@ -186,7 +185,7 @@ func createSubscription(
186185
schema: GraphQLSchema,
187186
query: String,
188187
variableValues: [String: Map] = [:]
189-
) async throws -> SubscriptionEventStream {
188+
) async throws -> AsyncThrowingStream<GraphQLResult, Error> {
190189
let result = try await graphqlSubscribe(
191190
queryStrategy: SerialFieldExecutionStrategy(),
192191
mutationStrategy: SerialFieldExecutionStrategy(),

0 commit comments

Comments
 (0)