Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ enum Error {
backtrace: Backtrace,
},

#[snafu(display("Access is out of range, range:{range:?}, file_size:{file_size}, last_modified:{last_modified:?}, file:{file}.\nbacktrace:\n{backtrace}"))]
OutOfRange {
range: Range<usize>,
file_size: usize,
file: String,
last_modified: DateTime<Utc>,
backtrace: Backtrace,
},

#[snafu(display(
"Partial write, expect bytes:{expect}, written:{written}.\nbacktrace:\n{backtrace}",
))]
Expand Down Expand Up @@ -745,6 +754,17 @@ impl ObjectStore for DiskCacheStore {
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let file_meta = self.fetch_file_meta(location).await?;
ensure!(
file_meta.size >= range.end,
OutOfRange {
range,
file_size: file_meta.size,
last_modified: file_meta.last_modified,
file: location.to_string()
}
);

let PageRangeResult {
aligned_start,
num_pages,
Expand All @@ -756,11 +776,9 @@ impl ObjectStore for DiskCacheStore {
};
assert!(num_pages > 0);

let file_size = self.fetch_file_meta(location).await?.size;

// Fast path for only one page involved.
if num_pages == 1 {
let aligned_end = (aligned_start + self.page_size).min(file_size);
let aligned_end = (aligned_start + self.page_size).min(file_meta.size);
let aligned_range = aligned_start..aligned_end;
let filename = Self::page_cache_name(location, &aligned_range);
let range_in_file = (range.start - aligned_start)..(range.end - aligned_start);
Expand Down Expand Up @@ -790,7 +808,7 @@ impl ObjectStore for DiskCacheStore {
let mut page_start = aligned_start;
let mut page_idx = 0;
while page_idx < num_pages {
let page_end = (page_start + self.page_size).min(file_size);
let page_end = (page_start + self.page_size).min(file_meta.size);
let range_in_file = {
let real_start = page_start.max(range.start);
let real_end = page_end.min(range.end);
Expand Down Expand Up @@ -827,7 +845,7 @@ impl ObjectStore for DiskCacheStore {
for (idx, cache_miss) in paged_bytes.iter().map(|v| v.is_none()).enumerate() {
if cache_miss {
let missing_range_start = aligned_start + idx * self.page_size;
let missing_range_end = (missing_range_start + self.page_size).min(file_size);
let missing_range_end = (missing_range_start + self.page_size).min(file_meta.size);
missing_ranges.push(missing_range_start..missing_range_end);
missing_range_idx.push(idx);
}
Expand Down Expand Up @@ -933,6 +951,25 @@ mod test {
.exists()
}

#[tokio::test]
async fn test_disk_cache_out_of_range() {
let page_size = 16;
// 51 byte
let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
let location = Path::from("out_of_range_test.sst");
let store = prepare_store(page_size, 32, 0).await;
let buf = Bytes::from_static(data);
store.inner.put(&location, buf.clone()).await.unwrap();

// Read one page out of range.
let res = store.inner.get_range(&location, 48..54).await;
assert!(res.is_err());

// Read multiple pages out of range.
let res = store.inner.get_range(&location, 24..54).await;
assert!(res.is_err());
}

#[tokio::test]
async fn test_disk_cache_store_get_range() {
let page_size = 16;
Expand Down