WebSocketStream: ストリームと WebSocket API の統合

バックプレッシャーを適用して、アプリが WebSocket メッセージで溢れたり、WebSocket サーバーにメッセージが殺到したりするのを防ぎます。

背景

WebSocket API は、WebSocket プロトコルへの JavaScript インターフェースを提供します。これにより、ユーザーのブラウザとサーバー間で双方向のインタラクティブな通信セッションを開くことができます。この API を使用すると、サーバーにメッセージを送信し、返信をポーリングすることなくイベント駆動型のレスポンスを受信できます。

Streams API

Streams API を使用すると、JavaScript でネットワーク経由で受信したデータチャンクのストリームにプログラムからアクセスし、必要に応じて処理できます。ストリームのコンテキストにおける重要なコンセプトは、バックプレッシャーです。これは、単一のストリームまたはパイプチェーンが読み取りまたは書き込みの速度を調整するプロセスです。ストリーム自体またはパイプチェーンの後続のストリームがまだビジー状態で、さらにチャンクを受け入れる準備ができていない場合は、チェーンを逆方向にシグナルを送信して、配信を適宜遅らせます。

現在の WebSocket API の問題

受信したメッセージにバックプレッシャーを適用することはできません

現在の WebSocket API では、メッセージへの反応は WebSocket.onmessage で行われます。これは、サーバーからメッセージを受信したときに呼び出される EventHandler です。

新しいメッセージが受信されるたびに、大量のデータ処理オペレーションを実行する必要があるアプリケーションがあるとします。おそらく、次のコードのようなフローを設定するでしょう。また、process() 呼び出しの結果を await するため、問題はないはずです。

// A heavy data crunching operation.
const process = async (data) => {
  return new Promise((resolve) => {
    window.setTimeout(() => {
      console.log('WebSocket message processed:', data);
      return resolve('done');
    }, 1000);
  });
};

webSocket.onmessage = async (event) => {
  const data = event.data;
  // Await the result of the processing step in the message handler.
  await process(data);
};

不正解です。現在の WebSocket API の問題は、バックプレッシャーを適用する方法がないことです。メッセージが process() メソッドで処理できる速度よりも速く到着すると、レンダリング プロセスはメッセージをバッファリングしてメモリを使い果たすか、CPU 使用率が 100% になって応答しなくなるか、その両方になります。

送信されたメッセージにバックプレッシャーを適用するのは人間工学に反する

送信されたメッセージにバックプレッシャーを適用することは可能ですが、WebSocket.bufferedAmount プロパティのポーリングが必要になるため、非効率的で人間工学的ではありません。この読み取り専用プロパティは、WebSocket.send() の呼び出しを使用してキューに登録されたものの、まだネットワークに送信されていないデータのバイト数を返します。この値は、キューに登録されたすべてのデータが送信されると 0 にリセットされますが、WebSocket.send() を呼び出し続けると、値は増え続けます。

WebSocketStream API とは何ですか?

WebSocketStream API は、ストリームを WebSocket API と統合することで、バックプレッシャーが存在しない、または人間工学的に適していないという問題を解決します。つまり、追加費用なしでバックプレッシャーを適用できます。

WebSocketStream API のユースケースの例

この API を使用できるサイトの例:

  • インタラクティブ性を維持する必要がある高帯域幅の WebSocket アプリケーション(特に動画や画面共有)。
  • 同様に、ブラウザで大量のデータを生成し、サーバーにアップロードする必要がある動画キャプチャなどのアプリケーションも該当します。バックプレッシャーを使用すると、クライアントはメモリにデータを蓄積するのではなく、データの生成を停止できます。

現在のステータス

ステップ ステータス
1. 説明を作成する 完了
2. 仕様の最初のドラフトを作成する 作成中
3. フィードバックを収集してデザインを反復 作成中
4. オリジン トライアル 完了
5. リリース 開始していません

WebSocketStream API の使用方法

WebSocketStream API は Promise ベースであるため、最新の JavaScript の世界で自然に扱うことができます。まず、新しい WebSocketStream を構築し、WebSocket サーバーの URL を渡します。次に、接続が opened になるまで待機します。これにより、ReadableStreamWritableStream が生成されます。

ReadableStream.getReader() メソッドを呼び出すと、最終的に ReadableStreamDefaultReader が取得されます。この ReadableStreamDefaultReader から、ストリームが完了するまで、つまり {value: undefined, done: true} 形式のオブジェクトが返されるまで、read() データを取得できます。

したがって、WritableStream.getWriter() メソッドを呼び出すことで、最終的に WritableStreamDefaultWriter を取得し、それに write() データを取得できます。

  const wss = new WebSocketStream(WSS_URL);
  const {readable, writable} = await wss.opened;
  const reader = readable.getReader();
  const writer = writable.getWriter();

  while (true) {
    const {value, done} = await reader.read();
    if (done) {
      break;
    }
    const result = await process(value);
    await writer.write(result);
  }

バックプレッシャー

約束されたバックプレッシャー機能はどうなっていますか?追加の手順は必要ありません。process() に時間がかかると、パイプラインの準備が整うまで次のメッセージは消費されません。同様に、WritableStreamDefaultWriter.write() ステップは安全な場合にのみ続行されます。

高度な例

WebSocketStream の 2 番目の引数は、将来の拡張を可能にするオプション バッグです。オプションは protocols のみです。これは、WebSocket コンストラクタの 2 番目の引数と同じように動作します。

const chatWSS = new WebSocketStream(CHAT_URL, {protocols: ['chat', 'chatv2']});
const {protocol} = await chatWSS.opened;

選択された protocol と潜在的な extensions は、WebSocketStream.opened Promise を介して利用可能な辞書の一部です。接続が失敗した場合は関連性がないため、ライブ接続に関するすべての情報は、この Promise によって提供されます。

const {readable, writable, protocol, extensions} = await chatWSS.opened;

終了した WebSocketStream 接続に関する情報

WebSocket API の WebSocket.onclose イベントと WebSocket.onerror イベントから取得できた情報が、WebSocketStream.closed プロミスで取得できるようになりました。クリーンでないクローズの場合、Promise は拒否されます。それ以外の場合は、サーバーから送信されたコードと理由に解決されます。

考えられるすべてのステータス コードとその意味については、CloseEvent ステータス コードのリストをご覧ください。

const {code, reason} = await chatWSS.closed;

WebSocketStream 接続を閉じる

WebSocketStream は AbortController で閉じることができます。したがって、AbortSignalWebSocketStream コンストラクタに渡します。AbortController.abort() は、ハンドシェイクのにのみ機能します。

const controller = new AbortController();
const wss = new WebSocketStream(URL, {signal: controller.signal});
setTimeout(() => controller.abort(), 1000);

別の方法として WebSocketStream.close() メソッドを使用することもできますが、このメソッドの主な目的は、サーバーに送信されるコードと理由を指定できるようにすることです。

wss.close({closeCode: 4000, reason: 'Game over'});

プログレッシブ エンハンスメントと相互運用性

現在、WebSocketStream API を実装しているブラウザは Chrome のみです。従来の WebSocket API との相互運用性のため、受信メッセージにバックプレッシャーを適用することはできません。送信されたメッセージにバックプレッシャーを適用することは可能ですが、WebSocket.bufferedAmount プロパティのポーリングが必要になるため、非効率的で人間工学的ではありません。

特徴検出

WebSocketStream API がサポートされているかどうかを確認するには、次のコードを使用します。

if ('WebSocketStream' in window) {
  // `WebSocketStream` is supported!
}

デモ

サポートされているブラウザでは、埋め込み iframe または Glitch で直接 WebSocketStream API の動作を確認できます。

フィードバック

Chrome チームは、WebSocketStream API の使用感について皆様のご意見をお待ちしています。

API 設計について教えてください

API が想定どおりに動作しない点はありますか?アイデアを実装するために必要なメソッドやプロパティが不足している場合はどうすればよいですか?セキュリティ モデルについてご質問やご意見がある場合は、対応する GitHub リポジトリで仕様に関する問題を報告するか、既存の問題にご意見を追加してください。

実装に関する問題を報告する

Chrome の実装にバグが見つかりましたか? それとも、実装が仕様と異なるのでしょうか?new.crbug.com でバグを報告します。できるだけ詳細な情報と、問題を再現するための簡単な手順を記載し、[Components] ボックスに Blink>Network>WebSockets と入力してください。

API のサポートを表示する

WebSocketStream API を使用する予定はありますか?公開サポートは、Chrome チームが機能の優先順位を決定するのに役立ち、他のブラウザ ベンダーにサポートの重要性を示すことができます。

ハッシュタグ #WebSocketStream を使用して @ChromiumDev にツイートし、どこでどのように使用しているかをお知らせください。

関連情報

謝辞

WebSocketStream API は Adam RiceYutaka Hirano によって実装されました。