Skip to content

Commit 55d86ba

Browse files
fix: properly handle asynchronous read from stream (#1284)
* test: test ReadRows logic with local gRPC server
* test: PR feedback
* test: fix race condition in initialization
* test: PR feedback, renaming a variable for readability
* fix: properly handle asynchronous read from stream
* test: skip failing Windows test
* test: increase timeout on Windows
* fix: PR feedback
* test: only set lastScannedRowKey for completed rows
* fix: bring back the lastScannedRowKey logic
* test: pick latest changes from main branch
* fix: add transform method to userStream, handle cancellation in it
* fix: PR feedback

---------

Co-authored-by: danieljbruce <[email protected]>
1 parent f953911 commit 55d86ba

File tree

4 files changed

+54
-22
lines changed

4 files changed

+54
-22
lines changed

src/chunktransformer.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,10 @@ export class ChunkTransformer extends Transform {
129129
* @public
130130
*
131131
* @param {object} data readrows response containing array of chunks.
132-
* @param {object} [enc] encoding options.
132+
* @param {object} [_encoding] encoding options.
133133
* @param {callback} next callback will be called once data is processed, with error if any error in processing
134134
*/
135-
_transform(data: Data, enc: string, next: Function): void {
135+
_transform(data: Data, _encoding: string, next: Function): void {
136136
for (const chunk of data.chunks!) {
137137
switch (this.state) {
138138
case RowStateEnum.NEW_ROW:
@@ -148,6 +148,7 @@ export class ChunkTransformer extends Transform {
148148
break;
149149
}
150150
if (this._destroyed) {
151+
next();
151152
return;
152153
}
153154
}
@@ -226,7 +227,16 @@ export class ChunkTransformer extends Transform {
226227
chunk.familyName ||
227228
chunk.qualifier ||
228229
(chunk.value && chunk.value.length !== 0) ||
229-
chunk.timestampMicros! > 0;
230+
// timestampMicros is an int64 in the protobuf definition,
231+
// which can be either a number or an instance of Long.
232+
// If it's a number...
233+
(typeof chunk.timestampMicros === 'number' &&
234+
chunk.timestampMicros! > 0) ||
235+
// If it's an instance of Long...
236+
(typeof chunk.timestampMicros === 'object' &&
237+
'compare' in chunk.timestampMicros &&
238+
typeof chunk.timestampMicros.compare === 'function' &&
239+
chunk.timestampMicros.compare(0) === 1);
230240
if (chunk.resetRow && containsData) {
231241
this.destroy(
232242
new TransformError({

src/table.ts

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -745,22 +745,51 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
745745
filter = Filter.parse(options.filter);
746746
}
747747

748-
const userStream = new PassThrough({objectMode: true});
749-
const end = userStream.end.bind(userStream);
750-
userStream.end = () => {
748+
let chunkTransformer: ChunkTransformer;
749+
let rowStream: Duplex;
750+
751+
let userCanceled = false;
752+
const userStream = new PassThrough({
753+
objectMode: true,
754+
transform(row, _encoding, callback) {
755+
if (userCanceled) {
756+
callback();
757+
return;
758+
}
759+
callback(null, row);
760+
},
761+
});
762+
763+
// The caller should be able to call userStream.end() to stop receiving
764+
// more rows and cancel the stream prematurely. But also, the 'end' event
765+
// will be emitted if the stream ended normally. To tell these two
766+
// situations apart, we'll save the "original" end() function, and
767+
// will call it on rowStream.on('end').
768+
const originalEnd = userStream.end.bind(userStream);
769+
770+
// Taking care of this extra listener when piping and unpiping userStream:
771+
const rowStreamPipe = (rowStream: Duplex, userStream: PassThrough) => {
772+
rowStream.pipe(userStream, {end: false});
773+
rowStream.on('end', originalEnd);
774+
};
775+
const rowStreamUnpipe = (rowStream: Duplex, userStream: PassThrough) => {
751776
rowStream?.unpipe(userStream);
777+
rowStream?.removeListener('end', originalEnd);
778+
};
779+
780+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
781+
userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
782+
rowStreamUnpipe(rowStream, userStream);
783+
userCanceled = true;
752784
if (activeRequestStream) {
753785
activeRequestStream.abort();
754786
}
755787
if (retryTimer) {
756788
clearTimeout(retryTimer);
757789
}
758-
return end();
790+
return originalEnd(chunk, encoding, cb);
759791
};
760792

761-
let chunkTransformer: ChunkTransformer;
762-
let rowStream: Duplex;
763-
764793
const makeNewRequest = () => {
765794
// Avoid cancelling an expired timer if user
766795
// cancelled the stream in the middle of a retry
@@ -882,7 +911,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
882911
const toRowStream = new Transform({
883912
transform: (rowData, _, next) => {
884913
if (
885-
chunkTransformer._destroyed ||
914+
userCanceled ||
886915
// eslint-disable-next-line @typescript-eslint/no-explicit-any
887916
(userStream as any)._writableState.ended
888917
) {
@@ -913,7 +942,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
913942

914943
rowStream
915944
.on('error', (error: ServiceError) => {
916-
rowStream.unpipe(userStream);
945+
rowStreamUnpipe(rowStream, userStream);
917946
activeRequestStream = null;
918947
if (IGNORED_STATUS_CODES.has(error.code)) {
919948
// We ignore the `cancelled` "error", since we are the ones who cause
@@ -947,7 +976,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
947976
.on('end', () => {
948977
activeRequestStream = null;
949978
});
950-
rowStream.pipe(userStream);
979+
rowStreamPipe(rowStream, userStream);
951980
};
952981

953982
makeNewRequest();

test/chunktransformer.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -997,12 +997,6 @@ describe('Bigtable/ChunkTransformer', () => {
997997
const err = callback.getCall(0).args[0];
998998
assert(!err, 'did not expect error');
999999
});
1000-
it('should return when stream is destroyed', () => {
1001-
chunkTransformer._destroyed = true;
1002-
const chunks = [{key: 'key'}];
1003-
chunkTransformer._transform({chunks}, {}, callback);
1004-
assert(!callback.called, 'unexpected call to next');
1005-
});
10061000
it('should change the `lastRowKey` value for `data.lastScannedRowKey`', () => {
10071001
chunkTransformer._transform(
10081002
{chunks: [], lastScannedRowKey: 'foo'},

test/readrows.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,7 @@ describe('Bigtable/ReadRows', () => {
122122
pipeline(readStream, transform, passThrough, () => {});
123123
});
124124

125-
// TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed
126-
it.skip('should create read stream and read asynchronously using Transform stream', function (done) {
125+
it('should create read stream and read asynchronously using Transform stream', function (done) {
127126
if (process.platform === 'win32') {
128127
this.timeout(60000); // it runs much slower on Windows!
129128
}
@@ -222,7 +221,7 @@ describe('Bigtable/ReadRows', () => {
222221
});
223222
});
224223

225-
// TODO(@alexander-fenster): enable after it's fixed
224+
// TODO: enable after https://github.com/googleapis/nodejs-bigtable/issues/1286 is fixed
226225
it.skip('should be able to stop reading from the read stream when reading asynchronously', function (done) {
227226
if (process.platform === 'win32') {
228227
this.timeout(60000); // it runs much slower on Windows!

0 commit comments

Comments (0)