1
1
import Atomics
2
2
import NIOEmbedded
3
- import Dispatch
3
+ import NIOPosix
4
4
import XCTest
5
5
@testable import PostgresNIO
6
6
import NIOCore
7
7
import Logging
8
8
9
9
final class PostgresRowSequenceTests : XCTestCase {
10
10
let logger = Logger ( label: " PSQLRowStreamTests " )
11
- let eventLoop = EmbeddedEventLoop ( )
12
11
13
12
func testBackpressureWorks( ) async throws {
14
13
let dataSource = MockRowDataSource ( )
14
+ let embeddedEventLoop = EmbeddedEventLoop ( )
15
15
let stream = PSQLRowStream (
16
16
source: . stream(
17
17
[
18
18
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
19
19
] ,
20
20
dataSource
21
21
) ,
22
- eventLoop: self . eventLoop ,
22
+ eventLoop: embeddedEventLoop ,
23
23
logger: self . logger
24
24
)
25
25
@@ -41,14 +41,15 @@ final class PostgresRowSequenceTests: XCTestCase {
41
41
42
42
func testCancellationWorksWhileIterating( ) async throws {
43
43
let dataSource = MockRowDataSource ( )
44
+ let embeddedEventLoop = EmbeddedEventLoop ( )
44
45
let stream = PSQLRowStream (
45
46
source: . stream(
46
47
[
47
48
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
48
49
] ,
49
50
dataSource
50
51
) ,
51
- eventLoop: self . eventLoop ,
52
+ eventLoop: embeddedEventLoop ,
52
53
logger: self . logger
53
54
)
54
55
@@ -72,14 +73,15 @@ final class PostgresRowSequenceTests: XCTestCase {
72
73
73
74
func testCancellationWorksBeforeIterating( ) async throws {
74
75
let dataSource = MockRowDataSource ( )
76
+ let embeddedEventLoop = EmbeddedEventLoop ( )
75
77
let stream = PSQLRowStream (
76
78
source: . stream(
77
79
[
78
80
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
79
81
] ,
80
82
dataSource
81
83
) ,
82
- eventLoop: self . eventLoop ,
84
+ eventLoop: embeddedEventLoop ,
83
85
logger: self . logger
84
86
)
85
87
@@ -97,14 +99,15 @@ final class PostgresRowSequenceTests: XCTestCase {
97
99
98
100
func testDroppingTheSequenceCancelsTheSource( ) async throws {
99
101
let dataSource = MockRowDataSource ( )
102
+ let embeddedEventLoop = EmbeddedEventLoop ( )
100
103
let stream = PSQLRowStream (
101
104
source: . stream(
102
105
[
103
106
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
104
107
] ,
105
108
dataSource
106
109
) ,
107
- eventLoop: self . eventLoop ,
110
+ eventLoop: embeddedEventLoop ,
108
111
logger: self . logger
109
112
)
110
113
@@ -117,14 +120,15 @@ final class PostgresRowSequenceTests: XCTestCase {
117
120
118
121
func testStreamBasedOnCompletedQuery( ) async throws {
119
122
let dataSource = MockRowDataSource ( )
123
+ let embeddedEventLoop = EmbeddedEventLoop ( )
120
124
let stream = PSQLRowStream (
121
125
source: . stream(
122
126
[
123
127
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
124
128
] ,
125
129
dataSource
126
130
) ,
127
- eventLoop: self . eventLoop ,
131
+ eventLoop: embeddedEventLoop ,
128
132
logger: self . logger
129
133
)
130
134
@@ -144,14 +148,15 @@ final class PostgresRowSequenceTests: XCTestCase {
144
148
145
149
func testStreamIfInitializedWithAllData( ) async throws {
146
150
let dataSource = MockRowDataSource ( )
151
+ let embeddedEventLoop = EmbeddedEventLoop ( )
147
152
let stream = PSQLRowStream (
148
153
source: . stream(
149
154
[
150
155
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
151
156
] ,
152
157
dataSource
153
158
) ,
154
- eventLoop: self . eventLoop ,
159
+ eventLoop: embeddedEventLoop ,
155
160
logger: self . logger
156
161
)
157
162
@@ -172,14 +177,15 @@ final class PostgresRowSequenceTests: XCTestCase {
172
177
173
178
func testStreamIfInitializedWithError( ) async throws {
174
179
let dataSource = MockRowDataSource ( )
180
+ let embeddedEventLoop = EmbeddedEventLoop ( )
175
181
let stream = PSQLRowStream (
176
182
source: . stream(
177
183
[
178
184
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
179
185
] ,
180
186
dataSource
181
187
) ,
182
- eventLoop: self . eventLoop ,
188
+ eventLoop: embeddedEventLoop ,
183
189
logger: self . logger
184
190
)
185
191
@@ -200,29 +206,30 @@ final class PostgresRowSequenceTests: XCTestCase {
200
206
201
207
func testSucceedingRowContinuationsWorks( ) async throws {
202
208
let dataSource = MockRowDataSource ( )
209
+ let eventLoop = NIOSingletons . posixEventLoopGroup. next ( )
203
210
let stream = PSQLRowStream (
204
211
source: . stream(
205
212
[
206
213
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
207
214
] ,
208
215
dataSource
209
216
) ,
210
- eventLoop: self . eventLoop,
217
+ eventLoop: eventLoop,
211
218
logger: self . logger
212
219
)
213
220
214
- let rowSequence = stream. asyncSequence ( )
221
+ let rowSequence = try await eventLoop . submit { stream. asyncSequence ( ) } . get ( )
215
222
var rowIterator = rowSequence. makeAsyncIterator ( )
216
223
217
- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
224
+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
218
225
let dataRows : [ DataRow ] = ( 0 ..< 1 ) . map { [ ByteBuffer ( integer: Int64 ( $0) ) ] }
219
226
stream. receive ( dataRows)
220
227
}
221
228
222
229
let row1 = try await rowIterator. next ( )
223
230
XCTAssertEqual ( try row1? . decode ( Int . self) , 0 )
224
231
225
- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
232
+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
226
233
stream. receive ( completion: . success( " SELECT 1 " ) )
227
234
}
228
235
@@ -232,29 +239,30 @@ final class PostgresRowSequenceTests: XCTestCase {
232
239
233
240
func testFailingRowContinuationsWorks( ) async throws {
234
241
let dataSource = MockRowDataSource ( )
242
+ let eventLoop = NIOSingletons . posixEventLoopGroup. next ( )
235
243
let stream = PSQLRowStream (
236
244
source: . stream(
237
245
[
238
246
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
239
247
] ,
240
248
dataSource
241
249
) ,
242
- eventLoop: self . eventLoop,
250
+ eventLoop: eventLoop,
243
251
logger: self . logger
244
252
)
245
253
246
- let rowSequence = stream. asyncSequence ( )
254
+ let rowSequence = try await eventLoop . submit { stream. asyncSequence ( ) } . get ( )
247
255
var rowIterator = rowSequence. makeAsyncIterator ( )
248
256
249
- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
257
+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
250
258
let dataRows : [ DataRow ] = ( 0 ..< 1 ) . map { [ ByteBuffer ( integer: Int64 ( $0) ) ] }
251
259
stream. receive ( dataRows)
252
260
}
253
261
254
262
let row1 = try await rowIterator. next ( )
255
263
XCTAssertEqual ( try row1? . decode ( Int . self) , 0 )
256
264
257
- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
265
+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
258
266
stream. receive ( completion: . failure( PSQLError . serverClosedConnection ( underlying: nil ) ) )
259
267
}
260
268
@@ -268,14 +276,15 @@ final class PostgresRowSequenceTests: XCTestCase {
268
276
269
277
func testAdaptiveRowBufferShrinksAndGrows( ) async throws {
270
278
let dataSource = MockRowDataSource ( )
279
+ let embeddedEventLoop = EmbeddedEventLoop ( )
271
280
let stream = PSQLRowStream (
272
281
source: . stream(
273
282
[
274
283
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
275
284
] ,
276
285
dataSource
277
286
) ,
278
- eventLoop: self . eventLoop ,
287
+ eventLoop: embeddedEventLoop ,
279
288
logger: self . logger
280
289
)
281
290
@@ -332,14 +341,15 @@ final class PostgresRowSequenceTests: XCTestCase {
332
341
333
342
func testAdaptiveRowShrinksToMin( ) async throws {
334
343
let dataSource = MockRowDataSource ( )
344
+ let embeddedEventLoop = EmbeddedEventLoop ( )
335
345
let stream = PSQLRowStream (
336
346
source: . stream(
337
347
[
338
348
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
339
349
] ,
340
350
dataSource
341
351
) ,
342
- eventLoop: self . eventLoop ,
352
+ eventLoop: embeddedEventLoop ,
343
353
logger: self . logger
344
354
)
345
355
@@ -386,14 +396,15 @@ final class PostgresRowSequenceTests: XCTestCase {
386
396
387
397
func testStreamBufferAcceptsNewRowsEventhoughItDidntAskForIt( ) async throws {
388
398
let dataSource = MockRowDataSource ( )
399
+ let embeddedEventLoop = EmbeddedEventLoop ( )
389
400
let stream = PSQLRowStream (
390
401
source: . stream(
391
402
[
392
403
. init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
393
404
] ,
394
405
dataSource
395
406
) ,
396
- eventLoop: self . eventLoop ,
407
+ eventLoop: embeddedEventLoop ,
397
408
logger: self . logger
398
409
)
399
410
0 commit comments