1use polars_core::prelude::*;
2use polars_io::cloud::CloudOptions;
3use polars_io::csv::read::{
4 CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, infer_file_schema,
5};
6use polars_io::path_utils::expand_paths;
7use polars_io::utils::compression::maybe_decompress_bytes;
8use polars_io::utils::get_reader_bytes;
9use polars_io::{HiveOptions, RowIndex};
10use polars_utils::mmap::MemSlice;
11use polars_utils::plpath::PlPath;
12use polars_utils::slice_enum::Slice;
13
14use crate::prelude::*;
15
/// Builder that collects CSV scan settings and turns them into a `LazyFrame`
/// through `LazyFileListReader::finish`.
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    /// The sources (paths, open files or in-memory buffers) to scan.
    sources: ScanSources,
    /// Passed to `expand_paths` during schema inference and forwarded to the scan arguments.
    glob: bool,
    /// Forwarded to the scan arguments as `cache`.
    cache: bool,
    /// CSV reading and parsing options; most builder methods write into this value.
    read_options: CsvReadOptions,
    /// Optional cloud configuration, used for path expansion and forwarded to the scan.
    cloud_options: Option<CloudOptions>,
    /// Optional value forwarded to the scan arguments as `include_file_paths`.
    include_file_paths: Option<PlSmallStr>,
}
26
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Replaces the parse options stored in the read options with the result of
    /// applying `map_func` to them.
    #[must_use]
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Creates a reader over several paths; equivalent to
    /// `new_with_sources(ScanSources::Paths(paths))`.
    pub fn new_paths(paths: Arc<[PlPath]>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Creates a reader over `sources` with `glob` and `cache` enabled, default
    /// read options, default cloud options and no `include_file_paths`.
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    /// Creates a reader over a single path.
    pub fn new(path: PlPath) -> Self {
        Self::new_with_sources(ScanSources::Paths([path].into()))
    }

    /// Sets the `skip_rows_after_header` read option.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Sets the `row_index` read option; `None` clears it.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Sets the `n_rows` read option; `None` clears it.
    ///
    /// `finish` turns a `Some(len)` value into a positive pre-slice starting at offset 0.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Sets the `infer_schema_length` read option, which is handed to
    /// `infer_file_schema` by [`Self::with_schema_modify`].
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Sets the `ignore_errors` read option.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Sets the `schema` read option; `None` clears it.
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Sets the `skip_rows` read option.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Sets the `skip_lines` read option.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Sets the `schema_overwrite` read option; `None` clears it.
    ///
    /// [`Self::with_schema_modify`] writes these dtypes into the schema after the
    /// user callback has run.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Sets the `has_header` read option.
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the `chunk_size` read option.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Sets the separator byte on the parse options.
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Sets the comment prefix on the parse options; `None` clears it.
    ///
    /// A prefix that is exactly one byte long becomes [`CommentPrefix::Single`];
    /// anything else becomes [`CommentPrefix::Multi`].
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            opts.with_comment_prefix(comment_prefix.clone().map(|s| {
                // A valid UTF-8 string of byte length 1 is necessarily a single ASCII
                // character, so matching on the byte slice needs no further check.
                match s.as_bytes() {
                    &[byte] => CommentPrefix::Single(byte),
                    _ => CommentPrefix::Multi(s),
                }
            }))
        })
    }

    /// Sets the quote character on the parse options; `None` clears it.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Sets the end-of-line byte on the parse options.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Sets the null-value specification on the parse options; `None` clears it.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Sets the `missing_is_null` flag on the parse options.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Sets the `cache` flag that `finish` forwards to the scan arguments.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Sets the `low_memory` read option.
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Sets the encoding on the parse options.
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Sets the `try_parse_dates` flag on the parse options.
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Sets the `raise_if_empty` read option.
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Sets the `truncate_ragged_lines` flag on the parse options.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Sets the `decimal_comma` flag on the parse options.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Sets the `glob` flag, which is passed to `expand_paths` during schema
    /// inference and forwarded to the scan arguments by `finish`.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Sets the cloud options; `None` clears them.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Infers the schema of the first source, lets `f` modify it, and stores the
    /// result as the reader's schema.
    ///
    /// Inference reads the options currently set on the reader (`skip_rows`,
    /// `skip_lines`, parse options, `infer_schema_length`, `has_header`,
    /// `skip_rows_after_header`, `raise_if_empty`), so options changed after this
    /// call do not affect the inferred schema. After `f` has run, every dtype in
    /// `schema_overwrite` is written into the resulting schema.
    ///
    /// # Errors
    ///
    /// Returns an error when the reader has no sources, when the first path is not
    /// a local path, when opening, decompressing or reading the source fails, when
    /// schema inference fails, or when `f` returns an error.
    pub fn with_schema_modify<F>(self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        let infer_schema = |bytes: MemSlice| {
            let skip_rows = self.read_options.skip_rows;
            let skip_lines = self.read_options.skip_lines;
            let parse_options = self.read_options.get_parse_options();

            // `owned` backs the decompressed data when the input turns out to be compressed.
            let mut owned = vec![];
            let bytes = maybe_decompress_bytes(bytes.as_ref(), &mut owned)?;

            PolarsResult::Ok(
                infer_file_schema(
                    &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
                    &parse_options,
                    self.read_options.infer_schema_length,
                    self.read_options.has_header,
                    // NOTE(review): presumably the schema-overwrite slot; overwrites are
                    // applied after `f` below. Confirm against `infer_file_schema`.
                    None,
                    skip_rows,
                    skip_lines,
                    self.read_options.skip_rows_after_header,
                    self.read_options.raise_if_empty,
                )?
                .0,
            )
        };

        let schema = match self.sources.clone() {
            ScanSources::Paths(paths) => {
                let paths = expand_paths(&paths[..], self.glob(), self.cloud_options())?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                // Only local files can be opened here; report anything else as an
                // error instead of panicking.
                let path_ref = path.as_ref();
                let Some(local_path) = path_ref.as_local_path() else {
                    polars_bail!(
                        ComputeError:
                        "cannot infer the CSV schema from a non-local path in `with_schema_modify`"
                    );
                };

                infer_schema(MemSlice::from_file(&polars_utils::open_file(
                    local_path,
                )?)?)?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no files specified for this reader");
                };

                infer_schema(MemSlice::from_file(file)?)?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        let mut schema = f(schema)?;

        // Apply the overwrite dtypes after `f`, so they are matched against the
        // column names of the schema that `f` returned.
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Sets the optional `include_file_paths` value that `finish` forwards to the
    /// scan arguments; `None` clears it.
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}
322
323impl LazyFileListReader for LazyCsvReader {
324 fn finish(self) -> PolarsResult<LazyFrame> {
326 let rechunk = self.rechunk();
327 let row_index = self.row_index().cloned();
328 let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });
329
330 let lf: LazyFrame = DslBuilder::scan_csv(
331 self.sources,
332 self.read_options,
333 UnifiedScanArgs {
334 schema: None,
335 cloud_options: self.cloud_options,
336 hive_options: HiveOptions::new_disabled(),
337 rechunk,
338 cache: self.cache,
339 glob: self.glob,
340 projection: None,
341 row_index,
342 pre_slice,
343 cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
344 missing_columns_policy: MissingColumnsPolicy::Raise,
345 extra_columns_policy: ExtraColumnsPolicy::Raise,
346 include_file_paths: self.include_file_paths,
347 column_mapping: None,
348 deletion_files: None,
349 },
350 )?
351 .build()
352 .into();
353 Ok(lf)
354 }
355
356 fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
357 unreachable!();
358 }
359
360 fn glob(&self) -> bool {
361 self.glob
362 }
363
364 fn sources(&self) -> &ScanSources {
365 &self.sources
366 }
367
368 fn with_sources(mut self, sources: ScanSources) -> Self {
369 self.sources = sources;
370 self
371 }
372
373 fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
374 self.read_options.n_rows = n_rows.into();
375 self
376 }
377
378 fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
379 self.read_options.row_index = row_index.into();
380 self
381 }
382
383 fn rechunk(&self) -> bool {
384 self.read_options.rechunk
385 }
386
387 fn with_rechunk(mut self, rechunk: bool) -> Self {
389 self.read_options.rechunk = rechunk;
390 self
391 }
392
393 fn n_rows(&self) -> Option<usize> {
396 self.read_options.n_rows
397 }
398
399 fn row_index(&self) -> Option<&RowIndex> {
401 self.read_options.row_index.as_ref()
402 }
403
404 fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
405 let args = UnionArgs {
407 rechunk: self.rechunk(),
408 parallel: false,
409 to_supertypes: false,
410 from_partitioned_ds: true,
411 ..Default::default()
412 };
413 concat_impl(&lfs, args)
414 }
415
416 fn cloud_options(&self) -> Option<&CloudOptions> {
418 self.cloud_options.as_ref()
419 }
420}