| 本地虚拟流服务 |
aiohttp |
轻量 Animeko 同款功能 Python 实现文档(HTML版)
以下是完整的可直接打开的 HTML 文件,包含所有核心内容、代码块且做了语法高亮和排版优化,可保存为 `animeko_python_implement.html` 本地查看:
```html
Animeko 同款资源解析+在线播放 Python 实现
基于 Animeko 核心设计思路(统一抽象→多源解析→择优选源→流媒体播放),结合 Python 异步生态实现同款资源解析+在线播放功能,
核心支持直链/BT/磁力/Jellyfin 多源聚合,复刻 Animeko 接口解耦、多源择优、BT边下边播核心能力,搭配 mpv 实现跨平台播放。
核心技术栈选型
| 功能模块 |
核心库/工具 |
核心作用 |
| 异步网络请求 |
aiohttp、httpx |
异步HTTP请求,支持反爬、自定义请求头 |
| BT/磁力解析&边播 |
libtorrent-python、aiotorrent |
解析磁力/种子,实现P2P分块下载+边下边播 |
| 本地虚拟流服务 |
aiohttp |
轻量异步HTTP服务,实现BT视频块本地流转发 |
| 多源择优&测速 |
asyncio、ping3 |
协程并行解析/测速,实现自动选最优播放源 |
| 视频播放 |
python-mpv、ffpyplayer |
封装mpv播放器,支持流媒体/本地流/BT虚拟流播放 |
| 数据解析 |
lxml、BeautifulSoup4、pydantic v2 |
HTML/JSON解析,数据模型校验与标准化 |
| 配置&缓存 |
pydantic-settings、filelock、sqlite3 |
配置管理,本地缓存锁,播放进度持久化 |
| 可选UI |
PyQt6、Streamlit |
跨平台桌面UI,快速可视化调试界面 |
核心设计:复刻 Animeko 标准化抽象
完全借鉴 Animeko 「接口解耦+统一模型」思路,定义核心数据模型和抽象接口,为多源扩展打下基础。
2.1 统一媒体资源模型(Pydantic v2)
from enum import StrEnum, auto
from pydantic import BaseModel, Field
from uuid import uuid4
from typing import Optional, List, Dict
# 资源类型枚举(对应Animeko的MediaType)
class MediaType(StrEnum):
HTTP = auto()
HTTPS = auto()
TORRENT = auto()
MAGNET = auto()
JELLYFIN = auto()
EMBY = auto()
# 字幕模型
class Subtitle(BaseModel):
name: str
url: str
# 核心媒体模型:所有源解析后都转化为该模型
class Media(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
title: str # 集数+画质标题
url: str # 直链/磁力/种子地址
type: MediaType
quality: str = "720P" # 画质:1080P/720P/480P
headers: Dict[str, str] = Field(default_factory=dict) # 请求头(反爬/鉴权)
extras: Dict[str, str] = Field(default_factory=dict) # 扩展字段(如BT文件索引)
subtitles: List[Subtitle] = Field(default_factory=list)
file_size: Optional[int] = None # 文件大小(字节)
# 选源结果模型:含测速/稳定性
class MediaSelectorResult(BaseModel):
media: Media
speed: float # 测速结果(KB/s)
stability: int = 100 # 稳定性评分(默认100)
# 番剧集数模型:上层传入的基础信息
class Episode(BaseModel):
id: str # 集数唯一ID
title: str # 集数标题
bangumi_id: Optional[str] = None # 对应Bangumi ID
magnet_url: Optional[str] = None # 可选默认磁力
jellyfin_item_id: Optional[str] = None # Jellyfin项目ID
2.2 抽象数据源接口(ABC抽象基类)
所有第三方源需实现该接口,上层业务无需关心具体实现,新增源仅需加实现类,完全解耦。
from abc import ABC, abstractmethod
from typing import List
from model import Media, Episode
class MediaDataSource(ABC):
"""所有媒体数据源的抽象基类,对应Animeko的MediaDataSource"""
@abstractmethod
async def parse_media(self, episode: Episode) -> List[Media]:
"""解析番剧集数,返回标准化Media列表"""
pass
@abstractmethod
async def speed_test(self, media: Media) -> float:
"""对Media做测速,返回下载速度(KB/s)"""
pass
@abstractmethod
async def validate(self, media: Media) -> bool:
"""校验Media资源是否有效"""
pass
# BT源专属抽象接口(对应Animeko的TorrentDataSource)
class TorrentDataSource(MediaDataSource):
@abstractmethod
async def parse_torrent(self, torrent_url: str) -> Media:
"""单独解析种子文件,返回Media"""
pass
多源资源解析实现
基于抽象接口,实现直链源、BT/磁力源解析,以及多源自动择优逻辑,复刻 Animeko 并行解析+测速能力。
3.1 直链源解析(HTTP/HTTPS)
爬取第三方动漫站直链,解析为标准化Media模型,支持自定义请求头、测速与有效性校验。
import aiohttp
from model import Media, MediaType, Episode, Subtitle
from datasource.abc import MediaDataSource
from asyncio import timeout
# 自定义直链源解析器(示例:爬取某动漫站直链)
class CustomOnlineDataSource(MediaDataSource):
BASE_URL = "https://example-anime.com" # 替换为实际源地址
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
def __init__(self):
self._session: aiohttp.ClientSession = None
self._init_session()
def _init_session(self):
self._session = aiohttp.ClientSession(
headers={"User-Agent": self.USER_AGENT},
timeout=aiohttp.ClientTimeout(total=10)
)
async def parse_media(self, episode: Episode) -> list[Media]:
"""解析直链:请求接口→提取播放地址→封装Media"""
media_list = []
try:
async with self._session.get(
url=f"{self.BASE_URL}/api/play",
params={"ep_id": episode.id}
) as resp:
resp.raise_for_status()
play_info_list = await resp.json() # 假设返回JSON格式
# 转化为标准化Media模型
for info in play_info_list:
media_type = MediaType.HTTPS if info["play_url"].startswith("https") else MediaType.HTTP
media = Media(
title=f"{episode.title}-{info['quality']}",
url=info["play_url"],
type=media_type,
quality=info["quality"],
headers=info.get("headers", {}),
subtitles=[Subtitle(**sub) for sub in info.get("subtitles", [])],
file_size=info.get("file_size")
)
media_list.append(media)
return media_list
except Exception as e:
raise Exception(f"直链解析失败: {str(e)}")
async def speed_test(self, media: Media) -> float:
"""HTTP测速:请求前1024字节,计算下载速度(KB/s)"""
try:
async with timeout(3):
async with self._session.get(
media.url,
headers=media.headers,
range="0-1023" # 只请求前1024字节
) as resp:
if resp.status not in (200, 206):
return 0.0
_ = await resp.read()
speed = 1024 / 1024 # 基础值,可根据耗时优化
return round(speed, 2)
except:
return 0.0
async def validate(self, media: Media) -> bool:
"""校验资源有效性:发送HEAD请求"""
try:
async with timeout(2):
async with self._session.head(media.url, headers=media.headers) as resp:
return resp.status in (200, 206, 302)
except:
return False
async def close(self):
"""关闭aiohttp session"""
if self._session:
await self._session.close()
3.2 BT/磁力源解析(核心难点)
基于libtorrent实现磁力/种子解析,筛选视频文件,封装为标准化Media模型,支持BT节点测速。
import libtorrent as lt
from model import Media, MediaType, Episode
from datasource.abc import TorrentDataSource
from typing import List
import asyncio
class LibTorrentDataSource(TorrentDataSource):
def __init__(self):
# 初始化libtorrent会话
self._ses = lt.session()
self._ses.listen_on(6881, 6891)
self._video_suffix = (".mp4", ".mkv", ".flv", ".avi", ".mov") # 筛选视频文件
async def parse_media(self, episode: Episode) -> List[Media]:
"""解析磁力链接,返回标准化Media列表"""
if not episode.magnet_url:
raise Exception("番剧集数无磁力链接")
# 异步解析磁力(libtorrent为同步,用asyncio.run_in_executor包装)
torrent_info = await asyncio.get_event_loop().run_in_executor(
None, self._parse_magnet, episode.magnet_url
)
# 筛选视频文件
video_files = [
f for f in torrent_info.files()
if f.path.endswith(self._video_suffix)
]
if not video_files:
raise Exception("种子中无视频文件")
# 封装为Media模型
media_list = []
for idx, f in enumerate(video_files):
quality = "1080P" if "1080" in f.path else "720P" if "720" in f.path else "480P"
media = Media(
title=f"{episode.title}-{f.path.split('/')[-1]}-{quality}",
url=episode.magnet_url,
type=MediaType.MAGNET,
quality=quality,
extras={"file_index": str(idx), "file_size": str(f.size)},
file_size=f.size
)
media_list.append(media)
return media_list
async def parse_torrent(self, torrent_url: str) -> Media:
"""解析种子文件URL,返回单个Media"""
torrent_file = await self._download_torrent_file(torrent_url)
ti = lt.torrent_info(torrent_file)
video_file = next(f for f in ti.files() if f.path.endswith(self._video_suffix))
return Media(
title=video_file.path.split('/')[-1],
url=torrent_url,
type=MediaType.TORRENT,
quality="1080P" if "1080" in video_file.path else "720P",
extras={"file_index": str(0), "file_size": str(video_file.size)},
file_size=video_file.size
)
async def speed_test(self, media: Media) -> float:
"""BT测速:获取节点连接速度(简化版)"""
try:
peer_num = await asyncio.get_event_loop().run_in_executor(
None, self._get_peer_num, media.url
)
return min(peer_num * 0.5, 100) # 节点数*0.5,最大100KB/s
except:
return 0.0
async def validate(self, media: Media) -> bool:
"""校验磁力/种子有效性:是否能获取元数据"""
try:
await asyncio.get_event_loop().run_in_executor(
None, self._parse_magnet, media.url
)
return True
except:
return False
# --- libtorrent 同步方法(需包装为异步)---
def _parse_magnet(self, magnet_url: str) -> lt.torrent_info:
"""解析磁力链接,获取种子元数据"""
params = lt.parse_magnet_uri(magnet_url)
handle = self._ses.add_torrent(params)
while not handle.has_metadata():
lt.sleep(0.1)
return handle.get_torrent_info()
def _get_peer_num(self, magnet_url: str) -> int:
"""获取磁力链接的节点数"""
params = lt.parse_magnet_uri(magnet_url)
handle = self._ses.add_torrent(params)
while not handle.has_metadata():
lt.sleep(0.1)
return handle.num_peers()
async def _download_torrent_file(self, torrent_url: str) -> bytes:
"""下载种子文件(基于aiohttp)"""
async with aiohttp.ClientSession() as sess:
async with sess.get(torrent_url) as resp:
return await resp.read()
3.3 多源择优器(MediaSelector)
基于asyncio协程并行解析+测速,自动选择最优播放源,提升选源效率。
from model import Episode, MediaSelectorResult
from datasource.abc import MediaDataSource
from typing import List
import asyncio
class MediaSelector:
def __init__(self, data_sources: List[MediaDataSource]):
self.data_sources = data_sources # 注入所有数据源实现类
async def select_best_media(self, episode: Episode) -> MediaSelectorResult:
"""并行解析+并行测速,返回最优Media"""
# 1. 协程并行:解析所有数据源的Media
parse_tasks = [ds.parse_media(episode) for ds in self.data_sources]
parse_results = await asyncio.gather(*parse_tasks, return_exceptions=True)
# 过滤异常结果,合并所有Media
all_media = []
for res in parse_results:
if isinstance(res, list) and res:
all_media.extend(res)
if not all_media:
raise Exception("无可用播放源")
# 2. 协程并行:校验+测速所有Media
async def _test_media(media: Media) -> tuple[Media, float]:
for ds in self.data_sources:
if await ds.validate(media):
speed = await ds.speed_test(media)
return (media, speed)
return (media, 0.0)
test_tasks = [_test_media(media) for media in all_media]
test_results = await asyncio.gather(*test_tasks)
# 3. 按速度降序排序,选择最优
test_results = sorted(test_results, key=lambda x: x[1], reverse=True)
best_media, best_speed = test_results[0]
return MediaSelectorResult(
media=best_media,
speed=best_speed,
stability=100 if best_speed > 0 else 0
)
在线播放核心实现
复刻 Animeko 「本地虚拟流服务+通用播放器」思路,支持直链流直接播放、BT/磁力边下边播,封装统一播放器管理器屏蔽源差异。
4.1 本地虚拟流服务(BT边播基础)
基于aiohttp实现轻量异步HTTP服务,将BT下载的视频块转为本地HTTP流,供播放器调用,实现「边下边播」。
from aiohttp import web
import asyncio
from typing import Optional, Callable
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("local-stream-server")
class LocalStreamServer:
"""本地虚拟流服务:将BT下载的视频块转为HTTP本地流"""
_instance: Optional["LocalStreamServer"] = None
_app: web.Application
_runner: web.AppRunner
_site: web.TCPSite
_port: int = 8888
_host: str = "127.0.0.1"
# 流回调:接收请求范围,返回视频块数据
stream_callback: Optional[Callable[[str, str], bytes]] = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, "_app"):
self._app = web.Application()
self._app.add_routes([web.get("/stream", self._stream_handler)])
self._runner = web.AppRunner(self._app)
async def start(self):
"""启动本地流服务"""
await self._runner.setup()
self._site = web.TCPSite(self._runner, self._host, self._port)
await self._site.start()
logger.info(f"本地虚拟流服务启动:http://{self._host}:{self._port}/stream")
async def stop(self):
"""停止本地流服务"""
await self._runner.cleanup()
logger.info("本地虚拟流服务已停止")
@property
def stream_url(self) -> str:
"""获取本地流地址"""
return f"http://{self._host}:{self._port}/stream"
async def _stream_handler(self, request: web.Request) -> web.Response:
"""流请求处理器:处理Range请求,返回视频块"""
if not self.stream_callback:
return web.Response(status=500, text="流回调未设置")
# 获取Range请求头(如 Range: bytes=0-1023)
range_header = request.headers.get("Range", "bytes=0-")
# 调用回调获取视频块数据
data = self.stream_callback(request.path, range_header)
# 返回206部分内容
return web.Response(
body=data,
status=206,
headers={
"Content-Type": "video/mp4",
"Accept-Ranges": "bytes",
"Content-Range": f"{range_header}/{len(data)}"
}
)
4.2 BT边下边播核心引擎
复刻 Animeko TorrentEngine,实现BT分块优先下载、本地流回调,将BT视频块喂给本地流服务。
import libtorrent as lt
import asyncio
from model import Media
from stream_server import LocalStreamServer
import logging
logger = logging.getLogger("bt-engine")
class BtPlayEngine:
def __init__(self):
self._ses = lt.session()
self._ses.listen_on(6881, 6891)
self._handle: Optional[lt.torrent_handle] = None
self._file_index: int = 0 # 要播放的视频文件索引
self._stream_server = LocalStreamServer()
# 注册流回调到本地流服务
self._stream_server.stream_callback = self._on_stream_request
async def start_play(self, media: Media) -> str:
"""启动BT边下边播,返回本地流地址"""
# 1. 解析磁力/种子,获取torrent handle
self._file_index = int(media.extras["file_index"])
if media.type == media.type.MAGNET:
params = lt.parse_magnet_uri(media.url)
else: # TORRENT
ti = lt.torrent_info(media.url)
params = lt.add_torrent_params(ti=ti, save_path="./cache/bt")
# 设置下载策略:优先下载视频文件
params.file_priorities = [0] * len(params.ti.files())
params.file_priorities[self._file_index] = 1 # 目标文件设为高优先级
self._handle = self._ses.add_torrent(params)
# 等待元数据加载
while not self._handle.has_metadata():
await asyncio.sleep(0.1)
logger.info(f"BT元数据加载完成,开始边下边播:{media.title}")
# 2. 启动本地流服务
await self._stream_server.start()
# 3. 后台协程:更新下载优先级(跟随播放位置)
asyncio.create_task(self._update_priority_loop())
return self._stream_server.stream_url
async def stop_play(self):
"""停止BT边播,释放资源"""
if self._handle:
self._ses.remove_torrent(self._handle)
await self._stream_server.stop()
logger.info("BT边下边播已停止")
async def _update_priority_loop(self):
"""后台循环:更新BT下载优先级(优先播放位置前后的块)"""
while self._handle and self._handle.status().state != lt.torrent_status.seeding:
# 获取当前播放位置的块索引(后续从播放器获取实际进度)
current_pos = 0 # 播放器进度(字节),需对接播放器
ti = self._handle.get_torrent_info()
block_size = ti.piece_size()
current_piece = current_pos // block_size
# 优先下载当前块前后10个块
for i in range(max(0, current_piece - 10), min(ti.num_pieces(), current_piece + 10)):
self._handle.piece_priority(i, 7) # 最高优先级
await asyncio.sleep(1)
def _on_stream_request(self, path: str, range_header: str) -> bytes:
"""流回调:处理Range请求,返回BT下载的视频块数据"""
if not self._handle:
return b""
# 解析Range头:bytes=start-end
range_part = range_header.replace("bytes=", "")
start, end = range_part.split("-")
start = int(start) if start else 0
end = int(end) if end else start + 1024*1024 # 默认每次返回1MB
# 从BT缓存文件读取数据
ti = self._handle.get_torrent_info()
file_offset = ti.files()[self._file_index].offset
data = b""
try:
with open(f"./cache/bt/{ti.name()}", "rb") as f:
f.seek(file_offset + start)
data = f.read(end - start + 1)
except:
pass
return data
4.3 统一播放器管理器
封装python-mpv,屏蔽直链流/BT本地流差异,提供播放/暂停/进度获取等通用方法,对应Animeko的PlayerManager。
import mpv
from model import Media, MediaType
from torrent.engine import BtPlayEngine
from typing import Optional
import asyncio
class PlayerManager:
_instance: Optional["PlayerManager"] = None
_player: Optional[mpv.MPV] = None
_bt_engine: BtPlayEngine = BtPlayEngine()
_current_media: Optional[Media] = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, "_player"):
# 初始化mpv播放器,开启流媒体支持
self._player = mpv.MPV(
ytdl=False,
force_window="immediate",
input_default_bindings=True,
input_vo_keyboard=True
)
# 注册播放器事件
self._player.register_event_callback(self._on_mpv_event)
async def play_media(self, media: Media):
"""播放媒体:自动判断源类型,直链/BT边播"""
self._current_media = media
if media.type in (MediaType.HTTP, MediaType.HTTPS, MediaType.JELLYFIN, MediaType.EMBY):
# 直链播放:直接传入URL+请求头
self._player.set_property("http-header-fields", [f"{k}:{v}" for k, v in media.headers.items()])
self._player.play(media.url)
elif media.type in (MediaType.TORRENT, MediaType.MAGNET):
# BT边播:获取本地流地址,播放本地流
stream_url = await self._bt_engine.start_play(media)
self._player.play(stream_url)
self._player.wait_for_playback()
def pause(self):
"""暂停播放"""
if self._player:
self._player.pause = True
def resume(self):
"""恢复播放"""
if self._player:
self._player.pause = False
def seek(self, position: float):
"""快进/快退:position为秒"""
if self._player:
self._player.seek(position, reference="absolute")
async def stop(self):
"""停止播放,释放资源"""
if self._player:
self._player.stop()
await self._bt_engine.stop_play()
self._current_media = None
def get_progress(self) -> float:
"""获取当前播放进度(秒)"""
return self._player.time_pos if self._player and self._player.time_pos else 0.0
def _on_mpv_event(self, event):
"""mpv事件回调:处理播放结束、错误等"""
if event.event_id == mpv.EVENT_END_FILE:
asyncio.create_task(self.stop())
print("播放结束")
elif event.event_id == mpv.EVENT_ERROR:
print("播放错误")
整体运行流程
整合所有模块,实现「番剧集数→多源解析→择优→播放」的完整流程,与 Animeko 逻辑完全一致。
# main.py - 主程序入口
from model import Episode
from datasource.online import CustomOnlineDataSource
from datasource.torrent import LibTorrentDataSource
from selector import MediaSelector
from player.manager import PlayerManager
import asyncio
async def main():
# 1. 初始化数据源(可扩展添加Jellyfin/Emby源)
online_ds = CustomOnlineDataSource()
torrent_ds = LibTorrentDataSource()
data_sources = [online_ds, torrent_ds]
# 2. 初始化择优器和播放器
selector = MediaSelector(data_sources)
player = PlayerManager()
# 3. 模拟上层传入的番剧集数信息
test_episode = Episode(
id="12345",
title="某动漫-第1集",
magnet_url="magnet:?xt=urn:btih:xxx...", # 替换为实际磁力链接
)
try:
# 4. 自动选源:并行解析+测速,获取最优Media
best_media = await selector.select_best_media(test_episode)
print(f"选中最优播放源:{best_media.title} | 速度:{best_media.speed}KB/s")
# 5. 开始播放
await player.play_media(best_media)
except Exception as e:
print(f"运行失败:{str(e)}")
finally:
# 6. 释放资源
await online_ds.close()
await player.stop()
if __name__ == "__main__":
asyncio.run(main())
- 初始化直链/BT等数据源,注入到多源择优器;
- 上层传入番剧集数Episode,择优器并行解析所有数据源,得到标准化Media列表;
- 择优器并行测速所有Media,按速度排序返回最优播放源;
- 播放器管理器根据Media.type选择播放策略:直链流直接播放,BT源启动边下边播并播放本地流;
- 播放过程中,BT引擎后台更新下载优先级,本地流服务实时转发视频块;
- 播放结束/异常,自动释放播放器和BT引擎资源。
配套功能实现
快速实现本地资源缓存和Bangumi播放进度同步,贴合 Animeko 完整体验。
6.1 本地资源缓存
import os
from model import Media, MediaType
from filelock import FileLock
import shutil
class LocalMediaCache:
def __init__(self, cache_dir: str = "./cache/media"):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def get_cache_path(self, media: Media) -> str:
"""获取媒体缓存路径"""
return os.path.join(self.cache_dir, f"{media.id}.mp4")
def has_cache(self, media: Media) -> bool:
"""判断是否有本地缓存"""
return os.path.exists(self.get_cache_path(media))
async def cache_media(self, media: Media):
"""缓存直链媒体(断点续传)"""
if media.type not in (MediaType.HTTP, MediaType.HTTPS) or self.has_cache(media):
return
cache_path = self.get_cache_path(media)
lock_path = f"{cache_path}.lock"
with FileLock(lock_path):
# 基于aiohttp的断点续传下载(Range请求)
pass
def clear_cache(self, media: Optional[Media] = None):
"""清理缓存:单个/全部"""
if media:
os.remove(self.get_cache_path(media))
else:
shutil.rmtree(self.cache_dir)
os.makedirs(self.cache_dir, exist_ok=True)
6.2 Bangumi 进度同步
import aiohttp
from model import Episode
from pydantic import BaseModel
class BangumiSyncManager:
def __init__(self, bangumi_token: str):
self._token = bangumi_token
self._base_url = "https://api.bangumi.tv/v0"
self._session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self._token}"}
)
async def sync_progress(self, episode: Episode, progress: float, duration: float):
"""同步播放进度到Bangumi(进度/时长为秒)"""
if not episode.bangumi_id:
return
try:
await self._session.post(
url=f"{self._base_url}/episodes/{episode.bangumi_id}/progress",
json={
"progress": progress,
"total": duration,
"status": "finish" if progress >= duration - 10 else "watching"
}
)
except Exception as e:
print(f"Bangumi进度同步失败:{str(e)}")
async def close(self):
await self._session.close()
部署与扩展建议
7.1 环境安装(关键:libtorrent)
# 基础依赖安装
pip install aiohttp pydantic python-mpv libtorrent-python aiofiles filelock ping3 lxml beautifulsoup4
# 可选UI依赖
pip install PyQt6 streamlit
# 注意:libtorrent-python安装失败时,用conda安装
conda install -c conda-forge libtorrent-python
7.2 功能扩展方向
- 新增播放源:只需实现MediaDataSource抽象接口,无需修改上层代码;
- 跨平台UI:基于PyQt6封装播放器UI,调用PlayerManager通用方法实现交互;
- 弹幕支持:整合danmaku2ass将弹幕转为ASS格式,通过mpv加载弹幕;
- 反爬优化:添加rotating-proxies代理池、fake-useragent随机UA;
- 进度持久化:用sqlite3存储播放进度,启动时自动恢复上次播放位置;
- 画质自动切换:根据网络速度自动切换低/高画质播放源。
7.3 性能优化点
- BT边播优化:设置libtorrent缓存大小session.set_settings({"cache_size": 1024*10}),提升本地流速度;
- 并行解析限流:用asyncio.Semaphore限制协程数量,避免请求过多被封;
- 测速优化:改为多段测速(多次请求取平均值),提升测速准确性;
- 本地缓存优化:用LRU策略清理过期缓存,避免磁盘占满;
- 播放器优化:mpv开启硬件解码hwdec="auto",提升播放流畅度。
7.4 Python实现 vs Animeko(Kotlin/KMP)
| 设计点 |
Animeko(Kotlin/KMP) |
Python 实现 |
| 统一模型 |
自定义data class |
Pydantic BaseModel(带数据校验) |
| 异步处理 |
Kotlin Coroutines + Flow |
asyncio 协程 + aiohttp 异步生态 |
| 多源解耦 |
抽象接口 MediaDataSource |
ABC抽象基类 + 实现类注入 |
| BT边播 |
anitorrent + 本地虚拟流 |
libtorrent + aiohttp 本地流服务 |
| 播放器 |
ExoPlayer/VLC(跨平台原生) |
python-mpv(封装mpv,全平台支持) |
| 并行选源 |
Coroutines 并行 |
asyncio.gather 协程并行 |
总结
Python 可完全复刻 Animeko 核心的资源解析+多源择优+在线播放功能,核心是借鉴其「标准化抽象+接口解耦」的设计思路,
结合 Python 异步生态(asyncio/aiohttp)和成熟的第三方库(libtorrent/python-mpv)实现。
相比 Kotlin/KMP,Python 实现的优势是开发效率高、生态丰富、轻量易部署;劣势是跨平台UI不如Compose成熟,
但通过mpv/libtorrent等C扩展库可弥补性能短板,完全满足个人/小范围使用需求。
上述代码为可运行的核心骨架,只需根据实际动漫源微调解析逻辑(如HTML解析规则、API请求参数),即可快速实现一款专属追番工具。
|