Skip to content

feat: Add an OOTB Chat UI to the Feature Server to support RAG demo #5106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ prune examples

graft sdk/python/feast/ui/build
graft sdk/python/feast/embedded_go/lib
recursive-include sdk/python/feast/static *
107 changes: 106 additions & 1 deletion sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import asyncio
import os
import sys
import threading
import time
import traceback
from contextlib import asynccontextmanager
from importlib import resources as importlib_resources
from typing import Any, Dict, List, Optional

import pandas as pd
import psutil
from dateutil import parser
from fastapi import Depends, FastAPI, Request, Response, status
from fastapi import (
Depends,
FastAPI,
Request,
Response,
WebSocket,
WebSocketDisconnect,
status,
)
from fastapi.concurrency import run_in_threadpool
from fastapi.logger import logger
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from google.protobuf.json_format import MessageToDict
from prometheus_client import Gauge, start_http_server
from pydantic import BaseModel
Expand Down Expand Up @@ -78,6 +90,15 @@ class GetOnlineFeaturesRequest(BaseModel):
query_string: Optional[str] = None


class ChatMessage(BaseModel):
    """A single message exchanged in a chat conversation."""

    # Author of the message; no set of allowed values is enforced here.
    role: str
    # Text of the message.
    content: str


class ChatRequest(BaseModel):
    """Request payload carrying the messages of a chat conversation."""

    # Messages supplied by the caller, each validated as a ChatMessage.
    messages: List[ChatMessage]


def _get_features(request: GetOnlineFeaturesRequest, store: "feast.FeatureStore"):
if request.feature_service:
feature_service = store.get_feature_service(
Expand Down Expand Up @@ -113,6 +134,35 @@ def get_app(
store: "feast.FeatureStore",
registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL,
):
"""
Creates a FastAPI app that can be used to start a feature server.

Args:
store: The FeatureStore to use for serving features
registry_ttl_sec: The TTL in seconds for the registry cache

Returns:
A FastAPI app

Example:
```python
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")
app = get_app(store)
```

The app provides the following endpoints:
- `/get-online-features`: Get online features
- `/retrieve-online-documents`: Retrieve online documents
- `/push`: Push features to the feature store
- `/write-to-online-store`: Write to the online store
- `/health`: Health check
- `/materialize`: Materialize features
- `/materialize-incremental`: Materialize features incrementally
- `/chat`: Chat UI
- `/ws/chat`: WebSocket endpoint for chat
"""
proto_json.patch()
# Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down
registry_proto = None
Expand Down Expand Up @@ -297,6 +347,21 @@ async def health():
else Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
)

@app.post("/chat")
async def chat(request: ChatRequest):
    """Answer a chat request with a fixed placeholder reply.

    The body is validated as a ``ChatRequest``, but its messages are not
    inspected yet: every call returns the same dummy text.

    Returns:
        A dict with a single ``"response"`` key holding the reply text.
    """
    placeholder_reply = "This is a dummy response from the Feast feature server."
    return {"response": placeholder_reply}

@app.get("/chat")
async def chat_ui():
    """Serve the bundled chat page.

    Loads ``static/chat/index.html`` from this package's resources and
    returns its contents with the ``text/html`` media type.

    Returns:
        A ``Response`` whose body is the HTML of the chat page.
    """
    static_dir_ref = importlib_resources.files(__spec__.parent) / "static/chat"  # type: ignore[name-defined, arg-type]
    with importlib_resources.as_file(static_dir_ref) as static_dir:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # default encoding, which differs between operating systems.
        with open(os.path.join(static_dir, "index.html"), encoding="utf-8") as f:
            content = f.read()
    return Response(content=content, media_type="text/html")

@app.post("/materialize", dependencies=[Depends(inject_user_details)])
def materialize(request: MaterializeRequest) -> None:
for feature_view in request.feature_views or []:
Expand Down Expand Up @@ -337,6 +402,46 @@ async def rest_exception_handler(request: Request, exc: Exception):
content=str(exc),
)

# Chat WebSocket connection manager
class ConnectionManager:
    """Keep track of accepted WebSocket connections."""

    def __init__(self):
        # Sockets that have been accepted and not yet disconnected.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake on ``websocket`` and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``.

        Calling this for a socket that is not tracked (for example a second
        call for the same connection) is a no-op rather than a ``ValueError``.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_message(self, message: str, websocket: WebSocket):
        """Send ``message`` as a text frame on ``websocket``."""
        await websocket.send_text(message)

# Manager instance used by the /ws/chat handler below to track its connections.
manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    """Reply to each received chat message with a word-by-word stream.

    For every text frame received, a placeholder reply quoting the message is
    built and sent back one word per frame, pausing briefly between words.
    The loop runs until the client disconnects.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive_text()
            # Placeholder reply: quote the received message back to the sender.
            response = f"You sent: '{message}'. This is a dummy response from the Feast feature server."

            # Stream the response word by word
            for word in response.split():
                await manager.send_message(word + " ", websocket)
                await asyncio.sleep(0.1)  # Add a small delay between words
    except WebSocketDisconnect:
        # The client closed the connection; there is nothing left to send.
        pass
    finally:
        # Deregister on every exit path, not only on a clean disconnect, so a
        # failed send cannot leave a dead socket in the manager's list.
        manager.disconnect(websocket)

# Mount static files
# Resolve the package's "static" resource directory and serve it under /static.
# NOTE(review): StaticFiles keeps the path after the as_file() context has exited;
# as_file() may yield a temporary copy when the package is not a plain directory
# on disk (e.g. imported from a zip) — confirm the package is always installed unpacked.
static_dir_ref = importlib_resources.files(__spec__.parent) / "static" # type: ignore[name-defined, arg-type]
with importlib_resources.as_file(static_dir_ref) as static_dir:
app.mount("/static", StaticFiles(directory=static_dir), name="static")

return app


Expand Down
129 changes: 129 additions & 0 deletions sdk/python/feast/static/chat/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Feast Chat</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 0;
display: flex;
flex-direction: column;
height: 100vh;
}
.chat-container {
flex: 1;
display: flex;
flex-direction: column;
padding: 20px;
overflow-y: auto;
}
.message {
margin-bottom: 10px;
padding: 10px;
border-radius: 5px;
max-width: 70%;
}
.user-message {
background-color: #e6f7ff;
align-self: flex-end;
}
.bot-message {
background-color: #f0f0f0;
align-self: flex-start;
}
.input-container {
display: flex;
padding: 10px;
border-top: 1px solid #ccc;
}
#message-input {
flex: 1;
padding: 10px;
border: 1px solid #ccc;
border-radius: 5px;
margin-right: 10px;
}
#send-button {
padding: 10px 20px;
background-color: #1890ff;
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
}
</style>
</head>
<body>
<div class="chat-container" id="chat-container">
<div class="message bot-message">Hello! How can I help you today?</div>
</div>
<div class="input-container">
<input type="text" id="message-input" placeholder="Type your message...">
<button id="send-button">Send</button>
</div>
<script>
const chatContainer = document.getElementById('chat-container');
const messageInput = document.getElementById('message-input');
const sendButton = document.getElementById('send-button');

// WebSocket connection
// Match the page's scheme: a page loaded over HTTPS must use wss://, because
// browsers refuse insecure ws:// connections from secure pages.
const wsScheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
const ws = new WebSocket(`${wsScheme}://${window.location.host}/ws/chat`);

// Render an incoming chunk of the bot's reply: append it to the bubble that is
// still streaming, or start a new bubble when there is none.
ws.onmessage = function(event) {
    const lastMessage = chatContainer.lastElementChild;
    const isStreamingBotMessage = Boolean(
        lastMessage
        && lastMessage.classList.contains('bot-message')
        && lastMessage.dataset.streaming === 'true'
    );

    if (isStreamingBotMessage) {
        // Append to the existing message
        lastMessage.textContent += event.data;
    } else {
        // Create a new bot message and mark it as still being streamed
        const botMessage = document.createElement('div');
        botMessage.classList.add('message', 'bot-message');
        botMessage.dataset.streaming = 'true';
        botMessage.textContent = event.data;
        chatContainer.appendChild(botMessage);
    }

    // Scroll after every chunk, not only when a bubble is created, so a reply
    // that grows past the bottom of the container stays visible.
    chatContainer.scrollTop = chatContainer.scrollHeight;
};

// When the connection closes, log it and mark a still-streaming bot bubble as finished.
ws.onclose = function(event) {
    console.log('Connection closed');

    const tail = chatContainer.lastElementChild;
    if (!tail || !tail.classList.contains('bot-message')) {
        return;
    }
    if (tail.dataset.streaming === 'true') {
        tail.dataset.streaming = 'false';
    }
};

// Send the text in the input box to the server and show it as a user bubble.
// Does nothing for blank input or when the connection is not open.
function sendMessage() {
    const message = messageInput.value.trim();
    if (!message) {
        return;
    }

    // send() throws while the socket is still connecting and silently drops
    // data once it is closing or closed; keep the typed text instead of
    // displaying a message that was never delivered.
    if (ws.readyState !== WebSocket.OPEN) {
        console.warn('Chat connection is not open; message not sent');
        return;
    }

    // Add user message to chat
    const userMessage = document.createElement('div');
    userMessage.classList.add('message', 'user-message');
    userMessage.textContent = message;
    chatContainer.appendChild(userMessage);

    // Send message to server
    ws.send(message);

    // Clear input
    messageInput.value = '';

    // Scroll to bottom
    chatContainer.scrollTop = chatContainer.scrollHeight;
}

// Send on a button click or when Enter is pressed in the input box.
sendButton.addEventListener('click', sendMessage);
messageInput.addEventListener('keypress', (event) => {
    if (event.key !== 'Enter') {
        return;
    }
    sendMessage();
});
</script>
</body>
</html>
2 changes: 2 additions & 0 deletions sdk/python/feast/ui_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def shutdown_event():

@app.get("/registry")
def read_registry():
if registry_proto is None:
return Response(status_code=503) # Service Unavailable
return Response(
content=registry_proto.SerializeToString(),
media_type="application/octet-stream",
Expand Down
Loading