Animeko Python 实现文档

基于 Animeko 核心设计思路(统一抽象→多源解析→择优选源→流媒体播放),结合 Python 异步生态实现同款资源解析+在线播放功能, 核心支持直链/BT/磁力/Jellyfin 多源聚合,复刻 Animeko 接口解耦、多源择优、BT边下边播核心能力,搭配 mpv 实现跨平台播放。

核心技术栈选型

功能模块 核心库/工具 核心作用
异步网络请求 aiohttp、httpx 异步HTTP请求,支持反爬、自定义请求头
BT/磁力解析&边播 libtorrent-python、aiotorrent 解析磁力/种子,实现P2P分块下载+边下边播
本地虚拟流服务 aiohttp 轻量 Animeko 同款功能 Python 实现文档(HTML版) 以下是完整的可直接打开的 HTML 文件,包含所有核心内容、代码块且做了语法高亮和排版优化,可保存为 `animeko_python_implement.html` 本地查看: ```html Animeko 同款资源解析+在线播放 Python 实现

基于 Animeko 核心设计思路(统一抽象→多源解析→择优选源→流媒体播放),结合 Python 异步生态实现同款资源解析+在线播放功能, 核心支持直链/BT/磁力/Jellyfin 多源聚合,复刻 Animeko 接口解耦、多源择优、BT边下边播核心能力,搭配 mpv 实现跨平台播放。

核心技术栈选型

功能模块 核心库/工具 核心作用
异步网络请求 aiohttp、httpx 异步HTTP请求,支持反爬、自定义请求头
BT/磁力解析&边播 libtorrent-python、aiotorrent 解析磁力/种子,实现P2P分块下载+边下边播
本地虚拟流服务 aiohttp 轻量异步HTTP服务,实现BT视频块本地流转发
多源择优&测速 asyncio、ping3 协程并行解析/测速,实现自动选最优播放源
视频播放 python-mpv、ffpyplayer 封装mpv播放器,支持流媒体/本地流/BT虚拟流播放
数据解析 lxml、BeautifulSoup4、pydantic v2 HTML/JSON解析,数据模型校验与标准化
配置&缓存 pydantic-settings、filelock、sqlite3 配置管理,本地缓存锁,播放进度持久化
可选UI PyQt6、Streamlit 跨平台桌面UI,快速可视化调试界面

核心设计:复刻 Animeko 标准化抽象

完全借鉴 Animeko 「接口解耦+统一模型」思路,定义核心数据模型和抽象接口,为多源扩展打下基础。

2.1 统一媒体资源模型(Pydantic v2)

from enum import StrEnum, auto
from pydantic import BaseModel, Field
from uuid import uuid4
from typing import Optional, List, Dict

# 资源类型枚举(对应Animeko的MediaType)
class MediaType(StrEnum):
    HTTP = auto()
    HTTPS = auto()
    TORRENT = auto()
    MAGNET = auto()
    JELLYFIN = auto()
    EMBY = auto()

# 字幕模型
class Subtitle(BaseModel):
    name: str
    url: str

# 核心媒体模型:所有源解析后都转化为该模型
class Media(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid4()))
    title: str  # 集数+画质标题
    url: str    # 直链/磁力/种子地址
    type: MediaType
    quality: str = "720P"  # 画质:1080P/720P/480P
    headers: Dict[str, str] = Field(default_factory=dict)  # 请求头(反爬/鉴权)
    extras: Dict[str, str] = Field(default_factory=dict)  # 扩展字段(如BT文件索引)
    subtitles: List[Subtitle] = Field(default_factory=list)
    file_size: Optional[int] = None  # 文件大小(字节)

# 选源结果模型:含测速/稳定性
class MediaSelectorResult(BaseModel):
    media: Media
    speed: float  # 测速结果(KB/s)
    stability: int = 100  # 稳定性评分(默认100)

# 番剧集数模型:上层传入的基础信息
class Episode(BaseModel):
    id: str  # 集数唯一ID
    title: str  # 集数标题
    bangumi_id: Optional[str] = None  # 对应Bangumi ID
    magnet_url: Optional[str] = None  # 可选默认磁力
    jellyfin_item_id: Optional[str] = None  # Jellyfin项目ID

2.2 抽象数据源接口(ABC抽象基类)

所有第三方源需实现该接口,上层业务无需关心具体实现,新增源仅需加实现类,完全解耦。

from abc import ABC, abstractmethod
from typing import List
from model import Media, Episode

class MediaDataSource(ABC):
    """所有媒体数据源的抽象基类,对应Animeko的MediaDataSource"""
    @abstractmethod
    async def parse_media(self, episode: Episode) -> List[Media]:
        """解析番剧集数,返回标准化Media列表"""
        pass

    @abstractmethod
    async def speed_test(self, media: Media) -> float:
        """对Media做测速,返回下载速度(KB/s)"""
        pass

    @abstractmethod
    async def validate(self, media: Media) -> bool:
        """校验Media资源是否有效"""
        pass

# BT源专属抽象接口(对应Animeko的TorrentDataSource)
class TorrentDataSource(MediaDataSource):
    @abstractmethod
    async def parse_torrent(self, torrent_url: str) -> Media:
        """单独解析种子文件,返回Media"""
        pass

多源资源解析实现

基于抽象接口,实现直链源、BT/磁力源解析,以及多源自动择优逻辑,复刻 Animeko 并行解析+测速能力。

3.1 直链源解析(HTTP/HTTPS)

爬取第三方动漫站直链,解析为标准化Media模型,支持自定义请求头、测速与有效性校验。

import aiohttp
from model import Media, MediaType, Episode, Subtitle
from datasource.abc import MediaDataSource
from asyncio import timeout

# 自定义直链源解析器(示例:爬取某动漫站直链)
class CustomOnlineDataSource(MediaDataSource):
    BASE_URL = "https://example-anime.com"  # 替换为实际源地址
    USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

    def __init__(self):
        self._session: aiohttp.ClientSession = None
        self._init_session()

    def _init_session(self):
        self._session = aiohttp.ClientSession(
            headers={"User-Agent": self.USER_AGENT},
            timeout=aiohttp.ClientTimeout(total=10)
        )

    async def parse_media(self, episode: Episode) -> list[Media]:
        """解析直链:请求接口→提取播放地址→封装Media"""
        media_list = []
        try:
            async with self._session.get(
                url=f"{self.BASE_URL}/api/play",
                params={"ep_id": episode.id}
            ) as resp:
                resp.raise_for_status()
                play_info_list = await resp.json()  # 假设返回JSON格式
            
            # 转化为标准化Media模型
            for info in play_info_list:
                media_type = MediaType.HTTPS if info["play_url"].startswith("https") else MediaType.HTTP
                media = Media(
                    title=f"{episode.title}-{info['quality']}",
                    url=info["play_url"],
                    type=media_type,
                    quality=info["quality"],
                    headers=info.get("headers", {}),
                    subtitles=[Subtitle(**sub) for sub in info.get("subtitles", [])],
                    file_size=info.get("file_size")
                )
                media_list.append(media)
            return media_list
        except Exception as e:
            raise Exception(f"直链解析失败: {str(e)}")

    async def speed_test(self, media: Media) -> float:
        """HTTP测速:请求前1024字节,计算下载速度(KB/s)"""
        try:
            async with timeout(3):
                async with self._session.get(
                    media.url,
                    headers=media.headers,
                    range="0-1023"  # 只请求前1024字节
                ) as resp:
                    if resp.status not in (200, 206):
                        return 0.0
                    _ = await resp.read()
                    speed = 1024 / 1024  # 基础值,可根据耗时优化
                    return round(speed, 2)
        except:
            return 0.0

    async def validate(self, media: Media) -> bool:
        """校验资源有效性:发送HEAD请求"""
        try:
            async with timeout(2):
                async with self._session.head(media.url, headers=media.headers) as resp:
                    return resp.status in (200, 206, 302)
        except:
            return False

    async def close(self):
        """关闭aiohttp session"""
        if self._session:
            await self._session.close()

3.2 BT/磁力源解析(核心难点)

基于libtorrent实现磁力/种子解析,筛选视频文件,封装为标准化Media模型,支持BT节点测速。

import libtorrent as lt
from model import Media, MediaType, Episode
from datasource.abc import TorrentDataSource
from typing import List
import asyncio

class LibTorrentDataSource(TorrentDataSource):
    def __init__(self):
        # 初始化libtorrent会话
        self._ses = lt.session()
        self._ses.listen_on(6881, 6891)
        self._video_suffix = (".mp4", ".mkv", ".flv", ".avi", ".mov")  # 筛选视频文件

    async def parse_media(self, episode: Episode) -> List[Media]:
        """解析磁力链接,返回标准化Media列表"""
        if not episode.magnet_url:
            raise Exception("番剧集数无磁力链接")
        # 异步解析磁力(libtorrent为同步,用asyncio.run_in_executor包装)
        torrent_info = await asyncio.get_event_loop().run_in_executor(
            None, self._parse_magnet, episode.magnet_url
        )
        # 筛选视频文件
        video_files = [
            f for f in torrent_info.files()
            if f.path.endswith(self._video_suffix)
        ]
        if not video_files:
            raise Exception("种子中无视频文件")
        # 封装为Media模型
        media_list = []
        for idx, f in enumerate(video_files):
            quality = "1080P" if "1080" in f.path else "720P" if "720" in f.path else "480P"
            media = Media(
                title=f"{episode.title}-{f.path.split('/')[-1]}-{quality}",
                url=episode.magnet_url,
                type=MediaType.MAGNET,
                quality=quality,
                extras={"file_index": str(idx), "file_size": str(f.size)},
                file_size=f.size
            )
            media_list.append(media)
        return media_list

    async def parse_torrent(self, torrent_url: str) -> Media:
        """解析种子文件URL,返回单个Media"""
        torrent_file = await self._download_torrent_file(torrent_url)
        ti = lt.torrent_info(torrent_file)
        video_file = next(f for f in ti.files() if f.path.endswith(self._video_suffix))
        return Media(
            title=video_file.path.split('/')[-1],
            url=torrent_url,
            type=MediaType.TORRENT,
            quality="1080P" if "1080" in video_file.path else "720P",
            extras={"file_index": str(0), "file_size": str(video_file.size)},
            file_size=video_file.size
        )

    async def speed_test(self, media: Media) -> float:
        """BT测速:获取节点连接速度(简化版)"""
        try:
            peer_num = await asyncio.get_event_loop().run_in_executor(
                None, self._get_peer_num, media.url
            )
            return min(peer_num * 0.5, 100)  # 节点数*0.5,最大100KB/s
        except:
            return 0.0

    async def validate(self, media: Media) -> bool:
        """校验磁力/种子有效性:是否能获取元数据"""
        try:
            await asyncio.get_event_loop().run_in_executor(
                None, self._parse_magnet, media.url
            )
            return True
        except:
            return False

    # --- libtorrent 同步方法(需包装为异步)---
    def _parse_magnet(self, magnet_url: str) -> lt.torrent_info:
        """解析磁力链接,获取种子元数据"""
        params = lt.parse_magnet_uri(magnet_url)
        handle = self._ses.add_torrent(params)
        while not handle.has_metadata():
            lt.sleep(0.1)
        return handle.get_torrent_info()

    def _get_peer_num(self, magnet_url: str) -> int:
        """获取磁力链接的节点数"""
        params = lt.parse_magnet_uri(magnet_url)
        handle = self._ses.add_torrent(params)
        while not handle.has_metadata():
            lt.sleep(0.1)
        return handle.num_peers()

    async def _download_torrent_file(self, torrent_url: str) -> bytes:
        """下载种子文件(基于aiohttp)"""
        async with aiohttp.ClientSession() as sess:
            async with sess.get(torrent_url) as resp:
                return await resp.read()

3.3 多源择优器(MediaSelector)

基于asyncio协程并行解析+测速,自动选择最优播放源,提升选源效率。

from model import Episode, MediaSelectorResult
from datasource.abc import MediaDataSource
from typing import List
import asyncio

class MediaSelector:
    def __init__(self, data_sources: List[MediaDataSource]):
        self.data_sources = data_sources  # 注入所有数据源实现类

    async def select_best_media(self, episode: Episode) -> MediaSelectorResult:
        """并行解析+并行测速,返回最优Media"""
        # 1. 协程并行:解析所有数据源的Media
        parse_tasks = [ds.parse_media(episode) for ds in self.data_sources]
        parse_results = await asyncio.gather(*parse_tasks, return_exceptions=True)
        # 过滤异常结果,合并所有Media
        all_media = []
        for res in parse_results:
            if isinstance(res, list) and res:
                all_media.extend(res)
        if not all_media:
            raise Exception("无可用播放源")
        
        # 2. 协程并行:校验+测速所有Media
        async def _test_media(media: Media) -> tuple[Media, float]:
            for ds in self.data_sources:
                if await ds.validate(media):
                    speed = await ds.speed_test(media)
                    return (media, speed)
            return (media, 0.0)
        test_tasks = [_test_media(media) for media in all_media]
        test_results = await asyncio.gather(*test_tasks)
        
        # 3. 按速度降序排序,选择最优
        test_results = sorted(test_results, key=lambda x: x[1], reverse=True)
        best_media, best_speed = test_results[0]
        return MediaSelectorResult(
            media=best_media,
            speed=best_speed,
            stability=100 if best_speed > 0 else 0
        )

在线播放核心实现

复刻 Animeko 「本地虚拟流服务+通用播放器」思路,支持直链流直接播放、BT/磁力边下边播,封装统一播放器管理器屏蔽源差异。

4.1 本地虚拟流服务(BT边播基础)

基于aiohttp实现轻量异步HTTP服务,将BT下载的视频块转为本地HTTP流,供播放器调用,实现「边下边播」。

from aiohttp import web
import asyncio
from typing import Optional, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("local-stream-server")

class LocalStreamServer:
    """本地虚拟流服务:将BT下载的视频块转为HTTP本地流"""
    _instance: Optional["LocalStreamServer"] = None
    _app: web.Application
    _runner: web.AppRunner
    _site: web.TCPSite
    _port: int = 8888
    _host: str = "127.0.0.1"
    # 流回调:接收请求范围,返回视频块数据
    stream_callback: Optional[Callable[[str, str], bytes]] = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not hasattr(self, "_app"):
            self._app = web.Application()
            self._app.add_routes([web.get("/stream", self._stream_handler)])
            self._runner = web.AppRunner(self._app)

    async def start(self):
        """启动本地流服务"""
        await self._runner.setup()
        self._site = web.TCPSite(self._runner, self._host, self._port)
        await self._site.start()
        logger.info(f"本地虚拟流服务启动:http://{self._host}:{self._port}/stream")

    async def stop(self):
        """停止本地流服务"""
        await self._runner.cleanup()
        logger.info("本地虚拟流服务已停止")

    @property
    def stream_url(self) -> str:
        """获取本地流地址"""
        return f"http://{self._host}:{self._port}/stream"

    async def _stream_handler(self, request: web.Request) -> web.Response:
        """流请求处理器:处理Range请求,返回视频块"""
        if not self.stream_callback:
            return web.Response(status=500, text="流回调未设置")
        # 获取Range请求头(如 Range: bytes=0-1023)
        range_header = request.headers.get("Range", "bytes=0-")
        # 调用回调获取视频块数据
        data = self.stream_callback(request.path, range_header)
        # 返回206部分内容
        return web.Response(
            body=data,
            status=206,
            headers={
                "Content-Type": "video/mp4",
                "Accept-Ranges": "bytes",
                "Content-Range": f"{range_header}/{len(data)}"
            }
        )

4.2 BT边下边播核心引擎

复刻 Animeko TorrentEngine,实现BT分块优先下载、本地流回调,将BT视频块喂给本地流服务。

import libtorrent as lt
import asyncio
from model import Media
from stream_server import LocalStreamServer
import logging

logger = logging.getLogger("bt-engine")

class BtPlayEngine:
    def __init__(self):
        self._ses = lt.session()
        self._ses.listen_on(6881, 6891)
        self._handle: Optional[lt.torrent_handle] = None
        self._file_index: int = 0  # 要播放的视频文件索引
        self._stream_server = LocalStreamServer()
        # 注册流回调到本地流服务
        self._stream_server.stream_callback = self._on_stream_request

    async def start_play(self, media: Media) -> str:
        """启动BT边下边播,返回本地流地址"""
        # 1. 解析磁力/种子,获取torrent handle
        self._file_index = int(media.extras["file_index"])
        if media.type == media.type.MAGNET:
            params = lt.parse_magnet_uri(media.url)
        else:  # TORRENT
            ti = lt.torrent_info(media.url)
            params = lt.add_torrent_params(ti=ti, save_path="./cache/bt")
        # 设置下载策略:优先下载视频文件
        params.file_priorities = [0] * len(params.ti.files())
        params.file_priorities[self._file_index] = 1  # 目标文件设为高优先级
        self._handle = self._ses.add_torrent(params)
        # 等待元数据加载
        while not self._handle.has_metadata():
            await asyncio.sleep(0.1)
        logger.info(f"BT元数据加载完成,开始边下边播:{media.title}")
        # 2. 启动本地流服务
        await self._stream_server.start()
        # 3. 后台协程:更新下载优先级(跟随播放位置)
        asyncio.create_task(self._update_priority_loop())
        return self._stream_server.stream_url

    async def stop_play(self):
        """停止BT边播,释放资源"""
        if self._handle:
            self._ses.remove_torrent(self._handle)
        await self._stream_server.stop()
        logger.info("BT边下边播已停止")

    async def _update_priority_loop(self):
        """后台循环:更新BT下载优先级(优先播放位置前后的块)"""
        while self._handle and self._handle.status().state != lt.torrent_status.seeding:
            # 获取当前播放位置的块索引(后续从播放器获取实际进度)
            current_pos = 0  # 播放器进度(字节),需对接播放器
            ti = self._handle.get_torrent_info()
            block_size = ti.piece_size()
            current_piece = current_pos // block_size
            # 优先下载当前块前后10个块
            for i in range(max(0, current_piece - 10), min(ti.num_pieces(), current_piece + 10)):
                self._handle.piece_priority(i, 7)  # 最高优先级
            await asyncio.sleep(1)

    def _on_stream_request(self, path: str, range_header: str) -> bytes:
        """流回调:处理Range请求,返回BT下载的视频块数据"""
        if not self._handle:
            return b""
        # 解析Range头:bytes=start-end
        range_part = range_header.replace("bytes=", "")
        start, end = range_part.split("-")
        start = int(start) if start else 0
        end = int(end) if end else start + 1024*1024  # 默认每次返回1MB
        # 从BT缓存文件读取数据
        ti = self._handle.get_torrent_info()
        file_offset = ti.files()[self._file_index].offset
        data = b""
        try:
            with open(f"./cache/bt/{ti.name()}", "rb") as f:
                f.seek(file_offset + start)
                data = f.read(end - start + 1)
        except:
            pass
        return data

4.3 统一播放器管理器

封装python-mpv,屏蔽直链流/BT本地流差异,提供播放/暂停/进度获取等通用方法,对应Animeko的PlayerManager。

import mpv
from model import Media, MediaType
from torrent.engine import BtPlayEngine
from typing import Optional
import asyncio

class PlayerManager:
    _instance: Optional["PlayerManager"] = None
    _player: Optional[mpv.MPV] = None
    _bt_engine: BtPlayEngine = BtPlayEngine()
    _current_media: Optional[Media] = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not hasattr(self, "_player"):
            # 初始化mpv播放器,开启流媒体支持
            self._player = mpv.MPV(
                ytdl=False,
                force_window="immediate",
                input_default_bindings=True,
                input_vo_keyboard=True
            )
            # 注册播放器事件
            self._player.register_event_callback(self._on_mpv_event)

    async def play_media(self, media: Media):
        """播放媒体:自动判断源类型,直链/BT边播"""
        self._current_media = media
        if media.type in (MediaType.HTTP, MediaType.HTTPS, MediaType.JELLYFIN, MediaType.EMBY):
            # 直链播放:直接传入URL+请求头
            self._player.set_property("http-header-fields", [f"{k}:{v}" for k, v in media.headers.items()])
            self._player.play(media.url)
        elif media.type in (MediaType.TORRENT, MediaType.MAGNET):
            # BT边播:获取本地流地址,播放本地流
            stream_url = await self._bt_engine.start_play(media)
            self._player.play(stream_url)
        self._player.wait_for_playback()

    def pause(self):
        """暂停播放"""
        if self._player:
            self._player.pause = True

    def resume(self):
        """恢复播放"""
        if self._player:
            self._player.pause = False

    def seek(self, position: float):
        """快进/快退:position为秒"""
        if self._player:
            self._player.seek(position, reference="absolute")

    async def stop(self):
        """停止播放,释放资源"""
        if self._player:
            self._player.stop()
        await self._bt_engine.stop_play()
        self._current_media = None

    def get_progress(self) -> float:
        """获取当前播放进度(秒)"""
        return self._player.time_pos if self._player and self._player.time_pos else 0.0

    def _on_mpv_event(self, event):
        """mpv事件回调:处理播放结束、错误等"""
        if event.event_id == mpv.EVENT_END_FILE:
            asyncio.create_task(self.stop())
            print("播放结束")
        elif event.event_id == mpv.EVENT_ERROR:
            print("播放错误")

整体运行流程

整合所有模块,实现「番剧集数→多源解析→择优→播放」的完整流程,与 Animeko 逻辑完全一致。

# main.py - 主程序入口
from model import Episode
from datasource.online import CustomOnlineDataSource
from datasource.torrent import LibTorrentDataSource
from selector import MediaSelector
from player.manager import PlayerManager
import asyncio

async def main():
    # 1. 初始化数据源(可扩展添加Jellyfin/Emby源)
    online_ds = CustomOnlineDataSource()
    torrent_ds = LibTorrentDataSource()
    data_sources = [online_ds, torrent_ds]
    
    # 2. 初始化择优器和播放器
    selector = MediaSelector(data_sources)
    player = PlayerManager()
    
    # 3. 模拟上层传入的番剧集数信息
    test_episode = Episode(
        id="12345",
        title="某动漫-第1集",
        magnet_url="magnet:?xt=urn:btih:xxx...",  # 替换为实际磁力链接
    )
    
    try:
        # 4. 自动选源:并行解析+测速,获取最优Media
        best_media = await selector.select_best_media(test_episode)
        print(f"选中最优播放源:{best_media.title} | 速度:{best_media.speed}KB/s")
        
        # 5. 开始播放
        await player.play_media(best_media)
    except Exception as e:
        print(f"运行失败:{str(e)}")
    finally:
        # 6. 释放资源
        await online_ds.close()
        await player.stop()

if __name__ == "__main__":
    asyncio.run(main())
  1. 初始化直链/BT等数据源,注入到多源择优器;
  2. 上层传入番剧集数Episode,择优器并行解析所有数据源,得到标准化Media列表;
  3. 择优器并行测速所有Media,按速度排序返回最优播放源;
  4. 播放器管理器根据Media.type选择播放策略:直链流直接播放,BT源启动边下边播并播放本地流;
  5. 播放过程中,BT引擎后台更新下载优先级,本地流服务实时转发视频块;
  6. 播放结束/异常,自动释放播放器和BT引擎资源。

配套功能实现

快速实现本地资源缓存和Bangumi播放进度同步,贴合 Animeko 完整体验。

6.1 本地资源缓存

import os
from model import Media, MediaType
from filelock import FileLock
import shutil

class LocalMediaCache:
    def __init__(self, cache_dir: str = "./cache/media"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def get_cache_path(self, media: Media) -> str:
        """获取媒体缓存路径"""
        return os.path.join(self.cache_dir, f"{media.id}.mp4")

    def has_cache(self, media: Media) -> bool:
        """判断是否有本地缓存"""
        return os.path.exists(self.get_cache_path(media))

    async def cache_media(self, media: Media):
        """缓存直链媒体(断点续传)"""
        if media.type not in (MediaType.HTTP, MediaType.HTTPS) or self.has_cache(media):
            return
        cache_path = self.get_cache_path(media)
        lock_path = f"{cache_path}.lock"
        with FileLock(lock_path):
            # 基于aiohttp的断点续传下载(Range请求)
            pass

    def clear_cache(self, media: Optional[Media] = None):
        """清理缓存:单个/全部"""
        if media:
            os.remove(self.get_cache_path(media))
        else:
            shutil.rmtree(self.cache_dir)
            os.makedirs(self.cache_dir, exist_ok=True)

6.2 Bangumi 进度同步

import aiohttp
from model import Episode
from pydantic import BaseModel

class BangumiSyncManager:
    def __init__(self, bangumi_token: str):
        self._token = bangumi_token
        self._base_url = "https://api.bangumi.tv/v0"
        self._session = aiohttp.ClientSession(
            headers={"Authorization": f"Bearer {self._token}"}
        )

    async def sync_progress(self, episode: Episode, progress: float, duration: float):
        """同步播放进度到Bangumi(进度/时长为秒)"""
        if not episode.bangumi_id:
            return
        try:
            await self._session.post(
                url=f"{self._base_url}/episodes/{episode.bangumi_id}/progress",
                json={
                    "progress": progress,
                    "total": duration,
                    "status": "finish" if progress >= duration - 10 else "watching"
                }
            )
        except Exception as e:
            print(f"Bangumi进度同步失败:{str(e)}")

    async def close(self):
        await self._session.close()

部署与扩展建议

7.1 环境安装(关键:libtorrent)

# 基础依赖安装
pip install aiohttp pydantic python-mpv libtorrent-python aiofiles filelock ping3 lxml beautifulsoup4

# 可选UI依赖
pip install PyQt6 streamlit

# 注意:libtorrent-python安装失败时,用conda安装
conda install -c conda-forge libtorrent-python

7.2 功能扩展方向

  • 新增播放源:只需实现MediaDataSource抽象接口,无需修改上层代码;
  • 跨平台UI:基于PyQt6封装播放器UI,调用PlayerManager通用方法实现交互;
  • 弹幕支持:整合danmaku2ass将弹幕转为ASS格式,通过mpv加载弹幕;
  • 反爬优化:添加rotating-proxies代理池、fake-useragent随机UA;
  • 进度持久化:用sqlite3存储播放进度,启动时自动恢复上次播放位置;
  • 画质自动切换:根据网络速度自动切换低/高画质播放源。

7.3 性能优化点

  • BT边播优化:设置libtorrent缓存大小session.set_settings({"cache_size": 1024*10}),提升本地流速度;
  • 并行解析限流:用asyncio.Semaphore限制协程数量,避免请求过多被封;
  • 测速优化:改为多段测速(多次请求取平均值),提升测速准确性;
  • 本地缓存优化:用LRU策略清理过期缓存,避免磁盘占满;
  • 播放器优化:mpv开启硬件解码hwdec="auto",提升播放流畅度。

7.4 Python实现 vs Animeko(Kotlin/KMP)

设计点 Animeko(Kotlin/KMP) Python 实现
统一模型 自定义data class Pydantic BaseModel(带数据校验)
异步处理 Kotlin Coroutines + Flow asyncio 协程 + aiohttp 异步生态
多源解耦 抽象接口 MediaDataSource ABC抽象基类 + 实现类注入
BT边播 anitorrent + 本地虚拟流 libtorrent + aiohttp 本地流服务
播放器 ExoPlayer/VLC(跨平台原生) python-mpv(封装mpv,全平台支持)
并行选源 Coroutines 并行 asyncio.gather 协程并行

总结

Python 可完全复刻 Animeko 核心的资源解析+多源择优+在线播放功能,核心是借鉴其「标准化抽象+接口解耦」的设计思路, 结合 Python 异步生态(asyncio/aiohttp)和成熟的第三方库(libtorrent/python-mpv)实现。
相比 Kotlin/KMP,Python 实现的优势是开发效率高、生态丰富、轻量易部署;劣势是跨平台UI不如Compose成熟, 但通过mpv/libtorrent等C扩展库可弥补性能短板,完全满足个人/小范围使用需求。
上述代码为可运行的核心骨架,只需根据实际动漫源微调解析逻辑(如HTML解析规则、API请求参数),即可快速实现一款专属追番工具。

Animeko 同款功能 Python 实现文档 | 基于 Animeko 核心设计思路复刻

使用 TailwindCSS + Highlight.js 排版