use std::fmt::Debug;
use std::io::{Read, Seek};
use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;

/// Decodes the meta data of an exr file,
/// and keeps the byte source ready for reading the chunks that follow it.
#[derive(Debug)]
pub struct Reader<R> {
    meta_data: MetaData,
    remaining_reader: PeekRead<Tracking<R>>,
}

impl<R: Read + Seek> Reader<R> {

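    /// Reads and validates the meta data of an exr file,
    /// then prepares for reading the chunks that follow it.
    /// The given byte source should be buffered, as no buffering is added internally.
    ///
    /// A hedged usage sketch; the file path is hypothetical and the public import
    /// path of `Reader` is an assumption, so this is not compiled as a doc test:
    ///
    /// ```ignore
    /// use std::{fs::File, io::BufReader};
    ///
    /// let file = BufReader::new(File::open("image.exr")?); // hypothetical path
    /// let reader = Reader::read_from_buffered(file, /* pedantic */ false)?;
    /// println!("the file contains {} headers", reader.headers().len());
    /// ```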
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
        Ok(Self { meta_data, remaining_reader })
    }

    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }

    /// The decoded exr headers from the file.
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }

    /// Obtain the meta data ownership, discarding the reader.
    pub fn into_meta_data(self) -> MetaData { self.meta_data }

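    /// Prepares reading all the chunks in the file, without filtering.
    /// In pedantic mode, the offset tables are read and validated against the headers;
    /// otherwise they are only skipped, and just the total chunk count is kept.
    ///
    /// A hedged sketch of iterating the resulting reader; `reader` is assumed to be
    /// a `Reader` from `read_from_buffered`, so this is not compiled as a doc test:
    ///
    /// ```ignore
    /// let mut chunk_reader = reader.all_chunks(/* pedantic */ true)?;
    /// while let Some(chunk) = chunk_reader.read_next_chunk() {
    ///     let chunk = chunk?; // the chunk is still compressed at this point
    ///     // inspect or decompress the chunk here
    /// }
    /// ```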
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
        let total_chunk_count = {
            if pedantic {
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
                offset_tables.iter().map(|table| table.len()).sum()
            }
            else {
                MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?
            }
        };

        Ok(AllChunksReader {
            meta_data: self.meta_data,
            remaining_chunks: 0 .. total_chunk_count,
            remaining_bytes: self.remaining_reader,
            pedantic
        })
    }

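    /// Prepares reading only the chunks that the given filter selects, seeking past all others.
    /// The filter is called with the meta data, the tile coordinates, and the block index
    /// of each chunk, and returns whether that chunk should be read.
    /// In pedantic mode, the offset tables are also validated.
    ///
    /// A hedged sketch; `reader` is assumed to be a `Reader`, and `Vec2` is assumed
    /// to be this crate's 2d vector type, so this is not compiled as a doc test:
    ///
    /// ```ignore
    /// // keep only the full-resolution level of each layer
    /// let chunk_reader = reader.filter_chunks(true, |_meta_data, tile, _block| {
    ///     tile.level_index == Vec2(0, 0)
    /// })?;
    /// ```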
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;

        if pedantic {
            validate_offset_tables(
                self.meta_data.headers.as_slice(), &offset_tables,
                self.remaining_reader.byte_position()
            )?;
        }

        // pre-allocate a rough, bounded estimate of the selected chunk count
        let mut filtered_offsets = Vec::with_capacity(
            (self.meta_data.headers.len() * 32).min(2*2048)
        );

        // the offset tables are stored in the same order as the headers
        for (header_index, header) in self.meta_data.headers.iter().enumerate() {
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() {
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;

                let block = BlockIndex {
                    layer: header_index,
                    level: tile.location.level_index,
                    pixel_position: data_indices.position.to_usize("data indices start")?,
                    pixel_size: data_indices.size,
                };

                if filter(&self.meta_data, tile.location, block) {
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing, due to `enumerate()`
                }
            }
        }

        filtered_offsets.sort_unstable(); // enables reading continuously if possible

        if pedantic {
            // the table is sorted, so if any two neighbours are equal, we have duplicates, which is invalid
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
                return Err(Error::invalid("chunk offset table"))
            }
        }

        Ok(FilteredChunksReader {
            meta_data: self.meta_data,
            expected_filtered_chunk_count: filtered_offsets.len(),
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
            remaining_bytes: self.remaining_reader
        })
    }
}


fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than uncompressed
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // check that each offset lies within the chunk section of the file
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten()
        .map(|&offset| u64_to_usize(offset, "chunk start"))
        .any(|maybe_chunk_start| match maybe_chunk_start {
            Ok(chunk_start) => chunk_start < chunks_start_byte || chunk_start > end_byte,
            Err(_) => true
        });

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}


/// Reads only the chunks that were previously selected by a filter,
/// seeking past all other chunks in the file.
/// Implements an `Iterator` over `Result<Chunk>`.
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
    meta_data: MetaData,
    expected_filtered_chunk_count: usize,
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
    remaining_bytes: PeekRead<Tracking<R>>,
}

/// Reads all chunks from the file, in the order they appear in the file,
/// without seeking. Implements an `Iterator` over `Result<Chunk>`.
#[derive(Debug)]
pub struct AllChunksReader<R> {
    meta_data: MetaData,
    remaining_chunks: std::ops::Range<usize>,
    remaining_bytes: PeekRead<Tracking<R>>,
    pedantic: bool,
}

/// Wraps a `ChunksReader`, invoking a progress callback
/// with a number in the range `0.0 ..= 1.0` as the chunks are read.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    chunks_reader: R,
    decoded_chunks: usize,
    callback: F,
}

/// Decodes the chunks in the file as selected by the reader that produced it.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to be notified about the reading progress.
/// Also provides access to the image meta data.
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {

    /// The decoded exr meta data from the file.
    fn meta_data(&self) -> &MetaData;

    /// The decoded exr headers from the file.
    fn headers(&self) -> &[Header] { &self.meta_data().headers }

    /// The number of chunks that this reader will return in total.
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk from the file.
    /// Equivalent to `.next()`, as this reader is an iterator over `Result<Chunk>`.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
    }

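    /// Decompress all blocks in the file, using multiple cpu cores,
    /// and call the supplied closure for each of the decompressed blocks.
    /// The order of the blocks is not deterministic.
    /// Falls back to sequential decompression if the thread pool cannot be built
    /// or if the file is entirely uncompressed.
    ///
    /// A hedged sketch; `chunk_reader` is assumed to be any `ChunksReader`,
    /// so this is not compiled as a doc test:
    ///
    /// ```ignore
    /// chunk_reader.decompress_parallel(/* pedantic */ true, |_meta_data, block| {
    ///     println!("decompressed the block at {:?}", block.index);
    ///     Ok(())
    /// })?;
    /// ```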
    #[cfg(feature = "rayon")]
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
            Ok(decompressor) => decompressor,
        };

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Return an iterator that decompresses the chunks with multiple threads.
    /// The order of the blocks is not deterministic.
    /// Returns `Err(self)` if the sequential decompressor should be used instead,
    /// for example when the thread pool cannot be built.
    /// Use `ParallelBlockDecompressor::new_with_thread_pool` if you want to use your own thread pool.
    #[cfg(feature = "rayon")]
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all blocks in the file, one after another in the current thread,
    /// and call the supplied closure for each of the decompressed blocks.
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);
        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

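    /// Return an iterator that decompresses the chunks sequentially in the current thread.
    ///
    /// A hedged sketch; `chunk_reader` is assumed to be any `ChunksReader`,
    /// so this is not compiled as a doc test:
    ///
    /// ```ignore
    /// let mut decompressor = chunk_reader.sequential_decompressor(/* pedantic */ false);
    /// while let Some(block) = decompressor.decompress_next_block() {
    ///     let block = block?; // an `UncompressedBlock`
    ///     // process the decompressed block here
    /// }
    /// ```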
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
    }
}

impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
}

impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks_reader.next().map(|item|{
            {
                let total_chunks = self.expected_chunk_count() as f64;
                let callback = &mut self.callback;
                callback(self.decoded_chunks as f64 / total_chunks);
            }

            self.decoded_chunks += 1;
            item
        })
        .or_else(||{
            debug_assert_eq!(
                self.decoded_chunks, self.expected_chunk_count(),
                "chunks reader finished but not all chunks are decompressed"
            );

            let callback = &mut self.callback;
            callback(1.0);
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}

impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
}

impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as the file should contain, without seeking
        let next_chunk = self.remaining_chunks.next()
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));

        // in pedantic mode, also reject files that contain extra bytes after the last chunk
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
            return Some(Err(Error::invalid("end of file expected")));
        }

        next_chunk
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
    }
}

impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
}

impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
            // seek to the next selected chunk (a no-op if the reader is already at that position)
            self.remaining_bytes.skip_to(
                u64_to_usize(next_chunk_location, "chunk start")?
            )?;

            let meta_data = &self.meta_data;
            Chunk::read(&mut self.remaining_bytes, meta_data)
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
    }
}

/// Decompresses the chunks of an exr file, one after another, in the current thread.
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
    remaining_chunks_reader: R,
    pedantic: bool,
}

impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }

    /// Read and then decompress a single block of pixels from the byte source.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
            UncompressedBlock::decompress_chunk(compressed_chunk?, &self.remaining_chunks_reader.meta_data(), self.pedantic)
        })
    }
}

/// Decompresses the chunks of an exr file in parallel, on a thread pool.
/// The order of the yielded blocks is not deterministic.
#[cfg(feature = "rayon")]
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    remaining_chunks: R,
    sender: std::sync::mpsc::Sender<Result<UncompressedBlock>>,
    receiver: std::sync::mpsc::Receiver<Result<UncompressedBlock>>,
    currently_decompressing_count: usize,
    max_threads: usize,

    shared_meta_data_ref: std::sync::Arc<MetaData>,
    pedantic: bool,

    pool: rayon_core::ThreadPool,
}

#[cfg(feature = "rayon")]
impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a new decompressor with a default thread pool.
    /// Returns `Err(chunks)` if the sequential decompressor should be used instead,
    /// that is, when the thread pool cannot be built or the file is entirely uncompressed.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        Self::new_with_thread_pool(chunks, pedantic, ||{
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
                .build()
        })
    }

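    /// Create a new decompressor that uses the thread pool created by the given closure.
    /// Returns `Err(chunks)` if the thread pool cannot be built
    /// or if the file is entirely uncompressed.
    ///
    /// A hedged sketch that supplies a custom rayon pool with a fixed thread count;
    /// `chunk_reader` is assumed to be any `ChunksReader`, so this is not compiled as a doc test:
    ///
    /// ```ignore
    /// let decompressor = ParallelBlockDecompressor::new_with_thread_pool(
    ///     chunk_reader, /* pedantic */ true,
    ///     || rayon_core::ThreadPoolBuilder::new().num_threads(4).build()
    /// );
    /// ```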
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError>
    {
        use crate::compression::Compression;

        // if no chunk in the file is compressed, fall back to sequential reading
        let is_entirely_uncompressed = chunks.meta_data().headers.iter()
            .all(|head| head.compression == Compression::Uncompressed);

        if is_entirely_uncompressed {
            return Err(chunks);
        }

        // in case thread pool creation fails,
        // return the chunks reader, so the caller can fall back to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,
            Err(_) => return Err(chunks),
        };

        // keep more blocks in flight than there are threads, so no thread idles
        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2;

        let (send, recv) = std::sync::mpsc::channel();

        Ok(Self {
            shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()),
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender: send,
            receiver: recv,
            pedantic,
            max_threads,

            pool,
        })
    }

    /// Fill the thread pool with decompression jobs,
    /// then wait for the next block that finishes decompressing.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        // keep the thread pool saturated with decompression jobs
        while self.currently_decompressing_count < self.max_threads {
            let block = self.remaining_chunks.next();
            if let Some(block) = block {
                let block = match block {
                    Ok(block) => block,
                    Err(error) => return Some(Err(error))
                };

                let sender = self.sender.clone();
                let meta = self.shared_meta_data_ref.clone();
                let pedantic = self.pedantic;

                self.currently_decompressing_count += 1;

                self.pool.spawn(move || {
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
                        block, &meta, pedantic
                    );

                    // if the receiver was dropped, an error has already been handled,
                    // so the send result can safely be ignored
                    let _ = sender.send(decompressed_or_err);
                });
            }
            else {
                // no chunks left to decompress
                break;
            }
        }

        // wait for a block to finish decompressing, if any is in flight
        if self.currently_decompressing_count > 0 {
            let next = self.receiver.recv()
                .expect("all decompressing senders hung up but more messages were expected");

            self.currently_decompressing_count -= 1;
            Some(next)
        }
        else {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks");
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            None
        }
    }

    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
}

impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
}

#[cfg(feature = "rayon")]
impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
#[cfg(feature = "rayon")]
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
        (remaining, Some(remaining))
    }
}
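

// A hedged usage sketch of the full sequential reading path. The file path below
// is hypothetical, which is why the test is #[ignore]d: it only runs when a sample
// file is supplied manually, for example via `cargo test -- --ignored`.
#[cfg(test)]
mod usage_sketch {
    use super::*;
    use std::fs::File;
    use std::io::BufReader;

    #[test]
    #[ignore] // requires a real exr file at the hypothetical path below
    fn read_all_chunks_sequentially() {
        let file = BufReader::new(File::open("sample.exr").unwrap()); // hypothetical path
        let reader = Reader::read_from_buffered(file, true).unwrap();

        // read every chunk and decompress each one in this thread
        let mut decompressor = reader.all_chunks(true).unwrap()
            .sequential_decompressor(true);

        let mut block_count = 0;
        while let Some(block) = decompressor.decompress_next_block() {
            let _block: UncompressedBlock = block.unwrap();
            block_count += 1;
        }

        assert_ne!(block_count, 0, "a valid file contains at least one block");
    }
}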