Stream Transport implementation using Arrow Flight #18424

Open. @rishabhmaurya wants to merge 42 commits into main.

@rishabhmaurya rishabhmaurya commented Jun 3, 2025

Description

Implements stream transport (#18722 | RFC: #18425) using Arrow Flight RPC. This PR builds on #18722 and currently includes that PR's entire diff as well.
Key features:

  • FlightTransport, a new transport implementation.
  • Reuse of all existing inbound logic of the native transport.
  • Extension points in existing transport classes.
  • Serialization/deserialization logic for converting TransportResponse to and from Arrow vectors.
  • New outbound and inbound handling for stream transport responses.
  • Compatibility and integration with transport features such as task management, header propagation, thread context propagation across transport boundaries, and the tracing framework.
  • An example plugin that demonstrates defining and using a stream-transport-based API.
  • A Flight stats API exposing relevant metrics for the Flight transport.
  • A docs directory with documentation and diagrams for education and reference.

Testing

  • Unit and integration tests.
  • Search API integration and testing.
  • Load testing using OSB (OpenSearch Benchmark) with the big5 workload.
  • Chaos testing by introducing lag on both the server and client side while requests are in progress. More to follow.

The whole transport is behind a feature flag, so there is no impact when the plugin is not installed or the feature flag is disabled (the default).

Tests pending:

  • Long-running load tests.
  • More chaos testing.

I found OpenSearch's transport layer quite complicated, and this is a very large change. So I'm sharing my cheat sheet to help reviewers (or anyone interested in OpenSearch's transport) understand the existing flow and how this PR modifies it to support stream transport using Flight:

1. Outbound Client: Netty4 vs. Flight

sequenceDiagram
    participant Client
    participant TS as TransportService
    participant CM as ConnectionManager
    participant C as Connection
    participant TC as TcpChannel<br/>(Netty4TcpChannel)
    participant NOH as NativeOutboundHandler
    participant N as Network

    Note over Client,N: Netty4 Flow
    Client->>TS: Send TransportRequest
    TS->>TS: Generate reqID
    TS->>CM: Get Connection
    CM->>C: Provide Connection
    C->>TC: Use Channel
    TC->>NOH: Serialize to BytesReference<br/>(StreamOutput) with reqID
    NOH->>N: Send BytesReference

    participant Client2
    participant STS as StreamTransportService
    participant CM2 as ConnectionManager
    participant C2 as Connection
    participant FTC as FlightTcpChannel
    participant FMH as FlightMessageHandler
    participant FC as FlightClientChannel
    participant N2 as Network

    Note over Client2,N2: Flight Flow
    Client2->>STS: Send TransportRequest
    STS->>STS: Generate reqID
    STS->>CM2: Get Connection
    CM2->>C2: Provide Connection
    C2->>FTC: Use Channel
    FTC->>FMH: Serialize to Flight Ticket<br/>(ArrowStreamOutput) with reqID
    FMH->>FC: Send Flight Ticket
    FC->>N2: Transmit Request

2. Inbound Server: Netty4 vs. Flight

sequenceDiagram
    participant STC as Server TcpChannel<br/>(Netty4TcpChannel)
    participant IP as InboundPipeline
    participant IH as InboundHandler
    participant NMH as NativeMessageHandler
    participant RH as RequestHandler

    Note over STC,RH: Netty4 Flow
    STC->>IP: Receive BytesReference
    IP->>IH: Deserialize to InboundMessage<br/>(StreamInput)
    IH->>NMH: Interpret as TransportRequest
    NMH->>RH: Process Request

    participant FS as FlightServer
    participant FP as FlightProducer
    participant IP2 as InboundPipeline
    participant IH2 as InboundHandler
    participant NMH2 as NativeMessageHandler
    participant RH2 as RequestHandler

    Note over FS,RH2: Flight Flow
    FS->>FP: Receive Flight Ticket
    FP->>FP: Create VectorSchemaRoot
    FP->>FP: Create FlightServerChannel
    FP->>IP2: Pass to InboundPipeline
    IP2->>IH2: Deserialize with ArrowStreamInput
    IH2->>NMH2: Interpret as TransportRequest
    NMH2->>RH2: Process Request
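
The inbound server diagram suggests that only the decoding step differs between the two transports, while the handler chain behind it is shared. Here is a minimal Java sketch of that idea. Every type in it (`InboundSketch`, `RequestDecoder`, `SharedInboundHandler`, and both toy wire formats) is a hypothetical stand-in written for this illustration, not a class from OpenSearch or from this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; none of these types come from OpenSearch or this PR.
final class InboundSketch {

    /** A decoded request: the request ID plus an action name. */
    static final class DecodedRequest {
        final long reqId;
        final String action;

        DecodedRequest(long reqId, String action) {
            this.reqId = reqId;
            this.action = action;
        }
    }

    /** The only transport-specific step: turning wire data of type W into a request. */
    interface RequestDecoder<W> {
        DecodedRequest decode(W wire);
    }

    /** Toy byte format standing in for the native transport: UTF-8 "reqId:action". */
    static final RequestDecoder<byte[]> BYTES_DECODER = wire -> {
        String[] parts = new String(wire, StandardCharsets.UTF_8).split(":", 2);
        return new DecodedRequest(Long.parseLong(parts[0]), parts[1]);
    };

    /** Toy columnar format standing in for Arrow vectors: one array per field. */
    static final class Columns {
        final long[] reqIds;
        final String[] actions;

        Columns(long[] reqIds, String[] actions) {
            this.reqIds = reqIds;
            this.actions = actions;
        }
    }

    /** Reads row 0 of each column. */
    static final RequestDecoder<Columns> COLUMN_DECODER =
        wire -> new DecodedRequest(wire.reqIds[0], wire.actions[0]);

    /** Shared handler: identical for both transports once the request is decoded. */
    static final class SharedInboundHandler {
        final List<String> handled = new ArrayList<>();

        <W> void onMessage(W wire, RequestDecoder<W> decoder) {
            DecodedRequest request = decoder.decode(wire);
            handled.add(request.reqId + " -> " + request.action);
        }
    }

    public static void main(String[] args) {
        SharedInboundHandler handler = new SharedInboundHandler();
        handler.onMessage("7:search".getBytes(StandardCharsets.UTF_8), BYTES_DECODER);
        handler.onMessage(new Columns(new long[] {8L}, new String[] {"search"}), COLUMN_DECODER);
        handler.handled.forEach(System.out::println);
    }
}
```

Keeping the decoder as the only pluggable piece is one way to let a new wire format reuse the existing request handling unchanged; the PR itself may structure this differently.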

3. Outbound Server: Netty4 vs. Flight

sequenceDiagram
    participant RH as RequestHandler
    participant OH as OutboundHandler
    participant TTC as TcpTransportChannel
    participant TC as TcpChannel

    Note over RH,TC: Netty4 Flow
    RH->>TTC: sendResponse(TransportResponse)
    TTC->>OH: Serialize TransportResponse<br/>(via sendResponse)
    OH->>TC: Send Serialized Data to Client

    participant RH2 as RequestHandler
    participant FTC as FlightTransportChannel
    participant FOH as FlightOutboundHandler
    participant FSC as FlightServerChannel
    participant SSL as ServerStreamListener

    Note over RH2,SSL: Flight Flow
    RH2->>FTC: sendResponseBatch(TransportResponse)
    FTC->>FOH: sendResponseBatch
    FOH->>FSC: sendBatch(VectorSchemaRoot)
    FSC->>SSL: start(root) (first batch)
    FSC->>SSL: putNext() (stream batch)
    RH2->>FTC: completeStream()
    FTC->>FOH: completeStream
    FOH->>FSC: completeStream
    FSC->>SSL: completed() (end stream)
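
The Flight branch of this diagram implies a small state machine on the server channel: the first batch starts the stream, every batch is pushed with `putNext()`, and `completed()` ends it. The sketch below models only that ordering. `StreamChannelSketch`, `BatchListener`, and `RecordingListener` are hypothetical names for this illustration; they are not Arrow Flight's `ServerStreamListener` or the PR's `FlightServerChannel`, and a plain `String` stands in for a `VectorSchemaRoot`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the call ordering in the diagram; not the PR's implementation.
final class StreamChannelSketch {

    /** Hypothetical listener mirroring the three calls named in the diagram. */
    interface BatchListener {
        void start(String root);
        void putNext();
        void completed();
    }

    /** Test helper that records which listener calls happened, in order. */
    static final class RecordingListener implements BatchListener {
        private final List<String> events;

        RecordingListener(List<String> events) {
            this.events = events;
        }

        @Override public void start(String root) { events.add("start"); }
        @Override public void putNext() { events.add("putNext"); }
        @Override public void completed() { events.add("completed"); }
    }

    private final BatchListener listener;
    private boolean started;
    private boolean completed;

    StreamChannelSketch(BatchListener listener) {
        this.listener = listener;
    }

    /** Sends one batch; the first call also starts the stream. */
    synchronized void sendBatch(String root) {
        if (completed) {
            throw new IllegalStateException("stream already completed");
        }
        if (!started) {
            listener.start(root);
            started = true;
        }
        listener.putNext();
    }

    /** Ends the stream; no batches may be sent afterwards. */
    synchronized void completeStream() {
        if (completed) {
            throw new IllegalStateException("stream already completed");
        }
        completed = true;
        listener.completed();
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        StreamChannelSketch channel = new StreamChannelSketch(new RecordingListener(events));
        channel.sendBatch("batch-1");
        channel.sendBatch("batch-2");
        channel.completeStream();
        System.out.println(events);
    }
}
```

Rejecting batches after completion is a design choice of this sketch, made so that a handler calling `sendBatch` after `completeStream` fails loudly instead of writing to a closed stream.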

4. Inbound Client: Netty4 vs. Flight

sequenceDiagram
    participant CTC as Client TcpChannel<br/>(Netty4TcpChannel)
    participant CIP as Client InboundPipeline
    participant CIH as Client InboundHandler
    participant RH as ResponseHandler

    Note over CTC,RH: Netty4 Flow
    CTC->>CIP: Receive BytesReference
    CIP->>CIH: Deserialize to TransportResponse<br/>(StreamInput)
    CIH->>RH: Deliver Response

    participant FC as FlightClient
    participant FCC as FlightClientChannel
    participant FTR as FlightTransportResponse
    participant RH2 as ResponseHandler

    Note over FC,RH2: Flight Flow (Async Response Handling)
    FC->>FCC: handleInboundStream(Ticket, Listener)
    FCC->>FTR: Create FlightTransportResponse
    FCC->>FCC: Retrieve Header and reqID
    FCC->>RH2: Get TransportResponseHandler<br/>using reqID
    FCC->>RH2: handler.handleStreamResponse(streamResponse)<br/>(Async Processing)
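
Both flows depend on the request ID generated on the outbound side to find the right response handler when data comes back. A minimal sketch of that correlation follows. `PendingResponses` and its methods are hypothetical names for this illustration and are not the registry OpenSearch actually uses; in the stream case the value handed to the handler would be a stream object rather than a single response.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical request/response correlation by request ID; not OpenSearch's actual registry.
final class PendingResponses<R> {

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, Consumer<R>> handlers = new ConcurrentHashMap<>();

    /** Outbound side: allocate a request ID and remember the handler for it. */
    long register(Consumer<R> handler) {
        long reqId = nextId.incrementAndGet();
        handlers.put(reqId, handler);
        return reqId;
    }

    /**
     * Inbound side: look up the handler by request ID and deliver the response.
     * The handler is removed first so that a response is delivered at most once.
     * Returns false when no handler is registered for the ID.
     */
    boolean complete(long reqId, R response) {
        Consumer<R> handler = handlers.remove(reqId);
        if (handler == null) {
            return false;
        }
        handler.accept(response);
        return true;
    }

    public static void main(String[] args) {
        PendingResponses<String> pending = new PendingResponses<>();
        long reqId = pending.register(r -> System.out.println("got: " + r));
        pending.complete(reqId, "response for " + reqId);
    }
}
```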

Related Issues

#18425
#17695

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

github-actions bot commented Jun 3, 2025

❌ Gradle check result for 9a8ac93: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

github-actions bot commented Jun 4, 2025

❌ Gradle check result for 6c0eb1f: FAILURE

❌ Gradle check result for 4927302: FAILURE

❌ Gradle check result for 8818f6e: FAILURE

@rishabhmaurya rishabhmaurya force-pushed the search-stream-transport branch 2 times, most recently from 3ede3a2 to fa7a38b Compare June 27, 2025 23:53
@rishabhmaurya rishabhmaurya force-pushed the search-stream-transport branch from fa7a38b to 4461c75 Compare June 28, 2025 00:04

❌ Gradle check result for 4461c75: FAILURE

❌ Gradle check result for 2233764: FAILURE

@rishabhmaurya rishabhmaurya force-pushed the search-stream-transport branch from c256b09 to f26c9f8 Compare July 2, 2025 02:21

github-actions bot commented Jul 2, 2025

❌ Gradle check result for f26c9f8: FAILURE

github-actions bot commented Jul 3, 2025

❌ Gradle check result for f0d0c75: FAILURE

github-actions bot commented Jul 3, 2025

❌ Gradle check result for 46afedd: FAILURE

@rishabhmaurya rishabhmaurya moved this from Todo to In Progress in Performance Roadmap Jul 3, 2025
@rishabhmaurya rishabhmaurya moved this from In Progress to In-Review in Performance Roadmap Jul 3, 2025
@rishabhmaurya rishabhmaurya self-assigned this Jul 3, 2025

github-actions bot commented Jul 3, 2025

❌ Gradle check result for f09dc7f: FAILURE

@rishabhmaurya rishabhmaurya force-pushed the search-stream-transport branch 2 times, most recently from 3254ed5 to 70af3e2 Compare July 3, 2025 19:22

github-actions bot commented Jul 3, 2025

❌ Gradle check result for 70af3e2: FAILURE

@rishabhmaurya rishabhmaurya force-pushed the search-stream-transport branch from 70af3e2 to a247bbf Compare July 4, 2025 02:27

github-actions bot commented Jul 4, 2025

❌ Gradle check result for a247bbf: FAILURE

@rishabhmaurya rishabhmaurya force-pushed the search-stream-transport branch from a247bbf to 237d39e Compare July 4, 2025 06:26
Signed-off-by: Rishabh Maurya <[email protected]>
@rishabhmaurya rishabhmaurya force-pushed the search-stream-transport branch from d36ecc2 to 73a33af Compare August 1, 2025 18:35
@rishabhmaurya rishabhmaurya added the documentation and v3.2.0 labels Aug 1, 2025

github-actions bot commented Aug 1, 2025

❌ Gradle check result for 73a33af: FAILURE

github-actions bot commented Aug 1, 2025

❌ Gradle check result for 8c4c34a: FAILURE

github-actions bot commented Aug 2, 2025

✅ Gradle check result for ecda165: SUCCESS

github-actions bot commented Aug 2, 2025

❌ Gradle check result for 666a503: FAILURE

github-actions bot commented Aug 2, 2025

❌ Gradle check result for 666a503: null

Labels: documentation, skip-changelog, v3.2.0
Projects
Status: In-Review