Skip to content

Commit 883fc77

Browse files
jasnellnodejs-github-bot
authored andcommitted
events: allow use of AbortController with once
Allows an AbortSignal to be passed in to events.once() to cancel waiting on an event. Signed-off-by: James M Snell <[email protected]> PR-URL: #34911 Reviewed-By: Denys Otrishko <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 37a8179 commit 883fc77

File tree

4 files changed

+179
-4
lines changed

4 files changed

+179
-4
lines changed

doc/api/events.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ class MyClass extends EventEmitter {
825825
}
826826
```
827827

828-
## `events.once(emitter, name)`
828+
## `events.once(emitter, name[, options])`
829829
<!-- YAML
830830
added:
831831
- v11.13.0
@@ -834,6 +834,9 @@ added:
834834

835835
* `emitter` {EventEmitter}
836836
* `name` {string}
837+
* `options` {Object}
838+
* `signal` {AbortSignal} An {AbortSignal} that may be used to cancel waiting
839+
for the event.
837840
* Returns: {Promise}
838841

839842
Creates a `Promise` that is fulfilled when the `EventEmitter` emits the given
@@ -892,6 +895,31 @@ ee.emit('error', new Error('boom'));
892895
// Prints: ok boom
893896
```
894897

898+
An {AbortSignal} may be used to cancel waiting for the event early:
899+
900+
```js
901+
const { EventEmitter, once } = require('events');
902+
903+
const ee = new EventEmitter();
904+
const ac = new AbortController();
905+
906+
async function foo(emitter, event, signal) {
907+
try {
908+
await once(emitter, event, { signal });
909+
console.log('event emitted!');
910+
} catch (error) {
911+
if (error.name === 'AbortError') {
912+
console.error('Waiting for the event was canceled!');
913+
} else {
914+
console.error('There was an error', error.message);
915+
}
916+
}
917+
}
918+
919+
foo(ee, 'foo', ac.signal);
920+
ac.abort(); // Abort waiting for the event
921+
```
922+
895923
### Awaiting multiple events emitted on `process.nextTick()`
896924

897925
There is an edge case worth noting when using the `events.once()` function

lib/events.js

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const kRejection = SymbolFor('nodejs.rejection');
4444
let spliceOne;
4545

4646
const {
47+
hideStackFrames,
4748
kEnhanceStackBeforeInspector,
4849
codes
4950
} = require('internal/errors');
@@ -57,9 +58,20 @@ const {
5758
inspect
5859
} = require('internal/util/inspect');
5960

61+
const {
62+
validateAbortSignal
63+
} = require('internal/validators');
64+
6065
const kCapture = Symbol('kCapture');
6166
const kErrorMonitor = Symbol('events.errorMonitor');
6267

68+
let DOMException;
69+
const lazyDOMException = hideStackFrames((message, name) => {
70+
if (DOMException === undefined)
71+
DOMException = internalBinding('messaging').DOMException;
72+
return new DOMException(message, name);
73+
});
74+
6375
function EventEmitter(opts) {
6476
EventEmitter.init.call(this, opts);
6577
}
@@ -621,22 +633,61 @@ function unwrapListeners(arr) {
621633
return ret;
622634
}
623635

624-
function once(emitter, name) {
636+
async function once(emitter, name, options = {}) {
637+
const signal = options ? options.signal : undefined;
638+
validateAbortSignal(signal, 'options.signal');
639+
if (signal && signal.aborted)
640+
throw lazyDOMException('The operation was aborted', 'AbortError');
625641
return new Promise((resolve, reject) => {
626642
const errorListener = (err) => {
627643
emitter.removeListener(name, resolver);
644+
if (signal != null) {
645+
eventTargetAgnosticRemoveListener(
646+
signal,
647+
'abort',
648+
abortListener,
649+
{ once: true });
650+
}
628651
reject(err);
629652
};
630653
const resolver = (...args) => {
631654
if (typeof emitter.removeListener === 'function') {
632655
emitter.removeListener('error', errorListener);
633656
}
657+
if (signal != null) {
658+
eventTargetAgnosticRemoveListener(
659+
signal,
660+
'abort',
661+
abortListener,
662+
{ once: true });
663+
}
634664
resolve(args);
635665
};
636666
eventTargetAgnosticAddListener(emitter, name, resolver, { once: true });
637667
if (name !== 'error') {
638668
addErrorHandlerIfEventEmitter(emitter, errorListener, { once: true });
639669
}
670+
function abortListener() {
671+
if (typeof emitter.removeListener === 'function') {
672+
emitter.removeListener(name, resolver);
673+
emitter.removeListener('error', errorListener);
674+
} else {
675+
eventTargetAgnosticRemoveListener(
676+
emitter,
677+
name,
678+
resolver,
679+
{ once: true });
680+
eventTargetAgnosticRemoveListener(
681+
emitter,
682+
'error',
683+
errorListener,
684+
{ once: true });
685+
}
686+
reject(lazyDOMException('The operation was aborted', 'AbortError'));
687+
}
688+
if (signal != null) {
689+
signal.addEventListener('abort', abortListener, { once: true });
690+
}
640691
});
641692
}
642693

lib/internal/validators.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,15 @@ const validateCallback = hideStackFrames((callback) => {
216216
throw new ERR_INVALID_CALLBACK(callback);
217217
});
218218

219+
const validateAbortSignal = hideStackFrames((signal, name) => {
220+
if (signal !== undefined &&
221+
(signal === null ||
222+
typeof signal !== 'object' ||
223+
!('aborted' in signal))) {
224+
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
225+
}
226+
});
227+
219228
module.exports = {
220229
isInt32,
221230
isUint32,
@@ -234,4 +243,5 @@ module.exports = {
234243
validateString,
235244
validateUint32,
236245
validateCallback,
246+
validateAbortSignal,
237247
};

test/parallel/test-events-once.js

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
'use strict';
2-
// Flags: --expose-internals
2+
// Flags: --expose-internals --no-warnings
33

44
const common = require('../common');
55
const { once, EventEmitter } = require('events');
6-
const { strictEqual, deepStrictEqual, fail } = require('assert');
6+
const {
7+
strictEqual,
8+
deepStrictEqual,
9+
fail,
10+
rejects,
11+
} = require('assert');
712
const { EventTarget, Event } = require('internal/event_target');
813

914
async function onceAnEvent() {
@@ -114,6 +119,81 @@ async function prioritizesEventEmitter() {
114119
process.nextTick(() => ee.emit('foo'));
115120
await once(ee, 'foo');
116121
}
122+
123+
async function abortSignalBefore() {
124+
const ee = new EventEmitter();
125+
const ac = new AbortController();
126+
ee.on('error', common.mustNotCall());
127+
ac.abort();
128+
129+
await Promise.all([1, {}, 'hi', null, false].map((signal) => {
130+
return rejects(once(ee, 'foo', { signal }), {
131+
code: 'ERR_INVALID_ARG_TYPE'
132+
});
133+
}));
134+
135+
return rejects(once(ee, 'foo', { signal: ac.signal }), {
136+
name: 'AbortError'
137+
});
138+
}
139+
140+
async function abortSignalAfter() {
141+
const ee = new EventEmitter();
142+
const ac = new AbortController();
143+
ee.on('error', common.mustNotCall());
144+
const r = rejects(once(ee, 'foo', { signal: ac.signal }), {
145+
name: 'AbortError'
146+
});
147+
process.nextTick(() => ac.abort());
148+
return r;
149+
}
150+
151+
async function abortSignalAfterEvent() {
152+
const ee = new EventEmitter();
153+
const ac = new AbortController();
154+
process.nextTick(() => {
155+
ee.emit('foo');
156+
ac.abort();
157+
});
158+
await once(ee, 'foo', { signal: ac.signal });
159+
}
160+
161+
async function eventTargetAbortSignalBefore() {
162+
const et = new EventTarget();
163+
const ac = new AbortController();
164+
ac.abort();
165+
166+
await Promise.all([1, {}, 'hi', null, false].map((signal) => {
167+
return rejects(once(et, 'foo', { signal }), {
168+
code: 'ERR_INVALID_ARG_TYPE'
169+
});
170+
}));
171+
172+
return rejects(once(et, 'foo', { signal: ac.signal }), {
173+
name: 'AbortError'
174+
});
175+
}
176+
177+
async function eventTargetAbortSignalAfter() {
178+
const et = new EventTarget();
179+
const ac = new AbortController();
180+
const r = rejects(once(et, 'foo', { signal: ac.signal }), {
181+
name: 'AbortError'
182+
});
183+
process.nextTick(() => ac.abort());
184+
return r;
185+
}
186+
187+
async function eventTargetAbortSignalAfterEvent() {
188+
const et = new EventTarget();
189+
const ac = new AbortController();
190+
process.nextTick(() => {
191+
et.dispatchEvent(new Event('foo'));
192+
ac.abort();
193+
});
194+
await once(et, 'foo', { signal: ac.signal });
195+
}
196+
117197
Promise.all([
118198
onceAnEvent(),
119199
onceAnEventWithTwoArgs(),
@@ -123,4 +203,10 @@ Promise.all([
123203
onceWithEventTarget(),
124204
onceWithEventTargetError(),
125205
prioritizesEventEmitter(),
206+
abortSignalBefore(),
207+
abortSignalAfter(),
208+
abortSignalAfterEvent(),
209+
eventTargetAbortSignalBefore(),
210+
eventTargetAbortSignalAfter(),
211+
eventTargetAbortSignalAfterEvent(),
126212
]).then(common.mustCall());

0 commit comments

Comments
 (0)