diff --git a/cloudvolume/chunks.py b/cloudvolume/chunks.py index e83fa1df..9c5d1032 100644 --- a/cloudvolume/chunks.py +++ b/cloudvolume/chunks.py @@ -151,7 +151,7 @@ def decode( return np.zeros(shape=shape, dtype=dtype, order="F") else: return np.full(shape=shape, fill_value=background_color, dtype=dtype, order="F") - elif isinstance(filedata, np.ndarray): + elif encoding == "numpy" or isinstance(filedata, np.ndarray): return filedata elif encoding == "raw": return decode_raw(filedata, shape=shape, dtype=dtype) diff --git a/cloudvolume/datasource/precomputed/image/rx.py b/cloudvolume/datasource/precomputed/image/rx.py index dc35f5ed..53c46b49 100644 --- a/cloudvolume/datasource/precomputed/image/rx.py +++ b/cloudvolume/datasource/precomputed/image/rx.py @@ -496,27 +496,21 @@ def repopulate_lru_from_shm( )) encoding = lru_encoding - if encoding == "same": - # Since the parallel version populates the LRU via an image and - # you don't get the benefit of accessing the raw downloaded bytes, - # there will be a performance regression for "same" since e.g. - # jpeg -> img -> jpeg will instead of decode -> img,lru you'll - # have decode -> img -> encode -> lru. Therefore, this is hacky, - # but backwards compatible and strictly expands the capabilities - # of the LRU. - encoding = "raw" # would ordinarily be: meta.encoding(mip) - + store_as_numpy = encoding in ("same", "raw") for chunkname in core_chunks[-lru.size:]: bbx = Bbox.from_filename(chunkname) bbx -= requested_bbox.minpt img3d = renderbuffer[ bbx.to_slices() ] - binary = chunks.encode( - img3d, encoding, - meta.compressed_segmentation_block_size(mip), - compression_params=meta.compression_params(mip), - num_threads=meta.config.codec_threads, - ) - lru[chunkname] = (encoding, binary) + if store_as_numpy: + lru[chunkname] = ("numpy", np.copy(img3d, order="F")) + else: + binary = chunks.encode( + img3d, encoding, + meta.compressed_segmentation_block_size(mip), + compression_params=meta.compression_params(mip), + num_threads=meta.config.codec_threads, + ) + lru[chunkname] = (encoding, binary) def child_process_download( meta, cache, @@ -580,6 +574,7 @@ def download_chunk( cf = CloudFiles(cloudpath, secrets=secrets, locking=locking) no_deserialize = (cf.protocol == "mem" and encoding == "raw" and not enable_cache) + lru_miss = False try: encoding, content = lru[filename] except (TypeError, KeyError): @@ -608,21 +603,22 @@ def download_chunk( if content is not None and decompress and not no_deserialize: content = compression.decompress(content, file['compress']) - if lru is not None: - lru[filename] = (encoding, content) + lru_miss = True if no_deserialize: img3d = content else: img3d = decode_fn( - meta, filename, content, - fill_missing, mip, + meta, filename, content, + fill_missing, mip, background_color=background_color, encoding=encoding, ) - if lru is not None and full_decode: - if lru_encoding not in [ "same", encoding ]: + if lru is not None and lru_miss: + if full_decode and lru_encoding == "raw" and isinstance(img3d, np.ndarray): + lru[filename] = ("numpy", img3d) + elif full_decode and lru_encoding not in [ "same", encoding ]: content = None if img3d is not None: block_size = meta.compressed_segmentation_block_size(mip) @@ -630,13 +626,15 @@ def download_chunk( block_size = (8,8,8) content = chunks.encode( - img3d, lru_encoding, + img3d, lru_encoding, block_size, compression_params=meta.compression_params(mip), num_threads=meta.config.codec_threads, ) - + lru[filename] = (lru_encoding, content) + else: + lru[filename] = (encoding, content) return img3d, bbox @@ -647,7 +645,20 @@ def download_chunks_threaded( decompress=True, full_decode=True, ): """fn is the postprocess callback. decode_fn is a decode fn.""" - locations = cache.compute_data_locations(cloudpaths) + + # If every chunk is an LRU hit, skip compute_data_locations + # which does os.listdir on the cache directory. + are_all_lru_hits = False + if lru is not None and lru.size > 0: + if not isinstance(cloudpaths, list): + cloudpaths = list(cloudpaths) + are_all_lru_hits = all(fname in lru for fname in cloudpaths) + + if are_all_lru_hits: + locations = { 'local': [], 'remote': cloudpaths } + else: + locations = cache.compute_data_locations(cloudpaths) + cachedir = 'file://' + cache.path def process(cloudpath, filename, enable_cache, locking): @@ -663,13 +674,11 @@ def process(cloudpath, filename, enable_cache, locking): # If there's an LRU sort the fetches so that the LRU ones are first # otherwise the new downloads can kick out the cached ones and make the # lru useless. - are_all_lru_hits = False - if lru is not None and lru.size > 0: + if lru is not None and lru.size > 0 and not are_all_lru_hits: if not isinstance(locations['remote'], list): - locations['remote'] = list(locations['remote']) - locations['local'].sort(key=lambda fname: fname in lru, reverse=True) + locations['remote'] = list(locations['remote']) + locations['local'].sort(key=lambda fname: fname in lru, reverse=True) locations['remote'].sort(key=lambda fname: fname in lru, reverse=True) - are_all_lru_hits = all(( fname in lru for fname in locations['remote'] )) qualify = lambda fname: os.path.join(meta.key(mip), os.path.basename(fname)) diff --git a/cloudvolume/datasource/precomputed/image/tx.py b/cloudvolume/datasource/precomputed/image/tx.py index d7a5da7f..ef1c3916 100644 --- a/cloudvolume/datasource/precomputed/image/tx.py +++ b/cloudvolume/datasource/precomputed/image/tx.py @@ -375,6 +375,8 @@ def do_upload(i, imgchunk, cloudpath): if lru is not None: if lru_encoding in ["same", encoding]: lru[cloudpath] = (encoding, encoded) + elif lru_encoding == "raw": + lru[cloudpath] = ("numpy", np.copy(imgchunk, order="F")) else: lru_encoded = chunks.encode( imgchunk, lru_encoding,