2 releases (1 stable)
| Version | Released |
|---|---|
| 2026.1.1 | Jan 17, 2026 |
| 0.1.0 | Jan 19, 2026 |
#197 in Science · 185KB · 3.5K SLoC
graph-sp
A high-performance DAG (Directed Acyclic Graph) execution engine with true parallel execution, built in Rust with Python bindings.
Features
- ⚡ True Parallel Execution: Automatic parallelization of independent nodes (about 44% less wall-clock time than sequential execution in the bundled 3-branch fan-out example)
- 🔌 Port-based Architecture: Type-safe data flow between nodes via named ports
- 🌿 Branching & Nested Graphs: Create isolated subgraphs for experiments and variants
- 🔀 Merge Operations: Combine outputs from multiple branches with custom merge functions
- 🔄 Variants & Config Sweeps: Automated parameter variation with cartesian product support
- 🐍 Python & Rust APIs: Full feature parity across both languages
- 🔍 Graph Inspection: Analysis, visualization, and Mermaid diagram generation
- ✅ Cycle Detection: Built-in DAG validation with detailed error reporting
- 📊 Rich Data Types: Primitives, collections, JSON, nested objects, and binary data
- 🎯 Zero-Copy Optimization: Efficient data sharing using Arc
Quick Start
Python
Install from PyPI:
pip install pygraph-sp
Simple example:
import pygraphsp as gs
# Create a graph
graph = gs.Graph()
# Add nodes with Python functions using the simplified API
def data_source(inputs):
    return {"output": [1, 2, 3, 4, 5]}
def multiply_by_2(inputs):
    return {"output": [x * 2 for x in inputs["input"]]}
# Add nodes - function name becomes the node ID by default
graph.add(
data_source,
label="Data Source",
outputs=["output"]
)
graph.add(
multiply_by_2,
label="Multiply by 2",
inputs=["input"],
outputs=["output"]
)
# Connect nodes using function names
graph.add_edge("data_source", "output", "multiply_by_2", "input")
# Execute with parallel processing
executor = gs.Executor()
result = executor.execute(graph)
print(result.get_output("multiply_by_2", "output")) # [2, 4, 6, 8, 10]
Rust
Add to your Cargo.toml:
[dependencies]
graph-sp = "0.1"
tokio = { version = "1", features = ["full"] }
Simple example:
use graph_sp::{Graph, Node, NodeConfig, Port, PortData, Edge, Executor};
use std::collections::HashMap;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut graph = Graph::new();
    // Add source node
    let source = NodeConfig::new(
        "source", "Data Source",
        vec![],
        vec![Port::simple("output")],
        Arc::new(|_: &HashMap<String, PortData>| {
            Ok(HashMap::from([
                ("output".to_string(), PortData::List(vec![
                    PortData::Int(1), PortData::Int(2), PortData::Int(3)
                ]))
            ]))
        }),
    );
    graph.add(Node::new(source))?;
    // Execute
    let executor = Executor::new();
    let result = executor.execute(&mut graph).await?;
    Ok(())
}
Installation
Python
From PyPI (recommended):
pip install pygraph-sp
From source:
# Clone the repository
git clone https://github.com/briday1/graph-sp.git
cd graph-sp
# Install maturin
pip install maturin
# Build and install
maturin develop --release --features python
Rust
From crates.io:
[dependencies]
graph-sp = "0.1"
From source:
git clone https://github.com/briday1/graph-sp.git
cd graph-sp
cargo build --release
Core Concepts
Ports
Nodes communicate through typed ports with separate broadcast and implementation names:
# Python - Simple ports (broadcast_name == impl_name)
graph.add(
my_function,
inputs=["data", "config"],
outputs=["result"]
)
# Advanced: Separate broadcast and implementation names
# Useful for connecting nodes with different parameter names
graph.add(
process_data,
inputs=[("external_data", "internal_param")], # (broadcast, impl)
outputs=[("result", "output_value")]
)
# Most explicit: Use Port objects
input_port = gs.Port("broadcast_name", "impl_name", "Display Name")
graph.add(my_function, inputs=[input_port], outputs=[...])
// Rust - Simple port (both names the same)
let port = Port::simple("data");
// Separate broadcast and implementation names
let port = Port::new("external_data", "internal_param");
Port Name Types:
- broadcast_name: External name used for connecting nodes via edges
- impl_name: Internal parameter name used in function signatures
- display_name: Human-readable name for visualizations (defaults to broadcast_name)
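One way to picture the broadcast/implementation split is as a rename step wrapped around each function call: edges deliver values under broadcast names, and the function sees them under implementation names. The plain-Python sketch below does not use pygraphsp; `run_node` and the `(broadcast, impl)` tuple layout are assumptions modeled on the `inputs=[("external_data", "internal_param")]` form above, not the library's internals.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical port description: a list of (broadcast_name, impl_name) pairs.
PortSpec = List[Tuple[str, str]]

def run_node(func: Callable[[dict], dict], edge_values: Dict[str, object],
             inputs: PortSpec, outputs: PortSpec) -> Dict[str, object]:
    """Call func with impl-named inputs and return broadcast-named outputs."""
    # Incoming edge values are keyed by broadcast name; rename them for the function.
    impl_inputs = {impl: edge_values[broadcast] for broadcast, impl in inputs}
    impl_outputs = func(impl_inputs)
    # Rename results back to broadcast names so downstream edges can find them.
    return {broadcast: impl_outputs[impl] for broadcast, impl in outputs}

def process_data(inputs):
    # The function body only knows its own parameter names.
    return {"output_value": sum(inputs["internal_param"])}

result = run_node(
    process_data,
    edge_values={"external_data": [1, 2, 3]},
    inputs=[("external_data", "internal_param")],
    outputs=[("result", "output_value")],
)
print(result)  # {'result': 6}
```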
Data Types
Support for multiple data types:
- Primitives: Int, Float, String, Bool, None
- Collections: List, Map (nested HashMap)
- Structured: JSON (arbitrary structures)
- Binary: Bytes (raw binary data)
# Python - nested objects work seamlessly
user = {
"name": "Alice",
"age": 30,
"address": {
"city": "NYC",
"zip": "10001"
},
"hobbies": ["reading", "coding"]
}
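To illustrate how a nested value such as `user` could be decomposed into the variants listed above, here is a recursive tagging sketch. The `("Variant", value)` tuple representation and `to_port_data` are hypothetical; the real conversion happens in the PyO3 bindings and may differ, for example in when it chooses JSON over Map. The JSON variant is not modeled here.

```python
from typing import Any

def to_port_data(value: Any) -> tuple:
    """Recursively tag a Python value with a PortData-style variant name (hypothetical)."""
    if value is None:
        return ("None", None)
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return ("Bool", value)
    if isinstance(value, int):
        return ("Int", value)
    if isinstance(value, float):
        return ("Float", value)
    if isinstance(value, str):
        return ("String", value)
    if isinstance(value, (bytes, bytearray)):
        return ("Bytes", bytes(value))
    if isinstance(value, list):
        return ("List", [to_port_data(v) for v in value])
    if isinstance(value, dict):
        # Nested dicts become nested Maps with string keys.
        return ("Map", {str(k): to_port_data(v) for k, v in value.items()})
    raise TypeError(f"unsupported type: {type(value).__name__}")

user = {"name": "Alice", "age": 30,
        "address": {"city": "NYC", "zip": "10001"},
        "hobbies": ["reading", "coding"]}
tagged = to_port_data(user)
print(tagged[0])              # Map
print(tagged[1]["hobbies"])   # ('List', [('String', 'reading'), ('String', 'coding')])
```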
Parallel Execution
The engine automatically parallelizes independent branches:
# This creates a fan-out pattern with 3 parallel branches
#          source
#        /   |    \
#     slow  fast  medium   <- These run in parallel!
#        \   |    /
#          merger
# Execution time: ~500ms (parallel) vs ~900ms (sequential)
# ~44% less wall-clock time (about a 1.8x speedup), achieved automatically
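The same fan-out/fan-in shape can be mimicked with Python's standard `concurrent.futures.ThreadPoolExecutor` to see why the parallel time approaches the slowest branch rather than the sum of all branches. This sketch does not use the engine (which schedules work from Rust via `tokio::task::spawn_blocking`); the branch functions and sleep durations are made up, and `time.sleep` stands in for work that releases the GIL.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical branch functions; the sleeps stand in for real work.
def slow(x):
    time.sleep(0.3)
    return x + 1

def fast(x):
    time.sleep(0.1)
    return x - 1

def medium(x):
    time.sleep(0.2)
    return x * 2

def run_fan_out(value):
    branches = [slow, fast, medium]
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        # Each branch depends only on the source value, so all are submitted at once.
        futures = [pool.submit(branch, value) for branch in branches]
        results = [f.result() for f in futures]
    # The merger runs only after every branch has finished.
    return sum(results)

start = time.perf_counter()
merged = run_fan_out(10)
elapsed = time.perf_counter() - start
print(merged)  # 40  (11 + 9 + 20)
print(f"{elapsed:.2f}s")  # roughly the slowest branch (~0.3s), not the 0.6s total
```

Pure-Python CPU-bound branches would not speed up this way under the GIL, which is one reason the engine runs node execution from Rust.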
Graph Inspection
# Analyze graph structure
analysis = graph.analyze()
print(f"Nodes: {analysis.node_count}")
print(f"Depth: {analysis.depth}")
print(f"Width: {analysis.width}") # Parallelization potential
# Generate Mermaid diagram
mermaid = graph.to_mermaid()
print(mermaid) # GitHub-compatible markdown
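The exact text `graph.to_mermaid()` emits is not shown here. As a rough picture of the format, this plain-Python sketch renders an edge list as a Mermaid flowchart; `to_mermaid` and its `labels` argument are hypothetical, and the library's output (node shapes, port annotations) may differ.

```python
def to_mermaid(edges, labels=None):
    """Render (src, dst) edges as a top-down Mermaid flowchart (hypothetical helper)."""
    labels = labels or {}
    lines = ["graph TD"]
    # Declare each node once, in first-seen order, with its display label.
    nodes = []
    for src, dst in edges:
        for node in (src, dst):
            if node not in nodes:
                nodes.append(node)
    for node in nodes:
        lines.append(f'    {node}["{labels.get(node, node)}"]')
    # One arrow per edge.
    for src, dst in edges:
        lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)

diagram = to_mermaid(
    [("data_source", "multiply_by_2")],
    labels={"data_source": "Data Source", "multiply_by_2": "Multiply by 2"},
)
print(diagram)
# graph TD
#     data_source["Data Source"]
#     multiply_by_2["Multiply by 2"]
#     data_source --> multiply_by_2
```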
Branching & Variants (New in v0.2.0)
Branches allow you to create isolated subgraphs:
# Create experimental branches
graph.create_branch("experiment_a")
graph.create_branch("experiment_b")
# Check branches
print(graph.branch_names()) # ["experiment_a", "experiment_b"]
print(graph.has_branch("experiment_a")) # True
Variants enable config sweeps and hyperparameter tuning:
# Python - Create variants with different learning rates
def learning_rate_gen(i):
    rates = [0.001, 0.01, 0.1]
    return gs.PortData(rates[i])
branches = graph.create_variants(
name_prefix="lr",
count=3,
param_name="learning_rate",
variant_function=learning_rate_gen,
parallel=True # Enable parallel execution
)
# Creates: lr_0 (0.001), lr_1 (0.01), lr_2 (0.1)
// Rust - Same functionality (this variant function yields 0.01, 0.02, 0.03)
use graph_sp::core::{VariantConfig, VariantFunction};
let variant_fn: VariantFunction = Arc::new(|i: usize| {
PortData::Float((i as f64 + 1.0) * 0.01)
});
let config = VariantConfig::new("lr", 3, "learning_rate", variant_fn);
let branches = graph.create_variants(config)?;
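Conceptually, a variant sweep calls the variant function once per index and records the result under a branch named `<prefix>_<i>`. This dictionary-based sketch mirrors the `lr_0`/`lr_1`/`lr_2` naming above; returning plain dicts instead of real subgraphs is a simplification, and this `create_variants` is a stand-in, not the library method.

```python
from typing import Callable, Dict

def create_variants(name_prefix: str, count: int, param_name: str,
                    variant_function: Callable[[int], object]) -> Dict[str, Dict[str, object]]:
    """Map each branch name to its parameter value (hypothetical stand-in)."""
    return {
        f"{name_prefix}_{i}": {param_name: variant_function(i)}
        for i in range(count)
    }

rates = [0.001, 0.01, 0.1]
variants = create_variants("lr", 3, "learning_rate", lambda i: rates[i])
print(variants)
# {'lr_0': {'learning_rate': 0.001}, 'lr_1': {'learning_rate': 0.01}, 'lr_2': {'learning_rate': 0.1}}
```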
Merge combines outputs from multiple branches:
# Python - Merge outputs from variant branches (variant_branches: the branch names returned by create_variants)
graph.merge_branches(
node_id="select_best",
branches=variant_branches,
port="accuracy" # Merges to "accuracies" (pluralized)
)
# Custom merge function (e.g., average)
def compute_average(values):
    return gs.PortData(sum(values) / len(values))
graph.merge_branches(
node_id="summarize",
branches=variant_branches,
port="score",
merge_function=compute_average
)
// Rust - Same functionality
use graph_sp::core::MergeConfig;
let merge_config = MergeConfig::new(
vec!["branch_a".to_string(), "branch_b".to_string()],
"result".to_string()
);
graph.merge("merge_node", merge_config)?;
// Custom merge function
let max_fn = Arc::new(|inputs: Vec<&PortData>| -> Result<PortData> {
let max = inputs.iter()
.filter_map(|d| if let PortData::Int(v) = d { Some(*v) } else { None })
.max()
.unwrap_or(0);
Ok(PortData::Int(max))
});
let config = MergeConfig::new(branches, "score".to_string())
.with_merge_fn(max_fn);
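The two merge modes can be sketched in plain Python: the default gathers one value per branch into a list under a pluralized port name, and a custom function reduces those values to a single result. `merge_branches` and `pluralize` here are hypothetical helpers; the library's exact pluralization rule and the port name it uses for custom-merge output are assumptions. The per-branch numbers are made up for illustration.

```python
from typing import Callable, Dict, List, Optional

def pluralize(name: str) -> str:
    """Naive English plural: 'accuracy' -> 'accuracies', 'score' -> 'scores'."""
    if name.endswith("y") and name[-2:-1] not in "aeiou":
        return name[:-1] + "ies"
    return name + "s"

def merge_branches(branch_outputs: Dict[str, Dict[str, float]], branches: List[str],
                   port: str,
                   merge_function: Optional[Callable[[List[float]], float]] = None) -> Dict[str, object]:
    """Collect `port` from each branch in order, then list or reduce the values."""
    values = [branch_outputs[b][port] for b in branches]
    if merge_function is None:
        # Default merge: every branch's value, as a list under the plural name.
        return {pluralize(port): values}
    # Custom merge: reduce the collected values to one result.
    return {port: merge_function(values)}

# Made-up per-branch outputs, for illustration only.
outputs = {"lr_0": {"accuracy": 0.5}, "lr_1": {"accuracy": 0.75}, "lr_2": {"accuracy": 1.0}}
order = ["lr_0", "lr_1", "lr_2"]
print(merge_branches(outputs, order, "accuracy"))
# {'accuracies': [0.5, 0.75, 1.0]}
print(merge_branches(outputs, order, "accuracy", lambda v: sum(v) / len(v)))
# {'accuracy': 0.75}
```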
Nested Variants create cartesian products:
// 2 learning rates × 3 batch sizes = 6 total configurations
// (lr_fn and batch_fn are VariantFunction values, defined like variant_fn above)
let lr_config = VariantConfig::new("lr", 2, "learning_rate", lr_fn);
let lr_branches = graph.create_variants(lr_config)?;
for lr_branch in &lr_branches {
    let branch = graph.get_branch_mut(lr_branch)?;
    // VariantFunction is an Arc, so clone it for each outer branch instead of moving it
    let batch_config = VariantConfig::new("batch", 3, "batch_size", batch_fn.clone());
    branch.create_variants(batch_config)?;
}
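The nested sweep above amounts to a cartesian product of the two parameter lists. This sketch enumerates the same 2 × 3 grid with `itertools.product`; the batch sizes and the `lr_i/batch_j` naming are hypothetical, chosen only to echo the nested branch layout.

```python
from itertools import product

learning_rates = [0.001, 0.01]   # 2 values for the outer lr sweep
batch_sizes = [16, 32, 64]        # 3 hypothetical batch sizes for the inner sweep

# One configuration per (learning rate, batch size) pair.
configs = {
    f"lr_{i}/batch_{j}": {"learning_rate": lr, "batch_size": bs}
    for (i, lr), (j, bs) in product(enumerate(learning_rates), enumerate(batch_sizes))
}
print(len(configs))               # 6
print(configs["lr_1/batch_2"])    # {'learning_rate': 0.01, 'batch_size': 64}
```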
Examples
Python Examples
Located in python_examples/:
- simple_pipeline.py: Basic 3-node pipeline with implicit port mapping
- parallel_execution.py: Fan-out/fan-in pattern with 3 parallel branches
- variants_demo.py: Hyperparameter sweeps, custom merge, nested variants
- port_mapping_demo.py: ⭐ Explicit port mapping (broadcast vs implementation names)
- complex_objects.py: Nested objects, JSON, and lists
- branching_example.py: Branch creation and management
- All other examples: Use simple implicit port mapping
Note: Most examples use implicit port mapping where port names and function parameters match.
See port_mapping_demo.py for the one example showing explicit broadcast/implementation name separation.
Run an example:
cd python_examples
python simple_pipeline.py
python variants_demo.py
Rust Examples
Located in examples/:
- simple_pipeline.rs: 4-node data processing pipeline with implicit ports
- parallel_execution.rs: Fan-out/fan-in pattern with performance analysis
- branching_and_variants.rs: Comprehensive demo of branches, merge, and variants
- all_features_demo.rs: ⭐ Shows both implicit and explicit port mapping
- complex_objects.rs: All PortData types with nested structures
Note: Most examples use Port::simple("name") for implicit mapping.
See all_features_demo.rs for examples using Port::new("broadcast", "impl") for explicit mapping.
Run an example:
cargo run --example simple_pipeline
cargo run --example parallel_execution
cargo run --example branching_and_variants
Performance
Measured with the 3-branch parallel execution example (parallel_execution):
| Implementation | Sequential | Parallel | Improvement |
|---|---|---|---|
| Rust | 900ms | 500ms | ~44% less time (1.8x) |
| Python | 900ms | 502ms | ~44% less time (1.8x) |
The executor identifies dependency levels and executes all independent nodes concurrently using tokio::task::spawn_blocking.
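Dependency-level grouping can be sketched with Kahn's topological sort: nodes whose dependencies are all satisfied form one level, and every node in a level could then be dispatched concurrently. This is an illustration of the technique, not the engine's code; `dependency_levels` and the edge-tuple representation are assumptions. The same pass doubles as a cycle check, and the number of levels and the largest level size plausibly correspond to the depth and width reported by `analyze()`.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

def dependency_levels(nodes: Iterable[str], edges: Iterable[Tuple[str, str]]) -> List[List[str]]:
    """Group nodes into levels; every node's dependencies sit in earlier levels."""
    nodes = list(nodes)
    indegree: Dict[str, int] = {n: 0 for n in nodes}
    children = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1
    level = [n for n in nodes if indegree[n] == 0]   # sources start the first level
    levels: List[List[str]] = []
    seen = 0
    while level:
        levels.append(level)
        seen += len(level)
        next_level = []
        for n in level:
            for child in children[n]:
                indegree[child] -= 1
                if indegree[child] == 0:   # all of child's inputs are now available
                    next_level.append(child)
        level = next_level
    if seen != len(nodes):
        # Nodes that never reached in-degree 0 lie on or behind a cycle.
        raise ValueError("graph contains a cycle")
    return levels

edges = [("source", "slow"), ("source", "fast"), ("source", "medium"),
         ("slow", "merger"), ("fast", "merger"), ("medium", "merger")]
levels = dependency_levels(["source", "slow", "fast", "medium", "merger"], edges)
print(levels)  # [['source'], ['slow', 'fast', 'medium'], ['merger']]
print(len(levels), max(len(lvl) for lvl in levels))  # 3 3  (depth, width)
```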
Architecture
Core Components
- Data Model (src/core/data.rs)
  - PortData: enum for all supported types
  - GraphData: HashMap for port-to-port storage
  - Support for nested structures via recursive variants
- Graph (src/core/graph.rs)
  - petgraph-backed DAG representation
  - Topological sorting and cycle detection
  - Type-safe port connections
- Executor (src/executor/mod.rs)
  - Dependency-level grouping
  - Concurrent execution with tokio::task::spawn_blocking
  - Automatic concurrency management
- Inspector (src/inspector/mod.rs)
  - Graph statistics (depth, width, sources, sinks)
  - Mermaid diagram generation
  - Optimization suggestions
- Python Bindings (src/python/mod.rs)
  - PyO3-based wrappers
  - GIL-aware parallel execution
  - Automatic type conversion
Building & Testing
Rust
# Run tests
cargo test
# Run with all features
cargo test --all-features
# Build release
cargo build --release
# Run examples
cargo run --example simple_pipeline
Python
# Install development dependencies
pip install maturin pytest
# Build Python bindings
maturin develop --features python
# Run Python examples
python python_examples/simple_pipeline.py
Build for Multiple Platforms
# Linux (using Docker for manylinux compatibility)
docker run --rm -v $(pwd):/io ghcr.io/pyo3/maturin build --release --features python
# macOS
maturin build --release --features python
# Windows
maturin build --release --features python
Documentation
- Python API: See python_examples/README.md for detailed Python usage
- Rust API: Run cargo doc --open for full API documentation
- Port Data Types: See docs/PORT_DATA_TYPES.md for supported types
Publishing
PyPI
Wheels are automatically built and published to PyPI on version tags:
git tag v0.1.0
git push origin v0.1.0
Builds wheels for:
- Linux (manylinux)
- macOS (Intel & ARM)
- Windows
Crates.io
cargo publish
Contributing
Contributions welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass (cargo test --all-features)
- Submit a pull request
License
MIT License - see LICENSE file for details
Roadmap
- True parallel execution
- Python bindings with PyPI distribution
- Mermaid diagram generation
- Comprehensive examples
- Distributed execution support
- Graph serialization/deserialization
- WebAssembly support
- Real-time monitoring dashboard
- Advanced optimization algorithms
Dependencies: ~5.5–9MB, ~151K SLoC