无损格式之所以无损,是因为压缩前后的文件,最终解码为原始PCM音频数据后(或者说wav格式除去头部的定义封装的部分,剩下来的data块),得到的数据都是相同的。
同样一份无压缩的音频(wav文件)转换为flac、alac、ape(位深度、采样率不变的情况下,使用正常的编解码程序例如ffmpeg),压缩后虽然形态各异,但最终解码成原始PCM音频数据后都是一样的(基于DSD的音频文件除外),而音乐播放器软件最终发送到音频播放设备的也正是原始的PCM音频数据,所以说只要编解码的软件不出岔子,理论上讲这么转换后听到的声音是一样的。所以如果可能的话,无损编码时选择较高的压缩率有利于减少文件大小节省空间。
格式和算法的选择上,个人觉得flac还是最优的,首先是开源和通用性,这点不用多说,好像连单片机都有编解码的库,还有就是校验上不仅有帧内的CRC,还有对整段编码前的原始PCM音频数据进行的MD5校验(MediaInfo显示的【MD5 of the unencoded content】就是),能够判断音频是否损坏或篡改,而且容错率高,即便坏了一部分也能放(不像ape那样一有损坏就放不了了),此外还支持流特性(比如直播、实时传输)和快速定位表(seektable),总之十分科学和强大。有人可能会说flac的压缩率可能不太行,但经过我前段时间与ape的对比测试发现,同样采用最大压缩率(flac --best/mac -c5000(Insane)),ape只有在CD音质的规格下稍微有点领先,一旦规格上到24bit/48kHz及以上时,flac的表现基本持平甚至有时能反超ape,且flac的解码速度比ape快得多,同一份文件,flac嗖的一下就解码好了,而ape要多等好几秒,这么一对比该选哪个一目了然。
能知道flac的优势在哪也是因为我对它的特性比较熟悉,甚至研究过它的标准文件(RFC 9639)。其他的格式比如tak、alac我也了解过,下载过官方的编解码的命令行程序,看过一些里面的说明,发现这些格式能处理的音频规格范围(采样率、位深度)大都比flac要窄,alac、ape的这个范围虽然和flac基本一致,不过还是相对小众了些,alac苹果专用,也是最近几年才开源,ape要不是这次专门研究的话,基本已经淡出视野了。flac应该也是应用最广泛的。
以上算是一些科普,了解了这些,就很容易明白为什么选flac,也能放心转换无损格式的音频了。
研究这个【flac、wav文件无损压缩瘦身工具】也是最近在mora自购了3张专辑,发现用flac --best将mora原先零压缩的flac文件压缩后,原先总共2.17GiB的文件压缩到了1.53GiB,省了接近600MiB,也算是相当可观了,自己也很清楚这么操作完全不会对最终解码播放出的原始PCM音频数据产生任何改动,当然后续分享的时候还是要按照版规以原文件的形式分享出去,所以就在想能不能做个批量无损压缩工具出来分享给各位,虽然说动手能力强的朋友直接命令行批处理就完事了,不过我这里还研究了如何预分配文件空间减少碎片的方法(【Python】Windows上创建文件时进行空间预分配,使之具有连续非碎片化空间的正确方法 ),而且知道虽然flac.exe能处理wav文件,但是原wav文件有图片封面或metadata文字标签的话(有“ID3”块),输出的flac文件播放器和mediainfo里是看不到这些的,虽然说用ffmpeg就能把这些转移过来,但是ffmpeg没有flac.exe的“-0(--fast)”~“-8(--best)”的预设来得方便,尤其是flac.exe的“--best”能最大化压缩率而ffmpeg就很难实现(ffmpeg帮助里的编码选项和参数连我都看晕了),想着能不能整合这些实用的特征,于是研究了这个“flac、wav文件无损压缩瘦身工具”。
除了这个工具以外,还有之前研究的音频转wav、ape的工具,都在下面的百度云里有python脚本 :
https://pan.baidu.com/s/15UlvYMQouQcNptXas1Ns6Q?pwd=0000
双击这些脚本即可看到帮助,运行环境Python3.8及以上(因为脚本所调用的外部工具ffmpeg的原因,不太能找到新的32位版本,所以建议至少win7_64位)。
所要安装的Python第三方库和我这边用的版本如下:
Send2Trash==1.8.3
mutagen==1.47.0
psutil==7.0.0
pillow==10.4.0
pywin32==308
pywin32-ctypes==0.2.3
这些脚本其中:
【to_wav.py】(转wav格式)、【flac_compress.py】(wav、flac瘦身工具)是用windows文件API达成文件预分配(无碎片)输出的;
【ape_enc.py】(ape编码工具)由于无法让APE官方的编解码程序【MAC.exe】输出到stdout,所以程序无法一次性输出无碎片的ape文件(再整理一遍磁盘碎片显得有点多余且更耗时,所以放弃了)。
(PS:【MAC.exe】就是个文件碎片制造机,编码成ape文件产生的磁盘碎片比flac.exe多多了,同一份wav文件,flac.exe出来的flac文件有3-4个磁盘文件碎片,【MAC.exe】产生的碎片要乘以10左右,而且编解码速度比flac慢得多,压缩率也才堪堪超过flac一丢丢,所以【ape_enc.py】只适合拿来研究和学习,当“花瓶”用,不推荐实际使用 )
音频metadata的支持情况:
【flac_compress.py】支持保留原音频文件的封面和metadata文本标签;
【ape_enc.py】仅支持保留metadata文本标签,不支持保留封面图片;
【to.wav】不保留任何metadata标签,算是个纯解码的。
【to_wav.py】、【ape_enc.py】是之前花了很长一段时间搞出来的,可以说很大程度上是给monkeys audio官方擦屁股的工程(代码的注释里有说明)。
【flac_compress.py】使最近搞出来的,时间有点仓促。
因为论坛的代码编辑功能体验不是太好,复制下来会有多余的东西出现,所以仅展示一个脚本的代码,反正百度云分享的就是py源代码文件。
以下是其中【flac、wav文件无损压缩瘦身工具】的源码,其中里面的【struct_flac_vorbis_comment_block】、【struct_flac_picture_block】、【rewrite_flac_PCM_MD5_checksum】、【get_source_file_metadata】等函数及其调用的子函数比较有研究价值:
import os,sys,subprocess,io,msvcrt,time,hashlib,re
from contextlib import suppress
from copy import copy
# 第三方库
import psutil,win32file
from PIL import Image
# 程序cmd标题
cmd_title = "flac、wav文件无损压缩瘦身工具"
# 程序所在文件夹
program_dir = os.path.dirname(__file__)
# 程序可执行文件名
app_exe = os.path.basename(sys.argv[0])
# 管道缓冲大小
buf_size=1*1024*1024 # 1 MiB
# 如果不是以分隔符结尾,要补上分隔符“;”,
# 在刚刚装好的win7虚拟机上吃了一亏,然后补上的
if not (os.environ['PATH']).endswith(";"):
os.environ['PATH'] += ";"
# 添加外部工具路径到临时环境变量,方便运行
os.environ['PATH'] += f"{program_dir}\\External_Tools;"
# 文件系统信息
fs_info_dict = dict()
for partition in psutil.disk_partitions(all=True):
section = getattr(partition,"mountpoint","") # 盘符
fs_type = getattr(partition,"fstype","") # 文件系统类型
try:
# 获取每扇区字节数,和每簇的扇区数
sectors_per_cluster , bytes_per_sector , _ ,_ =win32file.GetDiskFreeSpace(section)
except:
cluster_size = 0
else:
# 相乘得到簇大小
cluster_size = bytes_per_sector * sectors_per_cluster
fs_info_dict[copy(section)]=(copy(fs_type) , copy(cluster_size))
# 检查是否是资源管理器文件拖曳处理的模式
# 如果是的话,结束前额外暂停3秒展示信息
parent_process = psutil.Process().ppid()
# 脚本文件还需要查【py.exe】的父进程
if app_exe.lower().endswith(".py"):
parent_process = psutil.Process(parent_process).ppid()
start_from_explorer = True if ((psutil.Process(parent_process).name()) == "explorer.exe") else False
# 如果不是以分隔符结尾,要补上分隔符“;”,
# 在刚刚装好的win7虚拟机上吃了一亏,然后补上的
if not (os.environ['PATH']).endswith(";"):
os.environ['PATH'] += ";"
# 添加外部工具路径到临时环境变量,方便运行
os.environ['PATH'] += f"{program_dir}\\External_Tools;"
# metadata标签修正和排序顺序
flac_meta_tag_ref_dict = {
"track": (0,"TRACKNUMBER"),
"tracknumber":(0,"TRACKNUMBER"),
"title": (1,"TITLE"),
"name": (2,"name"),
"artist":(3,"ARTIST"),
"performer":(4,"performer"),
"composer": (5,"COMPOSER"),
"date": (6,"DATE"),
"year": (7,"year"),
"mood": (8,"MOOD"),
"genre": (9,"GENRE"),
"genrenumber":(10,"GENRENUMBER"),
"album": (11,"ALBUM"),
"albumartist": (12,"ALBUMARTIST"),
"album_artist": (12,"ALBUMARTIST"),
"album artist": (12,"ALBUMARTIST"),
"tracktotal": (13,"TRACKTOTAL"),
"disc": (14,"DISCNUMBER"),
"discnumber": (14,"DISCNUMBER"),
"disk": (14,"DISCNUMBER"),
"disknumber": (14,"DISCNUMBER"),
"copyright": (15,"COPYRIGHT"),
"organization": (16,"ORGANIZATION"),
"comment": (17,"COMMENT"),
"discription": (18,"DISCRIPTION"),
"lyrics": (19,"LYRICS"),
}
def meta_filter(key:str) -> tuple:
if (ret:=flac_meta_tag_ref_dict.get(key.lower())):
pass
else:
ret = (100,copy(key))
return ret
# 恢复标题
def set_title() -> None:
os.system(f"title {cmd_title}")
return
# 需要手动构建vorbis_comment_block以支持多行文本标签,
# metaflac提供的--import-tags-from选项不支持多行文本标签
def struct_flac_vorbis_comment_block(meta_dict:dict, vendor_string="") -> bytearray:
vorbis_comment_block = b""
fields = [(f"{key}={value}").encode(encoding="utf-8", errors="replace") for key, value in meta_dict.items()]
field_part = b"".join([
(len(single_field).to_bytes(length=4, byteorder="little", signed=False) + single_field) \
for single_field in fields
])
number_of_fields_part = len(fields).to_bytes(length=4, byteorder="little", signed=False)
vendor_string_part = vendor_string.encode(encoding="utf-8", errors="replace")
vendor_string_length_part = len(vendor_string_part).to_bytes(length=4, byteorder="little", signed=False)
block_size = sum([
len(i) for i in \
[
vendor_string_length_part, vendor_string_part,
number_of_fields_part, field_part
]
])
block_size_part = block_size.to_bytes(length=3, byteorder="big", signed=False)
block_head_part = b"\x04"
vorbis_comment_block = b"".join([
block_head_part, block_size_part,
vendor_string_length_part, vendor_string_part,
number_of_fields_part, field_part
])
return bytearray(vorbis_comment_block)
# 获取flac.exe的vendor_string
# MediaInfo软件显示的【Writing library: libFLAC 1.X.X】就是由vendor_string决定的,
# 手动构建vorbis_comment_block是需要这个的,这样输出的flac就知道是哪个版本的flac.exe输出的。
def get_flac_tool_vendor_string() -> str:
process = subprocess.Popen(
# “flac -l 0 -0 -b 4608 …… --no-mid-side” 这部分命令用于生成和原wav文件大小一致的flac,
# 是从一个国外的论坛里找到的
# 虽然这里实际上不需要做得这么“绝”,不过还是列出来供各位参考
args="flac -l 0 -0 -b 4608 --disable-constant-subframes --disable-fixed-subframes --no-md5 --no-padding --no-seektable --no-adaptive-mid-side --no-mid-side --force-raw-format --endian=little --sign=signed --channels=2 --bps=16 --sample-rate=44100 --stdout - 2>nul",
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None,
shell=True,
)
# 给100个对齐的sample让flac编码,输出的flac文件中的vorbis_comment里就有vendor_string
stdout_bytes, _ = process.communicate(b"\x00"*4*100)
stdout_bytes = bytearray(stdout_bytes)
if (ret:=stdout_bytes.find(b"reference\x20libFLAC")) != -1:
vendor_string_length = int.from_bytes(bytes=stdout_bytes[ret-4:ret], byteorder="little", signed="False")
vendor_string = (stdout_bytes[ret : ret+vendor_string_length]).decode(encoding="utf-8", errors="replace")
return vendor_string
# 获取metadata
def get_source_file_metadata(input_file:str) -> dict:
meta_dict = dict()
with suppress(Exception):
p = subprocess.run(f"ffmpeg -loglevel quiet -i "{input_file}" -bitexact -map_metadata:g 0:g -map_metadata:g 0:s:a:0 -map_chapters -1 -f ffmetadata - " , shell=True , capture_output=True , check=True)
# ffmpeg输出的ffmetadata文件固定是utf-8
content = p.stdout.decode("utf-8",errors="replace")
# 替换多行注释和特殊字符【ffmpeg-all.html#toc-Metadata-2】
content = content.replace("\r\\\n","<↵>").replace("\\\n","<↵>").replace("\\\","\") # \r\\\n是因为ffmpeg似乎只替换\n,漏掉\r\n的情况(歌词里)
content = content.replace("\\#","#").replace("\\=","=").replace("\\;",";")
# 分行并过滤
content = [ spl for i in content.splitlines() \
if (line := i.strip()) and (not line.startswith((";","#"))) \
and len(spl := line.split("=",1))==2 \
and spl[-1] ]
# 查找替换正式key值,分行过后还原换行符
content = [[ meta_filter(key), value.replace("<↵>","\n") ] for key,value in content]
# 排序
content.sort(key=lambda x:x[0][0])
# 转换并添加到字典
for key, value in content:
meta_dict[copy(key[-1])]=copy(value)
return meta_dict
def try_decode(input_bytes:bytes) -> str:
# 可能的控制台输入输出编码
possible_encodings = ["utf-8-sig" , "gb18030" , "utf-16" , "cp932"]
output_string = ""
for i in possible_encodings:
try:
output_string = input_bytes.decode(encoding=i, errors="strict")
except:
pass
else:
break
if not output_string:
output_string = input_bytes.decode(encoding="utf-8-sig",errors="replace")
return output_string
# 获取源音频文件的附加图片数据
def get_source_file_top_attached_pic_data(file:str) -> io.BytesIO:
data_io = io.BytesIO()
with suppress(Exception):
p = subprocess.run(f"ffprobe -hide_banner -i "{file}"" , shell=True , capture_output=True)
stderr_content = try_decode(p.stderr)
# 从stderr中解析附加图片所在的位置
pic_pos_list = []
for line in [line.strip() for line in stderr_content.splitlines()]:
if (tmp := re.match(r"^Stream[\s]*#0[:](\d+)[:][\s]*Video:(.+)\(attached pic\)[ DISCUZ_CODE_0 ]quot;, line, re.I)):
pic_pos_list.append(int(tmp.group(1)))
# 定位排在最前面的附加图片,然后尝试导出
pic_pos_list.sort()
if pic_pos_list:
top_pic_pos = pic_pos_list[0]
readed_size = 0
readed_size_limit = 0xFFFFFF-64 # 超过这个大小flac的picture_block塞不下(3个byte的表达范围 - 64个Byte的格式需要预留的空间)
need_terminate = False
p = subprocess.Popen(
args=f"ffmpeg -loglevel quiet -i "{file}" -map 0:{top_pic_pos} -frames:v 1 -update 1 -c copy -f image2 -",
bufsize=buf_size*2,
stdin=None, stdout=subprocess.PIPE, stderr=None,
shell=True,
)
while (not p.stdout.closed) and (buf := p.stdout.read(buf_size)):
if readed_size <= readed_size_limit:
data_io.write(buf)
else:
# 放弃数据
data_io.truncate(0)
# 需要结束进程
need_terminate = True
break
readed_size += len(buf)
# 如果输出图片数据超出大小,结束ffmpeg进程
if need_terminate:
with suppress(Exception): p.stdout.close()
with suppress(Exception): p.kill()
else:
# 管道里没东西后,尝试关闭
with suppress(Exception): p.stdout.close()
# 等待进程结束
while (p.poll() is None):
time.sleep(0.1)
return data_io
def struct_flac_picture_block(pic_data:io.BytesIO, picture_type=3, description="") -> bytearray:
picture_block = b""
pixel_bit_depth_mode_dict = {
"1":1,
"L":8,
"P":8,
"RGB":3*8,
"RGBA":4*8,
"CMYK":4*8,
"YCbCr":3*8,
"LAB":3*8,
"HSV":3*8,
"I":32,
"F":32,
}
with pic_data:
try:
img = Image.open(pic_data)
except:
img = None
if img:
# 获取图片信息
with img:
mimetype = tmp if ((tmp:=img.get_format_mimetype()) and isinstance(tmp,str)) else ""
width, height = img.size
bits_per_pixel = tmp if (tmp:=pixel_bit_depth_mode_dict.get(img.mode)) else 0
if img.format == "GIF":
if img.mode != 'P':
img = img.convert('P')
colors_used = 256 # GIF max colors is 256
with suppress(Exception): colors_used = len(img.getcolors(maxcolors=256)) # GIF max colors is 256
else:
colors_used = 0 #“0” for non-indexed pictures
# 构建picture_block
picture_type_part = \
picture_type.to_bytes(length=4, byteorder="big", signed=False)
media_type_string_part = \
mimetype.encode(encoding="utf-8", errors="replace")
media_type_string_length_part = \
len(media_type_string_part).to_bytes(length=4, byteorder="big", signed=False)
description_string_part = \
description.encode(encoding="utf-8", errors="replace")
description_string_length_part = \
len(description_string_part).to_bytes(length=4, byteorder="big", signed=False)
width_part, height_part = \
width.to_bytes(length=4, byteorder="big", signed=False), \
height.to_bytes(length=4, byteorder="big", signed=False)
bits_per_pixel_part = \
bits_per_pixel.to_bytes(length=4, byteorder="big", signed=False)
colors_used_part = \
colors_used.to_bytes(length=4, byteorder="big", signed=False)
picture_data_part = \
pic_data.getvalue()
picture_data_length_part = \
len(picture_data_part).to_bytes(length=4, byteorder="big", signed=False)
# 计算block_size
block_size = sum([
len(i) for i in [
picture_type_part,
media_type_string_length_part, media_type_string_part,
description_string_length_part, description_string_part,
width_part, height_part, bits_per_pixel_part, colors_used_part,
picture_data_length_part, picture_data_part,
]
])
block_size_part = \
block_size.to_bytes(length=3, byteorder="big", signed=False)
block_head_part = b"\x06"
# 组装picture_block
picture_block = b"".join([
block_head_part, block_size_part,
picture_type_part,
media_type_string_length_part, media_type_string_part,
description_string_length_part, description_string_part,
width_part, height_part, bits_per_pixel_part, colors_used_part,
picture_data_length_part, picture_data_part,
])
return bytearray(picture_block)
def restruct_flac_meta_part_in_std_pipe(std_pipe:io.BytesIO, vorbis_comment_block:bytearray, picture_block:bytearray, padding_size=-1) -> bytes:
flac_frame_start = b""
block_list = [(4,vorbis_comment_block),(6,picture_block)]
while (not std_pipe.closed):
# 读取假想的block_type和block_size
buf = bytearray(std_pipe.read(4))
if len(buf) != 4:
break
block_type = buf[0] & 0b01111111
is_last_block = bool(buf[0] & 0b10000000)
# flac文件头
if bytes(buf)==b"fLaC":
#什么都不做,等下一轮读取下一个block的size
pass
# meta部分结束,flac的frame部分开始
# (最后一个meta_block最高位没有置1的后备方案)
elif buf.startswith( (b"\xFF\xF8",b"\xFF\xF9") ):
flac_frame_start = bytes(buf)
break
# 舍弃原先的vorbis_comment_block、picture_block、seektable_block、Application_block、cuesheet_block(cuesheet不太可能有,所以舍去)
elif block_type in (2,3,4,5,6):
# 读取并丢弃
read_count = int.from_bytes(bytes=buf[1:], byteorder="big", signed=False)
if (not std_pipe.closed):
std_pipe.read(read_count)
# 如果是最后一个块,就没有必要继续读取了
if is_last_block:
break
# steaminfo_block
elif block_type==0:
read_count = int.from_bytes(bytes=buf[1:], byteorder="big", signed=False)
if (not std_pipe.closed):
data = std_pipe.read(read_count)
if len(data) == read_count:
buf.extend(data)
block_list.append( (0, copy(buf)) )
# 如果是最后一个块,就没有必要继续读取了
if is_last_block:
break
# padding_block(虽然block_type==1,但排序上这里给到所有block的最后:9)
elif block_type==1:
# 自定义padding_block大小
if padding_size > 0:
# 读取并丢弃原来的padding_block
read_count = int.from_bytes(bytes=buf[1:], byteorder="big", signed=False)
if (not std_pipe.closed):
std_pipe.read(read_count)
# 替换为新的padding_block
padding_size = min(0xFFFFFF , padding_size)# 限制大小
padding_size_part = padding_size.to_bytes(length=3, byteorder="big", signed=False)
block_list.append( (9, bytearray(b"\x01" + padding_size_part + b"\x00"*padding_size)) )
# padding_block大小不变
elif padding_size < 0:
read_count = int.from_bytes(bytes=buf[1:], byteorder="big", signed=False)
if (not std_pipe.closed):
data = std_pipe.read(read_count)
if len(data) == read_count:
buf.extend(data)
block_list.append( (9, copy(buf)) )
# 为零,删除padding_block
else:
# 读取并丢弃原来的padding_block
read_count = int.from_bytes(bytes=buf[1:], byteorder="big", signed=False)
if (not std_pipe.closed):
std_pipe.read(read_count)
# 如果是最后一个块,就没有必要继续读取了
if is_last_block:
break
# 现阶段没有上面列出的以外的类型
else:
raise Exception("读取到未知类型的meta_block")
# 滤除空白block
block_list = [i for i in block_list if i[-1]]
# 按给定的序号从前到后排序
block_list.sort(key=lambda x:x[0])
# 扔掉排序用的序号
block_list = [i[-1] for i in block_list]
# 先全部标记为不是最后一个块
for block in block_list:
block[0] = block[0] & 0b01111111
# 再给最后一个块标记
block_list[-1][0] = block_list[-1][0] | 0b10000000
# 检查必要的第一个块streaminfo_block是否存在
if block_list[0][0] not in (0x00,0x80):
raise Exception("未获取到必要的streaminfo_block")
result_flac_meta_part = b"fLaC" + b"".join(block_list) + flac_frame_start
return result_flac_meta_part
def rewrite_flac_PCM_MD5_checksum(flac_file:str) -> None:
md5_checksum_obj = hashlib.md5()
command = f"flac -d --force-raw-format --sign=signed --endian=little --stdout "{flac_file}" 2>nul"
process = subprocess.Popen(
args=command,
bufsize=2*buf_size,
stdin=None, stdout=subprocess.PIPE, stderr=None,
shell=True,
)
while (not process.stdout.closed) and (buf := process.stdout.read(buf_size)):
md5_checksum_obj.update(buf)
# 管道里没东西后,尝试关闭
with suppress(Exception): process.stdout.close()
# 等待进程结束
while (process.poll() is None):
time.sleep(0.1)
# 检查是否返回非零
if process.returncode:
raise Exception("回写【MD5 of the unencoded content】,flac解码时返回非零")
# 获取MD5的bytes表达(切片,确保不超出范围)
result = md5_checksum_obj.digest()[0:16]
# 根据flac文件的标准,MD5的位置是固定不变的,
# 0x1a的位置用winhex打开对照MediaInfo找的。
with open(flac_file, mode="br+") as f:
f.seek(0x1a , os.SEEK_SET)
f.write(result)
return
# 转移修改时间
def transfer_modify_time(input_file:str, output_file:str) -> None:
with suppress(Exception):
# access_time是来凑数的,毕竟time的tuple必须要两个参数
access_time = os.path.getctime(output_file)
modify_time = os.path.getmtime(input_file)
os.utime(output_file , (access_time , modify_time))
# 无返回值
return
# 退出时的行为
def app_exit(return_code:int=0, message:str="") -> None:
if message:
if return_code:
sys.stderr.write(f"\n\n{message}\n")
else:
sys.stdout.write(f"\n\n{message}\n")
# stderr无缓冲,是即时的
sys.stdout.flush()
if start_from_explorer:
sys.stdout.write("\n\n★ 按任意键结束 ★\n")
sys.stdout.flush()
os.system("@pause>nul")
sys.exit(return_code)
# 是否为flac文件或wav文件
def is_flac_or_wav(file:str) -> int:
if os.path.isfile(file):
with open(file,mode="rb") as f:
buf = f.read(4)
if buf == b"fLaC":
return 1
elif buf == b"RIFF":
with suppress(Exception):
f.seek(8,os.SEEK_SET)
buf = f.read(8)
if buf == b"WAVEfmt\x20":
return 2
return 0
# Windows上创建文件时进行空间预分配,使之具有连续非碎片化空间
# 【无VDL解锁(无管理员权限时)、无debug功能简化版、固定为“虚模式”】
# 原理和更多详情可见:https://zhuanlan.zhihu.com/p/1943261864013309766
def win_preallocate_newfile(
# 日常使用参数
file:str, size:int, exist_ok:bool=False,
buffering:int=-1,
text_mode:bool=False, encoding:str="utf-8-sig",
errors=None, newline="\r\n",
) -> io.BytesIO:
# 分区名
drive_name = os.path.splitdrive(os.path.abspath(file))[0] + "\"
"""
# 文件系统
fs_type = tmp if ( tmp := (fs_info_dict.get(drive_name))[0] ) else ""
new_fs = True if (fs_type in {"NTFS","ReFS"}) else False
"""
# 簇大小
cluster_size = tmp if ( tmp := (fs_info_dict.get(drive_name))[-1] ) else 1
# 与簇大小对齐的文件分配空间
al_size = (size + (cluster_size - remain_size)) if (remain_size := size%cluster_size) else size
# 检查文件是否已经存在
if os.path.isfile(file) and (not exist_ok):
raise Exception("文件已存在,且未设置覆盖")
# 上面为止文件都没有正式打开
# 下面套个try是为了方便在失败时关掉句柄和删除残留
try:
# 打开一个python文件句柄
if text_mode:
py_fh = open(file, mode="wt+", encoding=encoding, buffering=buffering, errors=errors, newline=newline)
else:
py_fh = open(file, mode="wb+", buffering=buffering)
# 转换为windows的句柄方便操作
win_hf = msvcrt.get_osfhandle(py_fh.fileno())
# 设置文件的磁盘分配空间
win32file.SetFileInformationByHandle(win_hf , win32file.FileAllocationInfo , al_size)
"""
# 根据上面的配置结果,选择是否在一开始就移动EOF至文件的分配大小
if new_fs:
# 移动EOF至分配的文件大小
# 虽然EOF的大小(文件大小)不需要对齐簇大小,
# 不过这里设置成对齐簇大小的al_size,多一丢丢文件的实际大小,问题也不大
win32file.SetFileInformationByHandle(win_hf , win32file.FileEndOfFileInfo , al_size)
"""
except Exception as x:
e = copy(x) # 如果不找个新变量copy过来,下面的with suppress(Exception)会使存储异常的变量“人间蒸发”
with suppress(Exception): py_fh.close()
with suppress(Exception): os.remove(file)
raise e
return py_fh
'''
........ ....... .....
=@@@@@@@. ,@@@@@@@ =@@@@
=@@@@@@@^ /@@@@@@@ =@@@@
=@@@@@@@@. =@@@@@@@@ .]]]]]]` .]]]` ,]]]].
=@@@@=@@@^ @@@@=@@@@ /@@@@@@@@@@@. =@@@@ =@@@@/@@@@@@@@`
=@@@@.@@@@. =@@@^=@@@@ /@@@/` .,@@@@^ =@@@@ =@@@@@/. ,@@@@@.
=@@@@.=@@@\ @@@@.=@@@@ ,]/@@@@ =@@@@ =@@@@^ =@@@@. ,]]]]]]]]]]]]]]]]]]]`
=@@@@. @@@@/@@@^ =@@@@ ,@@@@@@@@@@@@ =@@@@ =@@@@. =@@@@. \@@@@@@@@@@@@@@@@@@@@.
=@@@@. =@@@@@@@. =@@@@ .@@@@@/[`.=@@@/ =@@@@ =@@@@. =@@@@. =@@^
=@@@@. @@@@@@^ =@@@@ =@@@@` ,@@@@\ =@@@@ =@@@@. =@@@@. =@@^
=@@@@. =@@@@@. =@@@@ \@@@@@@@@@@@@@ =@@@@ =@@@@. =@@@@. @@@@@@@@@@
,@@@@. @@@@/ =@@@@ ,\@@@@[` \@@@^ =@@@O ,@@@@. ,@@@@. ,@@@@@@@@`
=@@@@@@^
@@@@@@
.@@@@^
=@@/
\@.
`
'''
set_title()
# 排除异常的情况
if (os.system("flac --version >nul 2>nul")):
app_exit(4,"未找到flac官方命令行工具【flac.exe】")
if (os.system("metaflac --version >nul 2>nul")):
app_exit(4,"未找到flac官方命令行工具【metaflac.exe】")
if (os.system("ffmpeg -L >nul 2>nul")):
app_exit(4,"未找到ffmpeg")
if (len_argv := len(sys.argv)) > 2:
app_exit(2,"\n\n参数个数过多。\n\n请拖放单个文件夹至此程序图标上,无损压缩此文件夹中的flac、wav文件;\n或复制此脚本到目标目录下双击运行,以无损压缩目标目录下的flac、wav文件。\n")
# 获取flac工具vorbis_comment里的vendor_string
vendor_string = get_flac_tool_vendor_string()
if len_argv == 1:
input_folder = "."
else:
if (not os.path.isdir(input_folder:=sys.argv[1])):
app_exit(1,f"文件夹【{input_folder}】不存在")
os.chdir(input_folder)
print_out_folder = "当前脚本所在目录" if input_folder=="." else os.getcwd()
if start_from_explorer:
sys.stdout.write(f"\n帮助说明:\n\n此工具对flac、wav文件使用“flac --best”命令全部压制为尺寸最优化的flac文件\n以到达无损瘦身的目的(同时保留封面和metadata信息)。\n\n拖放单个文件夹至此程序图标上,无损压缩瘦身此文件夹中的flac、wav文件;\n\n或复制此脚本到目标目录下双击运行,以无损压缩瘦身目标目录下的flac、wav文件。\n\n")
sys.stdout.write(f"\n\n★ 即将无损压缩【{print_out_folder}】下的所有flac和wav文件 ★\n\n★ 按键盘任意键继续,或鼠标点击右上角关闭按钮退出。 ★\n\n")
os.system("@pause>nul")
filelist = []
for root , _ , filesets in os.walk("."):
for file in filesets:
filelist.append(f"{root}\\{file}")
flac_processable_filelist = [(i,ret) for i in filelist if (ret := is_flac_or_wav(i))]
total = len(flac_processable_filelist)
completed = 0
os.system(f"title 进度:{completed}/{total}")
for src_file, file_type in flac_processable_filelist:
try:
# flac输出时文件名会冲突,先重命名
filename = os.path.splitext(src_file)[0]
tmp_src = f"{filename}.bak.flac" if file_type==1 else f"{filename}.bak.wav"
output = f"{filename}.flac"
os.rename(src_file, tmp_src)
# 获取源文件大小
filesize = os.path.getsize(tmp_src)
# 预分配文件的大小比原文件大50MiB备用
pre_allocate_size = filesize + 50 * 1024**2
## 配置指令。
# flac需要输出到stdout才能控制文件句柄进行预分配
# flac输出到stdout是没法回写【MD5 of the unencoded content】的,“--no-md5”可以在编码时关闭pcm裸流MD5的实时更新计算,节省CPU
# 同样,seektable也是没法回写的,用【--no-seektable】关掉
# 【--no-keep-foreign-metadata】是wav编码为flac时是否保留可能存在的ID3的元数据区,我这边试过,保留了播放器也显示不出封面图片和歌手标题什么的,所以关闭。
command = f"flac --no-md5 --no-seektable --no-keep-foreign-metadata --best --stdout "{tmp_src}""
# 执行压缩
process = subprocess.Popen(
args = command,
bufsize = buf_size*2,
stdout=subprocess.PIPE, stderr=None,
shell=True,
)
# 预分配写出
with win_preallocate_newfile(output, size=pre_allocate_size) as f:
# 如果输入的是wav文件的话,还需要转移meta
if file_type == 2:
buf = \
restruct_flac_meta_part_in_std_pipe(
std_pipe = process.stdout,
vorbis_comment_block = \
struct_flac_vorbis_comment_block(
get_source_file_metadata(tmp_src), vendor_string
),
picture_block = \
struct_flac_picture_block(
get_source_file_top_attached_pic_data(tmp_src)
),
)
f.write(buf)
while (not process.stdout.closed) and (buf := process.stdout.read(buf_size)):
f.write(buf)
# 如果在“实模式”预分配下,文件EOF一开始就移动好了,就需要截断,不然尾部会残留空白00空间;
# 不过这里用的是虚模式,文件EOF由系统根据写入的数据长度实时更新,截不截断就无所谓了,系统会帮你“善后”的
f.truncate()
# 管道里没东西后,尝试关闭
with suppress(Exception): process.stdout.close()
# 等待进程结束
while (process.poll() is None):
time.sleep(0.1)
# 检查是否返回非零
if process.returncode:
raise Exception("flac编码时返回非零")
# 回写【MD5 of the unencoded content】
rewrite_flac_PCM_MD5_checksum(output)
# 建立seektable
# 与上同理,有padding_block的存在,一般不会产生碎片
process = subprocess.run(args=f"metaflac --add-seekpoint=5s "{output}"",shell=True)
# 检查是否返回非零
if process.returncode:
raise Exception("建立seektable时metaflac返回非零")
# 转移修改时间
transfer_modify_time(tmp_src, output)
except Exception as e:
with suppress(Exception): os.remove(output)
sys.stderr.write(f"\n× 【{output}】出错了,详情:{e} ×\n\n")
else:
sys.stderr.write(f"\n√ 【{output}】成功 √\n\n")
finally:
completed += 1
os.system(f"title 进度:{completed}/{total}")
app_exit(0,"\n处理完毕,如输出文件(不带“.bak.”字样的)没有问题,可以手动在目标目录下按住shift右键唤出cmd,\n然后输入以下命令按回车删除源文件:del /s /q *.bak.*\n")
复制代码 MK.II
复制代码