exr/block/reader.rs

//! Composable structures to handle reading an image.


use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;

/// Decode the meta data from a byte source, keeping the source ready for further reading.
/// Continue decoding the remaining bytes by calling `filter_chunks` or `all_chunks`.
#[derive(Debug)]
pub struct Reader<R> {
    meta_data: MetaData,
    remaining_reader: PeekRead<Tracking<R>>, // TODO does R need to be Seek or is Tracking enough?
}

impl<R: Read + Seek> Reader<R> {

    /// Start the reading process.
    /// Immediately decodes the meta data into an internal field.
    /// Access it via `meta_data()`.
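    ///
    /// A minimal usage sketch (assuming this module is reachable as
    /// `exr::block::reader` and that a file `"image.exr"` exists):
    /// ```no_run
    /// use std::{fs::File, io::BufReader};
    /// use exr::block::reader::Reader;
    ///
    /// let file = BufReader::new(File::open("image.exr").unwrap());
    /// let reader = Reader::read_from_buffered(file, false).unwrap();
    /// println!("found {} layers", reader.headers().len());
    /// ```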
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
        Ok(Self { meta_data, remaining_reader })
    }

    // must not be mutable, as reading the file later on relies on the meta data
    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }

    /// The decoded exr headers from the file.
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }

    /// Obtain ownership of the meta data.
    pub fn into_meta_data(self) -> MetaData { self.meta_data }

    /// Prepare to read all the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading all chunks reduces seeking within the file, but some chunks might be read without being used.
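    ///
    /// A sketch of iterating over every compressed chunk,
    /// continuing from the `read_from_buffered` example above:
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # let file = BufReader::new(File::open("image.exr").unwrap());
    /// # let reader = exr::block::reader::Reader::read_from_buffered(file, false).unwrap();
    /// let chunks_reader = reader.all_chunks(true).unwrap();
    /// for chunk in chunks_reader {
    ///     let _compressed_chunk = chunk.unwrap();
    /// }
    /// ```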
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
        let total_chunk_count = {
            if pedantic {
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
                offset_tables.iter().map(|table| table.len()).sum()
            }
            else {
                MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?
            }
        };

        Ok(AllChunksReader {
            meta_data: self.meta_data,
            remaining_chunks: 0 .. total_chunk_count,
            remaining_bytes: self.remaining_reader,
            pedantic
        })
    }

    /// Prepare to read only some of the chunks from the file.
    /// Does not decode the chunks now, but returns a decoder.
    /// Reading only some chunks may require seeking within the file, potentially skipping many bytes.
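    ///
    /// A sketch of reading only the chunks of the first layer;
    /// the filter inspects the `BlockIndex` of each chunk:
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # let file = BufReader::new(File::open("image.exr").unwrap());
    /// # let reader = exr::block::reader::Reader::read_from_buffered(file, false).unwrap();
    /// let chunks_reader = reader
    ///     .filter_chunks(true, |_meta_data, _tile, block| block.layer == 0)
    ///     .unwrap();
    /// ```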
    // TODO tile indices add no new information to block index??
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;

        // TODO regardless of pedantic, if invalid, read all chunks instead, and filter after reading each chunk?
        if pedantic {
            validate_offset_tables(
                self.meta_data.headers.as_slice(), &offset_tables,
                self.remaining_reader.byte_position()
            )?;
        }

        let mut filtered_offsets = Vec::with_capacity(
            (self.meta_data.headers.len() * 32).min(2*2048)
        );

        // TODO detect whether the filter actually would skip chunks, and avoid sorting etc. when no filtering is applied

        for (header_index, header) in self.meta_data.headers.iter().enumerate() { // offset tables are stored in the same order as the headers
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { // in increasing-y order
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;

                let block = BlockIndex {
                    layer: header_index,
                    level: tile.location.level_index,
                    pixel_position: data_indices.position.to_usize("data indices start")?,
                    pixel_size: data_indices.size,
                };

                if filter(&self.meta_data, tile.location, block) {
                    filtered_offsets.push(offset_tables[header_index][block_index]) // safe indexing from `enumerate()`
                }
            }
        }

        filtered_offsets.sort_unstable(); // enables reading continuously if possible (already sorted when the line order is increasing)

        if pedantic {
            // the table is sorted. if any two neighbours are equal, we have duplicates. this is invalid.
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
                return Err(Error::invalid("chunk offset table"))
            }
        }

        Ok(FilteredChunksReader {
            meta_data: self.meta_data,
            expected_filtered_chunk_count: filtered_offsets.len(),
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
            remaining_bytes: self.remaining_reader
        })
    }
}


fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller, but never larger than max
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // check that each offset is within the bounds
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten().map(|&offset| u64_to_usize(offset, "chunk start"))
        .any(|maybe_chunk_start| match maybe_chunk_start {
            Ok(chunk_start) => chunk_start < chunks_start_byte || chunk_start > end_byte,
            Err(_) => true
        });

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}


/// Decode the desired chunks and skip the unimportant chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to have a callback with each block.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
    meta_data: MetaData,
    expected_filtered_chunk_count: usize,
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
    remaining_bytes: PeekRead<Tracking<R>>,
}

/// Decode all chunks in the file without seeking.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, `sequential_decompressor`, or `parallel_decompressor`.
/// Call `on_progress` to have a callback with each block.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct AllChunksReader<R> {
    meta_data: MetaData,
    remaining_chunks: std::ops::Range<usize>,
    remaining_bytes: PeekRead<Tracking<R>>,
    pedantic: bool,
}

/// Decode chunks in the file without seeking.
/// Calls the supplied progress callback for each chunk.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Also contains the image meta data.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    chunks_reader: R,
    decoded_chunks: usize,
    callback: F,
}

/// Decode chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to have a callback with each block.
/// Also contains the image meta data.
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {

    /// The decoded exr meta data from the file.
    fn meta_data(&self) -> &MetaData;

    /// The decoded exr headers from the file.
    fn headers(&self) -> &[Header] { &self.meta_data().headers }

    /// The number of chunks that this reader will return in total.
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk from the file.
    /// Equivalent to `.next()`, as this also is an iterator.
    /// Returns `None` if all chunks have been read.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }

    /// Create a new reader that calls the provided progress
    /// callback for each chunk that is read from the file.
    /// If the file can be decoded successfully, the progress callback
    /// will be called at least with 0.0 at the start and with 1.0 at the end.
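    ///
    /// A sketch of attaching a progress callback to any `ChunksReader`:
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::ChunksReader;
    /// # let file = BufReader::new(File::open("image.exr").unwrap());
    /// # let chunks = exr::block::reader::Reader::read_from_buffered(file, false)
    /// #     .unwrap().all_chunks(false).unwrap();
    /// let with_progress = chunks.on_progress(|progress| {
    ///     println!("read {:.0}% of the chunks", progress * 100.0)
    /// });
    /// ```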
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
    }

    #[cfg(feature = "rayon")]
    /// Decompress all blocks in the file, using multiple cpu cores, and call the supplied closure for each block.
    /// The order of the blocks is not deterministic.
    /// You can also use `parallel_decompressor` to obtain an iterator instead.
    /// Will fall back to sequential processing where threads are not available, or where it would not speed up the process.
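    ///
    /// A sketch of decompressing all blocks on multiple threads; the closure
    /// would typically store the pixels of each uncompressed block somewhere:
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::ChunksReader;
    /// # let file = BufReader::new(File::open("image.exr").unwrap());
    /// # let chunks = exr::block::reader::Reader::read_from_buffered(file, false)
    /// #     .unwrap().all_chunks(true).unwrap();
    /// chunks.decompress_parallel(true, |_meta_data, _uncompressed_block| {
    ///     // store the pixels of the block here
    ///     Ok(())
    /// }).unwrap();
    /// ```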
    // FIXME try async + futures instead of rayon! Maybe even allows for external async decoding? (-> impl Stream<UncompressedBlock>)
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
            Ok(decompressor) => decompressor,
        };

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    #[cfg(feature = "rayon")]
    /// Return an iterator that decompresses the chunks with multiple threads.
    /// The order of the blocks is not deterministic.
    /// Use `ParallelBlockDecompressor::new_with_thread_pool` if you want to use your own thread pool.
    /// By default, this uses as many threads as there are CPUs.
    /// Returns `self` if there is no need for parallel decompression.
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all blocks sequentially in this thread,
    /// calling the supplied closure for each block.
    /// You can alternatively use `sequential_decompressor` if you prefer an external iterator.
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);
        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Prepare to read the chunks sequentially, using only a single thread, but with less memory overhead.
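    ///
    /// A sketch of pulling decompressed blocks one by one, on the current thread:
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::ChunksReader;
    /// # let file = BufReader::new(File::open("image.exr").unwrap());
    /// # let chunks = exr::block::reader::Reader::read_from_buffered(file, false)
    /// #     .unwrap().all_chunks(false).unwrap();
    /// let mut decompressor = chunks.sequential_decompressor(false);
    /// while let Some(block) = decompressor.next() {
    ///     let _uncompressed_block = block.unwrap();
    /// }
    /// ```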
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
    }
}

impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
}

impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks_reader.next().map(|item| {
            {
                let total_chunks = self.expected_chunk_count() as f64;
                let callback = &mut self.callback;
                callback(self.decoded_chunks as f64 / total_chunks);
            }

            self.decoded_chunks += 1;
            item
        })
        .or_else(|| {
            debug_assert_eq!(
                self.decoded_chunks, self.expected_chunk_count(),
                "chunks reader finished but not all chunks are decompressed"
            );

            let callback = &mut self.callback;
            callback(1.0);
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}

impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
}

impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as the file should contain (inferred from meta data)
        let next_chunk = self.remaining_chunks.next()
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));

        // if no chunks are left, but some bytes remain, return an error
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
            return Some(Err(Error::invalid("end of file expected")));
        }

        next_chunk
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
    }
}

impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
}

impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as we have desired chunk offsets
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location| {
            self.remaining_bytes.skip_to( // no-op for seek at current position, uses skip_bytes for small amounts
                usize::try_from(next_chunk_location)?
            )?;

            let meta_data = &self.meta_data;
            Chunk::read(&mut self.remaining_bytes, meta_data)
        })

        // TODO remember last chunk index and then seek to index+size and check whether bytes are left?
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
    }
}

/// Read all chunks from the file, decompressing each chunk immediately.
/// Implements iterator.
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
    remaining_chunks_reader: R,
    pedantic: bool,
}

impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The extracted meta data from the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }

    /// Read and then decompress a single block of pixels from the byte source.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk| {
            UncompressedBlock::decompress_chunk(compressed_chunk?, self.remaining_chunks_reader.meta_data(), self.pedantic)
        })
    }
}

#[cfg(feature = "rayon")]
/// Decompress the chunks in a file in parallel.
/// The first call to `next` will fill the thread pool with jobs,
/// starting to decompress the next few blocks.
/// These jobs will finish, even if you stop reading more blocks.
/// Implements iterator.
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    remaining_chunks: R,
    sender: std::sync::mpsc::Sender<Result<UncompressedBlock>>,
    receiver: std::sync::mpsc::Receiver<Result<UncompressedBlock>>,
    currently_decompressing_count: usize,
    max_threads: usize,

    shared_meta_data_ref: std::sync::Arc<MetaData>,
    pedantic: bool,

    pool: rayon_core::ThreadPool,
}

#[cfg(feature = "rayon")]
impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
    /// Use `new_with_thread_pool` to customize the thread pool.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        Self::new_with_thread_pool(chunks, pedantic, || {
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
                .build()
        })
    }

    /// Create a new decompressor. Does not immediately spawn any tasks.
    /// Decompression starts after the first call to `next`.
    /// Returns the chunks if parallel decompression should not be used.
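    ///
    /// A sketch of supplying a custom `rayon_core` thread pool
    /// (assuming `rayon_core` is available as a dependency):
    /// ```no_run
    /// # use std::{fs::File, io::BufReader};
    /// # use exr::block::reader::{ChunksReader, ParallelBlockDecompressor};
    /// # let file = BufReader::new(File::open("image.exr").unwrap());
    /// # let chunks = exr::block::reader::Reader::read_from_buffered(file, false)
    /// #     .unwrap().all_chunks(true).unwrap();
    /// let decompressor = ParallelBlockDecompressor::new_with_thread_pool(chunks, true, || {
    ///     rayon_core::ThreadPoolBuilder::new()
    ///         .num_threads(2)
    ///         .build()
    /// });
    /// ```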
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<rayon_core::ThreadPool, rayon_core::ThreadPoolBuildError>
    {
        use crate::compression::Compression;

        let is_entirely_uncompressed = chunks.meta_data().headers.iter()
            .all(|head| head.compression == Compression::Uncompressed);

        // if no compression is used in the file, don't use a thread pool
        if is_entirely_uncompressed {
            return Err(chunks);
        }

        // in case thread pool creation fails (for example on WASM currently),
        // we revert to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,

            // TODO print a warning?
            Err(_) => return Err(chunks),
        };

        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // circa one block for each thread at all times

        let (send, recv) = std::sync::mpsc::channel(); // TODO would a bounded channel simplify the logic?

        Ok(Self {
            shared_meta_data_ref: std::sync::Arc::new(chunks.meta_data().clone()),
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender: send,
            receiver: recv,
            pedantic,
            max_threads,

            pool,
        })
    }

    /// Fill the pool with decompression jobs. Returns the first job that finishes.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        while self.currently_decompressing_count < self.max_threads {
            let block = self.remaining_chunks.next();
            if let Some(block) = block {
                let block = match block {
                    Ok(block) => block,
                    Err(error) => return Some(Err(error))
                };

                let sender = self.sender.clone();
                let meta = self.shared_meta_data_ref.clone();
                let pedantic = self.pedantic;

                self.currently_decompressing_count += 1;

                self.pool.spawn(move || {
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
                        block, &meta, pedantic
                    );

                    // by now, decompressing could have failed in another thread.
                    // the error is then already handled, so we simply
                    // don't send the decompressed block and do nothing
                    let _ = sender.send(decompressed_or_err);
                });
            }
            else {
                // there are no chunks left to decompress
                break;
            }
        }

        if self.currently_decompressing_count > 0 {
            let next = self.receiver.recv()
                .expect("all decompressing senders hung up but more messages were expected");

            self.currently_decompressing_count -= 1;
            Some(next)
        }
        else {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks"); // TODO not reliable
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            None
        }
    }

    /// The extracted meta data of the image file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
}

impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
}

#[cfg(feature = "rayon")]
impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
#[cfg(feature = "rayon")]
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
        (remaining, Some(remaining))
    }
}