Deploying ktransformers on Ubuntu


Prerequisites

A server with:

CPU: 500 GB of system memory (RAM)

GPU: 48 GB of VRAM (NVIDIA 4090)

OS: Ubuntu 20.04 (the GitHub docs appear to use 22.04)

Step 1: Download the weight files

1. Download hfd

wget https://hf-mirror.com/hfd/hfd.sh
chmod a+x hfd.sh

2. Set the environment variable

export HF_ENDPOINT=https://hf-mirror.com
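
The export above only applies to the current shell session. A minimal sketch, assuming a bash shell, to make the mirror endpoint persistent:

# Persist the mirror endpoint for future shells (assumes bash; adjust for zsh etc.)
echo 'export HF_ENDPOINT=https://hf-mirror.com' >> ~/.bashrc
source ~/.bashrc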

3. Download a model (requires a proxy/VPN and a Hugging Face token)

./hfd.sh gpt2

4. Download a dataset (requires a proxy/VPN and a Hugging Face token)

./hfd.sh wikitext --dataset

5. Download the large model files (requires a proxy/VPN; the files are large, roughly 400-500 GB)

./hfd.sh unsloth/DeepSeek-R1-GGUF --include DeepSeek-R1-Q4_K_M/*
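
If the repository requires authentication, hfd.sh can take Hugging Face credentials on the command line. A hedged example (the --hf_username/--hf_token flags follow hf-mirror's documented hfd usage; the values below are placeholders to replace with your own):

# Pass Hugging Face credentials for gated/authenticated downloads (placeholder values)
./hfd.sh unsloth/DeepSeek-R1-GGUF --include DeepSeek-R1-Q4_K_M/* --hf_username your_hf_username --hf_token hf_xxxxxxxxxxxxxxxx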

Step 2: Clone and build the code

1. Install Python 3.11 with Anaconda3

conda create --name ktransformers python=3.11
conda activate ktransformers 
conda install -c conda-forge libstdcxx-ng
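
With the environment activated, a quick check that the right interpreter is in use:

python --version    # should report Python 3.11.x inside the ktransformers env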

2. Install the other dependencies

pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip3 install packaging ninja cpufeature numpy
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt-get install --only-upgrade libstdc++6
pip install flash-attn --no-build-isolation
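
Before moving on, it can be worth confirming that the CUDA build of PyTorch and flash-attn installed correctly; a quick sanity check:

# Verify that PyTorch sees the GPU and that flash-attn imports cleanly
python -c "import torch; print(torch.__version__, torch.cuda.is_available())"
python -c "import flash_attn; print('flash-attn OK')"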

3. Check the GPU driver and CUDA versions

The CUDA versions reported by the following two commands should match; if they differ, the version reported by nvcc --version is the one the build will use.

nvcc --version
nvidia-smi
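
If the two versions disagree, one common fix is to put the intended CUDA toolkit first on PATH before building. A sketch assuming the toolkit is installed under /usr/local/cuda-12.6 (the path is an assumption; adjust it to your actual installation):

# Point the build at a specific CUDA toolkit (path is an assumption; adjust to your install)
export CUDA_HOME=/usr/local/cuda-12.6
export PATH=$CUDA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$CUDA_HOME/lib64:$LD_LIBRARY_PATH
nvcc --version    # should now report the intended toolkit version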

4. Clone the code

git clone https://github.com/kvcache-ai/ktransformers.git

cd ktransformers

git submodule init

git submodule update

5. Build

export USE_NUMA=1
make dev_install
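
After the build finishes, a quick import check confirms the package is visible in the active conda environment:

# If this prints without an ImportError, the dev install succeeded
python -c "import ktransformers; print('ktransformers imported OK')"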

Step 3: Run

python ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 50 --cache_lens 1536 --max_new_tokens 8192

# --model_path: the model identifier/config location; no need to change it
# --gguf_path: path to the large GGUF files downloaded earlier; adjust to your actual location
# --cpu_infer: number of CPU cores used for inference; if the server is not a dual-socket DDR5 machine, lower this value accordingly

Other launch examples

python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536

python ./ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/shadeform/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536 --optimize_config_path ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin.yaml

python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 128 --cache_lens 1536 --max_new_tokens 8192 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

ktransformers --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 65 --cache_lens 1536 --max_new_tokens 8192 --port 6006 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

curl -X 'POST"
    "http://localhost:6006/v1/chat/completions'\
    -H 'accept: application/json' \
    -H 'Content-Type: application/json' \
    -d'{
        "messages": [
        "content": "tell a joke",
        "role": "user"
    ],
    "model": "ktranformers-model",
    "stream": true
}'

Extras

1. Calling the model via an API

Create a new file: chat_openai.py

import argparse
import uvicorn
from typing import List, Dict, Optional, Any
from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import sys
import time
from fastapi import Request
from fastapi.responses import StreamingResponse, JSONResponse
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

project_dir = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, project_dir)
import torch
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForCausalLM,
    GenerationConfig,
    TextStreamer,
)
from ktransformers.optimize.optimize import optimize_and_load_gguf
from ktransformers.models.modeling_deepseek import DeepseekV2ForCausalLM
from ktransformers.models.modeling_qwen2_moe import Qwen2MoeForCausalLM
from ktransformers.models.modeling_deepseek_v3 import DeepseekV3ForCausalLM
from ktransformers.models.modeling_llama import LlamaForCausalLM
from ktransformers.models.modeling_mixtral import MixtralForCausalLM
from ktransformers.util.utils import prefill_and_generate
from ktransformers.server.config.config import Config

custom_models = {
    "DeepseekV2ForCausalLM": DeepseekV2ForCausalLM,
    "DeepseekV3ForCausalLM": DeepseekV3ForCausalLM,
    "Qwen2MoeForCausalLM": Qwen2MoeForCausalLM,
    "LlamaForCausalLM": LlamaForCausalLM,
    "MixtralForCausalLM": MixtralForCausalLM,
}

ktransformer_rules_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "optimize", "optimize_rules")
default_optimize_rules = {
    "DeepseekV2ForCausalLM": os.path.join(ktransformer_rules_dir, "DeepSeek-V2-Chat.yaml"),
    "DeepseekV3ForCausalLM": os.path.join(ktransformer_rules_dir, "DeepSeek-V3-Chat.yaml"),
    "Qwen2MoeForCausalLM": os.path.join(ktransformer_rules_dir, "Qwen2-57B-A14B-Instruct.yaml"),
    "LlamaForCausalLM": os.path.join(ktransformer_rules_dir, "Internlm2_5-7b-Chat-1m.yaml"),
    "MixtralForCausalLM": os.path.join(ktransformer_rules_dir, "Mixtral.yaml"),
}

# Global variable holding the initialized model
chat_model = None

class OpenAIChat:
    def __init__(
        self,
        model_path: str,
        optimize_rule_path: str = None,
        gguf_path: str = None,
        cpu_infer: int = Config().cpu_infer,
        use_cuda_graph: bool = True,
        mode: str = "normal",
    ):
        torch.set_grad_enabled(False)
        Config().cpu_infer = cpu_infer

        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        self.streamer = TextStreamer(self.tokenizer, skip_prompt=True) if not Config().cpu_infer else None
        if mode == 'long_context':
            assert config.architectures[0] == "LlamaForCausalLM", "Only LlamaForCausalLM supports long_context mode"
            torch.set_default_dtype(torch.float16)
        else:
            torch.set_default_dtype(config.torch_dtype)

        with torch.device("meta"):
            if config.architectures[0] in custom_models:
                if "Qwen2Moe" in config.architectures[0]:
                    config._attn_implementation = "flash_attention_2"
                if "Llama" in config.architectures[0]:
                    config._attn_implementation = "eager"
                if "Mixtral" in config.architectures[0]:
                    config._attn_implementation = "flash_attention_2"
                model = custom_models[config.architectures[0]](config)
            else:
                model = AutoModelForCausalLM.from_config(
                    config, trust_remote_code=True, attn_implementation="flash_attention_2"
                )

        if optimize_rule_path is None:
            if config.architectures[0] in default_optimize_rules:
                optimize_rule_path = default_optimize_rules[config.architectures[0]]

        optimize_and_load_gguf(model, optimize_rule_path, gguf_path, config)
        
        try:
            model.generation_config = GenerationConfig.from_pretrained(model_path)
        except Exception:
            model.generation_config = GenerationConfig(
                max_length=128,
                temperature=0.7,
                top_p=0.9,
                do_sample=True
            )
        
        if model.generation_config.pad_token_id is None:
            model.generation_config.pad_token_id = model.generation_config.eos_token_id
        
        model.eval()
        self.model = model
        self.use_cuda_graph = use_cuda_graph
        self.mode = mode
        logger.info("Model loaded successfully!")

    def create_chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        top_p: float = 0.9,
        force_think: bool = False,
    ) -> Dict:
        input_tensor = self.tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt"
        )
        
        if force_think:
            token_thinks = torch.tensor([self.tokenizer.encode("<think>\n", add_special_tokens=False)],
                                        device=input_tensor.device)
            input_tensor = torch.cat([input_tensor, token_thinks], dim=1)

        generation_config = GenerationConfig(
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_tokens,
            do_sample=True  # Ensure do_sample is True if using temperature or top_p
        )

        generated = prefill_and_generate(
            self.model,
            self.tokenizer,
            input_tensor.cuda(),
            max_tokens,
            self.use_cuda_graph,
            self.mode,
            force_think
        )

        # Convert token IDs to text
        generated_text = self.tokenizer.decode(generated, skip_special_tokens=True)

        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": generated_text
                }
            }],
            "usage": {
                "prompt_tokens": input_tensor.shape[1],
                "completion_tokens": len(generated),
                "total_tokens": input_tensor.shape[1] + len(generated)
            }
        }

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage]  # messages is a list of Pydantic model instances
    model: str = "default-model"
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 1000
    stream: Optional[bool] = False
    force_think: Optional[bool] = True

class ChatCompletionResponse(BaseModel):
    id: str = "chatcmpl-default"
    object: str = "chat.completion"
    created: int = 0
    model: str = "default-model"
    choices: List[Dict[str, Any]]
    usage: Dict[str, int]

app = FastAPI(title="KVCache.AI API Server")

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = f"{process_time:.4f}s"
    return response

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completion(request: ChatCompletionRequest):
    try:
        # messages is a list of Pydantic model instances, so use model_dump
        messages = [m.model_dump() for m in request.messages]
        response = chat_model.create_chat_completion(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            top_p=request.top_p,
            force_think=request.force_think
        )

        return {
            "id": f"chatcmpl-{int(time.time())}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response['choices'][0]['message']['content']
                },
                "finish_reason": "stop"
            }],
            "usage": response['usage']
        }
    except Exception as e:
        logger.error(f"API Error: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Internal server error: {str(e)}"
        )

def create_app(model_path: str, gguf_path: str, cpu_infer:int, optimize_rule_path: Optional[str] = None):
    global chat_model
    chat_model = OpenAIChat(
        model_path=model_path,
        gguf_path=gguf_path,
        optimize_rule_path=optimize_rule_path,
        cpu_infer=cpu_infer
    )
    return app

def main():
    parser = argparse.ArgumentParser(description="KVCache.AI API Server")
    parser.add_argument("--model_path", type=str, required=True, help="HuggingFace模型路径")
    parser.add_argument("--gguf_path", type=str, required=True, help="GGUF模型文件路径")
    parser.add_argument("--optimize_rule_path", type=str, help="优化规则文件路径")
    parser.add_argument("--port", type=int, default=8000, help="服务端口号")
    parser.add_argument("--cpu_infer", type=int, default=10, help="使用cpu数量")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="绑定地址")
    args = parser.parse_args()

    create_app(
        model_path=args.model_path,
        gguf_path=args.gguf_path,
        optimize_rule_path=args.optimize_rule_path,
        cpu_infer=args.cpu_infer
    )

    uvicorn.run(
        app,
        host=args.host,
        port=args.port,
        loop="uvloop",
        http="httptools",
        timeout_keep_alive=300,
        log_level="info",
        access_log=False
    )

if __name__ == "__main__":
    main()

File location: place chat_openai.py inside the ktransformers directory of the repository (it is launched below as ktransformers/chat_openai.py).

Install dependencies:

pip install protobuf uvicorn httptools
pip install uvloop

Launch:

python ktransformers/chat_openai.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/
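
Once the server is running (it listens on 0.0.0.0:8000 by default, per the argparse defaults in chat_openai.py), it can be exercised with curl; a minimal sketch:

# Health check against the /health route defined above
curl http://localhost:8000/health

# Non-streaming chat completion request
curl -X POST 'http://localhost:8000/v1/chat/completions' \
    -H 'Content-Type: application/json' \
    -d '{
        "model": "DeepSeek-R1",
        "messages": [{"role": "user", "content": "Hello"}],
        "max_tokens": 200
    }'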

2. Connecting a UI with Open WebUI

# Install Open WebUI with pip

pip install open-webui
# After the download completes, start the service
open-webui serve
# Once the service starts successfully, add the following Pipe function in Open WebUI:

import os
import json
import requests
from pydantic import BaseModel, Field
from typing import List, Union, Iterator

# Set DEBUG to True to enable detailed logging
DEBUG = False


class Pipe:
    class Valves(BaseModel):
        openai_API_KEY: str = Field(default="none")  # Optional API key if needed
        DEFAULT_MODEL: str = Field(default="DeepSeek-R1")  # Default model identifier

    def __init__(self):
        self.id = "DeepSeek-R1"
        self.type = "manifold"
        self.name = "KT: "
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", "none"),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "DeepSeek-R1"),
            }
        )
        # Self-hosted FastAPI server details
        self.api_url = (
            "http://localhost:8000/v1/chat/completions"  # FastAPI server endpoint
        )
        self.headers = {"Content-Type": "application/json"}

    def get_openai_models(self):
        """Return available models - for openai we'll return a fixed list"""
        return [{"id": "KT", "name": "DeepSeek-R1"}]

    def pipes(self) -> List[dict]:
        return self.get_openai_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        try:
            # Use default model ID since OpenAI has a single endpoint
            model_id = self.valves.DEFAULT_MODEL
            messages = []

            # Process messages including system, user, and assistant messages
            for message in body["messages"]:
                if isinstance(message.get("content"), list):
                    # For OpenAI, we'll join multiple content parts into a single text
                    text_parts = []
                    for content in message["content"]:
                        if content["type"] == "text":
                            text_parts.append(content["text"])
                        elif content["type"] == "image_url":
                            # OpenAI might not support image inputs - add a note about the image
                            text_parts.append(f"[Image: {content['image_url']['url']}]")
                    messages.append(
                        {"role": message["role"], "content": "".join(text_parts)}
                    )
                else:
                    # Handle simple text messages
                    messages.append(
                        {"role": message["role"], "content": message["content"]}
                    )

            if DEBUG:
                print("FastAPI API request:")
                print(" Model:", model_id)
                print(" Messages:", json.dumps(messages, indent=2))

            # Prepare the API call parameters
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": body.get("stream", True),
            }

            # Add stop sequences if provided
            if body.get("stop"):
                payload["stop"] = body["stop"]

            # Sending request to local FastAPI server
            if body.get("stream", False):
                # Streaming response
                def stream_generator():
                    try:
                        response = requests.post(
                            self.api_url,
                            json=payload,
                            headers=self.headers,
                            stream=True,
                        )
                        for line in response.iter_lines():
                            if line:
                                yield line.decode("utf-8")
                    except Exception as e:
                        if DEBUG:
                            print(f"Streaming error: {e}")
                        yield f"Error during streaming: {str(e)}"

                return stream_generator()
            else:
                # Regular response
                response = requests.post(
                    self.api_url, json=payload, headers=self.headers
                )
                if response.status_code == 200:
                    generated_content = (
                        response.json()
                        .get("choices", [{}])[0]
                        .get("message", {})
                        .get("content", "")
                    )
                    return generated_content
                else:
                    return f"Error: {response.status_code}, {response.text}"
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def health_check(self) -> bool:
        """Check if the OpenAI API (local FastAPI service) is accessible"""
        try:
            # Simple health check with a basic prompt
            response = requests.post(
                self.api_url,
                json={
                    "model": self.valves.DEFAULT_MODEL,
                    "messages": [{"role": "user", "content": "Hello"}],
                    "max_tokens": 5,
                },
                headers=self.headers,
            )
            return response.status_code == 200
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {e}")
            return False

Done~