# 来自 https://www.v2ex.com/t/1095749, 又提醒 AI 完善了一下.
from typing import Final
import os
import hashlib
SMALL_FILE_THRESHOLD: Final = 0xF000  # 60 KiB: files below this size are hashed in full
CHUNK_SIZE: Final = 0x5000  # 20 KiB: bytes hashed per sampled segment
DEFAULT_SEGMENTS: Final = 100  # default number of sampled segments


def _hash_span(h, stream, nbytes: int, buffer_size: int) -> None:
    """Feed up to *nbytes* bytes from the current position of *stream* into *h*."""
    remaining = nbytes
    while remaining > 0:
        chunk = stream.read(min(buffer_size, remaining))
        if not chunk:
            # End of file reached before the span was filled.
            break
        h.update(chunk)
        remaining -= len(chunk)


def cid_hash_file(
    path: str,
    segments: int = DEFAULT_SEGMENTS,
    chunk_size: int = CHUNK_SIZE,
    buffer_size: int = 8192,
) -> str:
    """Compute a sampled SHA-1 fingerprint of a file.

    Files smaller than ``SMALL_FILE_THRESHOLD`` are hashed in full. Larger
    files are sampled: up to ``segments`` spans of ``chunk_size`` bytes are
    hashed, the i-th span starting at ``i * max(1, size // segments)`` and
    clamped so that it never starts after ``size - chunk_size``. Sampling
    stops as soon as a span reaches the end of the file. If ``chunk_size``
    is larger than the file, the single span starts at offset 0 and covers
    the whole file.

    Args:
        path: Path of the file to hash.
        segments: Maximum number of spans to sample (default 100).
        chunk_size: Number of bytes hashed per span (default 20 KiB).
        buffer_size: Size of each individual read call (default 8 KiB).
            It only affects I/O granularity, never the resulting hash.

    Returns:
        The upper-case hexadecimal SHA-1 digest.

    Raises:
        FileNotFoundError: ``path`` does not exist.
        ValueError: ``segments``, ``chunk_size`` or ``buffer_size`` is
            less than 1.
        OSError: The file could not be opened or read.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"File not found: {path}")
    if segments < 1:
        raise ValueError("segments must be greater than 0")
    # A non-positive chunk or buffer size would make every read return no
    # data (or read to EOF for -1) and silently produce a meaningless hash.
    if chunk_size < 1:
        raise ValueError("chunk_size must be greater than 0")
    if buffer_size < 1:
        raise ValueError("buffer_size must be greater than 0")

    try:
        h = hashlib.sha1()
        with open(path, 'rb') as stream:
            # Measure the size on the open handle so that the size and the
            # data are guaranteed to come from the same file.
            size = os.fstat(stream.fileno()).st_size
            if size < SMALL_FILE_THRESHOLD:
                # Small file: hash the entire content.
                while chunk := stream.read(buffer_size):
                    h.update(chunk)
            else:
                # Large file: hash evenly spaced spans.
                interval = max(1, size // segments)
                # Last offset from which a full span still fits; clamped at
                # 0 so that chunk_size > size never yields a negative seek.
                last_start = max(0, size - chunk_size)
                for i in range(segments):
                    stream.seek(min(i * interval, last_start))
                    _hash_span(h, stream, chunk_size, buffer_size)
                    # Once a span has reached the end of the file, every
                    # later span would be clamped to the same bytes.
                    if stream.tell() >= size:
                        break
        return h.hexdigest().upper()
    except OSError as e:
        # PermissionError and friends are OSError subclasses; chain the
        # original exception so its errno and traceback stay available.
        raise OSError(f"Failed to read file {path}: {e}") from e
# Usage example: python <script> [file_path] [segments]
if __name__ == "__main__":
    import sys

    # Positional arguments after the script name; both are optional.
    cli_args = sys.argv[1:]
    try:
        # Fall back to "test.txt" and the default segment count when the
        # corresponding argument is not given.
        file_path = cli_args[0] if cli_args else "test.txt"
        if len(cli_args) >= 2:
            segments = int(cli_args[1])
        else:
            segments = DEFAULT_SEGMENTS
        hash_value = cid_hash_file(path=file_path, segments=segments)
        print(
            f"File: {file_path}",
            f"Segments: {segments}",
            f"Hash: {hash_value}",
            sep="\n",
        )
    except Exception as e:
        # Top-level boundary: report any failure on stderr and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
# 来自 https://www.v2ex.com/t/1095749, 又提醒 AI 完善了一下.