本文介绍了通过AnythingLLM在搭载骁龙平台的个人电脑上购建自己的聊天应用所需要的全部内容。我们描述了环境配置和测试过程,并提供了扩展该应用基础功能的若干建议。
即便人工智能并非您常规关注的领域,掌握AI模型的运用与开发技能仍大有裨益。核心AI模型在分析能力和功能方面持续增长,而市场也在随着新型任务专用AI模型的涌现不断扩展。
了解如何构建、测试和修改AI应用,这些基础知识是确保您保持竞争力的方法。本教程特别适合于利用AI进行开发工作的新手,以及想要通过小型的有趣项目体验和使用简单端侧AI设置的人员。
您所需要的内容
- 硬件:
此项演示基于以下硬件搭建。应用本身设计为与硬件无关;但是,您可以了解到所选择的硬件在性能方面的差异。确保您有足够的内存来支持进行本地推理。AnythingLLM属于超轻量级应用,只需要2GB的内存就可以使用各种基本功能并保存聊天记录。
- 机器:戴尔Latitude 7455笔记本电脑
- 芯片:骁龙X Elite
- 操作系统:Windows 11
- 内存:32GB
- 软件:
- Python版本:3.12.6
- AnythingLLM大型语言模型提供程序:AnythingLLM NPU(适用于比较老的模型,在高通QNN中列明)
AnythingLLM聊天模型:Llama 3.1 8B Chat 8K
- 其他资源:
查看此GitHub代码库,获取更多的资源和代码。
设置
- 安装并配置AnythingLLM。在选择LLM供应商时,请务必选择AnythingLLM NPU选项以启用NPU加速功能。根据硬件条件选择合适的模型:本次演示采用Llama 3.1 8B Chat模型(8K上下文长度),您也可基于硬件限制选择其他模型以获得更佳性能。
- 点击“+ New workspace”,创建一个工作区。
- 生成API密钥。
1. 单击左侧面板底部的设置按钮;
2. 打开“工具”下拉菜单;
3. 点击“开发者API”;
4. 点击“生成新API密钥”;
打开一个PowerShell实例,并复制代码库:
git clone https://github.com/thatrandomfrenchdude/simple-npu-chatbot.git
利用要求文本创建并激活您的虚拟环境:
# 1. navigate to the cloned directory
cd simple-npu-chatbot
# 2. create the python virtual environment
python -m venv llm-venv
# 3. activate the virtual environment
./llm-venv/Scripts/Activate.ps1 # windows
source \llm-venv\bin\activate # mac/linux
# 4. install the requirements
pip install -r requirements.txt
利用以下变量创建您的config.yaml文件
api_key: "your-key-here"
model_server_base_url: "http://localhost:3001/api/v1"
workspace_slug: "your-slug-here"
stream: true
stream_timeout: 60
测试模型服务器认证,验证API密钥
python src/auth.py
使用工作区工具获取您的工作区标识符
1. 在命令行控制台中运行python src/workspace.py;
2. 从输出项中找到您的工作区及其标识符;
3. 将标识符添加到config.yaml的workspace_slug变量中。
构建应用
配置好应用后,建议深入研读代码实现,确保您做好充分准备,以便针对自己的应用对代码进行扩展。如同使用代码一样,总是有很多方法实现同一目标,所以不要把这当作构建聊天机器人应用的唯一方法。
除上述提及的auth.py和workspace.py实用程序之外,该代码还包含了使用终端或Gradio接口与聊天机器人对话的选项。终端部署速度更快且资源占用极低 – 在设备资源严格受限的实验场景中尤为实用、但其用户界面功能有限的情况下,如果您以前没有使用过终端,您可能会发现它有点反直觉。
如果您选择了Gradio,您需要使用一个网页浏览器;这意味着您所使用的系统资源多于通过终端获得的系统资源;但是,您可以享受到更加直观的用户界面。
此外,这两个界面均包括由config.yaml中的流式布尔值设置的阻塞式版本和流式版本。阻塞式版本会在完全处理了响应之后再返回,而流式版本则是在可以在获得数据块时立即将其返回。
此项代码审查将仅涵盖终端界面功能,因其交互逻辑与图形界面具有高度相似性。请注意,与GitHub代码库相比,代码已被简化;类型和注释被删除。
终端聊天机器人
如要开始使用终端版本,您需要安装并导入以下程序库:
import asyncio
import httpx
import json
import requests
import sys
import threading
import time
import yaml
应当将Asyncio、httpx和request程序库用于处理对于模型服务器的异步流请求,将json和yaml程序库分别用于处理请求响应和配置文件,并将sys、threading和time程序库用于处理阻塞响应时的进度条。
loading_indicator函数很简单:该函数每隔半秒在命令行中打印一个句点,最多10个;然后擦除该命令行并重新开始。该函数在模型响应处理期间以线程方式运行,具体代码如下:
def loading_indicator():
while not stop_loading:
for _ in range(10):
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(0.5)
sys.stdout.write('\r' + ' ' * 10 + '\r')
sys.stdout.flush()
print('')
除了最后讨论的调用部分之外,其余代码都存在于聊天机器人类别中。初始化操作很简单:读取配置文件,并分配类别变量,以便在其他函数中引用:
class Chatbot:
def __init__(self):
with open("config.yaml", "r") as file:
config = yaml.safe_load(file)
self.api_key = config["api_key"]
self.base_url = config["model_server_base_url"]
self.stream = config["stream"]
self.stream_timeout = config["stream_timeout"]
self.workspace_slug = config["workspace_slug"]
if self.stream:
self.chat_url = f"{self.base_url}/workspace/{self.workspace_slug}/stream-chat"
else:
self.chat_url = f"{self.base_url}/workspace/{self.workspace_slug}/chat"
self.headers = {
"accept": "application/json",
"Content-Type": "application/json",
"Authorization": "Bearer " + self.api_key
}
请特别注意最后几行代码。此处需要查看用户是否想要流传输,并且请求头只需定义一次即可在每次请求时方便地重复使用。
初始化完成之后处理run函数,可以将该函数视为所有聊天机器人的核心函数:
def run(self):
while True:
user_message = input("You: ")
if user_message.lower() in [
"exit",
"quit",
"bye",
]:
break
print("")
try:
self.streaming_chat(user_message) if self.stream \
else self.blocking_chat(user_message)
except Exception as e:
print("Error! Check the model is correctly loaded. More details in README troubleshooting section.")
sys.exit(f"Error details: {e}")
在每次循环中,该函数首先向用户请求输入。通过检查输入内容,判断用户是否希望通过预设的特殊关键词退出聊天。最后,程序会生成响应。如此周而复始,循环不息。
可以通过添加额外关键字、或在用户输入和聊天机器人响应之间插入额外代码,可以实现自定义的硬编码功能。代理是聊天机器人的一种专业化实现方式,具备独立执行操作的能力,通常会在响应用户前进行大量处理。如要了解简单的代理实现方式,请查看本GitHub代码库。
现在,让我们深入了解阻塞式聊天功能。
前四行代码用于处理加载进度条的线程。此处引用了之前提到的全局变量stop_loading,将其设置为False以启动加载过程,随后通过线程启动相应函数。代码如下所示:
global stop_loading
stop_loading = False
loading_thread = threading.Thread(target=loading_indicator)
loading_thread.start()
接下来,定义data和chat_response变量。dada包含一个json字典,在发出请求时将其传递给API。该字典中的sessionId键并非AnythingLLM所需;开发者如需跟踪会话可保留该字段。由于AnythingLLM支持在聊天完成请求中接收并解析附件,因此字典中还包含attachments键。chat_response则包含使用Python标准库requests发起请求后返回的结果。
data = {
"message": message,
"mode": "chat",
"sessionId": "example-session-id",
"attachments": []
}
chat_response = requests.post(
self.chat_url,
headers=self.headers,
json=data
)
该函数的最后一部分处理代理的响应。首先,触发stop_loading变量,并将加载进度条线程重新加入到主线程。然后,该函数在检查错误时会尝试打印信息。与run函数一样,可以使用自定义功能对该函数进行扩展。例如,尽管AnythingLLM已经跟踪了聊天历史记录的上下文,但您可能想要在应用中单独对其进行跟踪,以获取每项请求的额外上下文信息。
stop_loading = True
loading_thread.join()
try:
print("Agent: ", end="")
print(chat_response.json()['textResponse'])
print("")
except ValueError:
return "Response is not valid JSON"
except Exception as e:
return f"Chat request failed. Error: {e}"
总的来说,blocking_chat函数如下所示,并将所要发送的信息作为自变量:
def blocking_chat(self, message):
global stop_loading
stop_loading = False
loading_thread = threading.Thread(target=loading_indicator)
loading_thread.start()
data = {
"message": message,
"mode": "chat",
"sessionId": "example-session-id",
"attachments": []
}
chat_response = requests.post(
self.chat_url,
headers=self.headers,
json=data
)
stop_loading = True
loading_thread.join()
try:
print("Agent: ", end="")
print(chat_response.json()['textResponse'])
print("")
except ValueError:
return "Response is not valid JSON"
except Exception as e:
return f"Chat request failed. Error: {e}"
现在让我们来看看流传输的对比情况。流传输由两个函数组成,一个是异步包装器函数streaming_chat,另一个是用于在数据块到达时对其进行处理的streaming_chat_async函数。包装器是一种使用asyncio处理信息的简单函数。该函数是定义行之外的单行代码,具体函数如下:
def streaming_chat(self, message):
asyncio.run(self.streaming_chat_async(message))
包装器调用的主函数以两个变量定义开始。数据变量保持不变,其下方的缓冲区用于在数据块到达时对其进行收集。
data = {
"message": message,
"mode": "chat",
"sessionId": "example-session-id",
"attachments": []
}
buffer = ""
data变量保持不变,其下方的缓冲区用于在数据块到达时对其进行收集。后续代码段较长,因此拆分为若干小段:首先是嵌套上下文和循环结构的实现部分:
try:
async with httpx.AsyncClient(timeout=self.stream_timeout) as client:
async with client.stream("POST", self.chat_url, headers=self.headers, json=data) as response:
print("Agent: ", end="")
async for chunk in response.aiter_text():
if chunk:
buffer += chunk
第一个上下文管理器用于配置处理数据流的异步HTTP客户端,随后第二个上下文管理器利用该客户端向模型端点发起携带消息的实际POST请求。错误检查(未包括在此处)可捕获httpx客户端的任何错误。这些上下文后面的print语句则打印聊天机器人响应所附加的初始标签。
紧随其后的几行代码则处理来自响应的数据块。与请求调用方式类似,这些数据块通过异步迭代实现——当数据块尚未就绪时,执行权会释放给其他线程。一旦接收到数据块,它们就会被存入缓冲区。
接下来的几行代码用于开始处理缓冲区中的数据:
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
if line.startswith("data: "):
line = line[len("data: "):]
只要缓冲区中仍然有完整代码行,就会进入缓冲区处理循环,将数据块输出到屏幕上。如果没有检测到完整代码行(在本例中使用换行符),则退出循环并等待更多数据块。
为处理单行数据,系统会从缓冲区提取该行内容,并更新缓冲区以移除正在处理的行。从这一步开始,对代码行进行清理。必须进行该项操作,因为AnythingLLM返回一个json字典,因此将键删除。
接下来,对提取的代码行进行处理:
try:
parsed_chunk = json.loads(line.strip())
print(parsed_chunk.get("textResponse", ""), end="", flush=True)
if parsed_chunk.get("close", False):
print("")
except json.JSONDecodeError:
# The line is not a complete JSON; wait for more data.
continue
except Exception as e:
# generic error handling, quit for debug
print(f"Error processing chunk: {e}")
sys.exit()
在try代码块中,解析后的数据块会以JSON格式加载并打印到控制台。最后部分会检查这是否为最终信息。如果是最终信息,则输出一个新代码行,以分隔用户输入和聊天机器人输出。某些轻微的错误处理 – 针对json解码和通用异常捕获 – 可处理任何意外问题。
总的来说,streaming_chat函数如下所示,并将所要发送的信息作为一个自变量:
def streaming_chat(self, message):
asyncio.run(self.streaming_chat_async(message))
async def streaming_chat_async(self, message):
data = {
"message": message,
"mode": "chat",
"sessionId": "example-session-id",
"attachments": []
}
buffer = ""
try:
async with httpx.AsyncClient(timeout=self.stream_timeout) as client:
async with client.stream("POST", self.chat_url, headers=self.headers, json=data) as response:
print("Agent: ", end="")
async for chunk in response.aiter_text():
if chunk:
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
if line.startswith("data: "):
line = line[len("data: "):]
try:
parsed_chunk = json.loads(line.strip())
print(parsed_chunk.get("textResponse", ""), end="", flush=True)
if parsed_chunk.get("close", False):
print("")
except json.JSONDecodeError:
# The line is not a complete JSON; wait for more data.
continue
except Exception as e:
# generic error handling, quit for debug
print(f"Error processing chunk: {e}")
sys.exit()
except httpx.RequestError as e:
print(f"Streaming chat request failed. Error: {e}")
最后,将聊天机器人实例化,并通过主函数调用聊天机器人:
if __name__ == '__main__':
stop_loading = False
chatbot = Chatbot()
chatbot.run()
如果将文件作为主文件调用,则该代码块的第一行会告知Python解释器运行该代码。第二行设置一个全局变量,用于告知加载指示器线程是否应该显示加载条。最后两行会对聊天机器人类别进行初始化并运行。
如要获得更多全面分步指导,请参看从零开始构建聊天机器人:点击此处观看30分钟创意挑战构建视频。
测试您的聊天应用
如前一节所述,您可以选择使用终端或Gradio聊天界面与机器人对话。完成设置后,运行您从命令行中选择的应用:
# terminal
python src/terminal_chatbot.py
# gradio
python src/gradio_chatbot.py
常见问题处理
AnythingLLM NPU运行时缺失
在搭载骁龙X Elite平台的机器上,AnythingLLM应默认使用NPU作为大语言模型提供器。如果您在下拉列表中没有发现AnythingLLM NPU,说明您已下载了x64版本的AnythingLLM。请删除该应用,并安装ARM64版本。
未下载模型
有时候,所选模型会下载失败,导致生成错误。要解决这一问题,请在AnythingLLM中的“设置 -> AI提供程序 -> 大型语言模型”查看模型。如果正确安装了模型,您应当在模型卡上看到“卸载”。如果您看到“模型需要下载”,则请选择另一个模型,点击保存,切换回来,然后保存。您应该在AnythingLLM窗口的右上角看到模型下载。
可选附加组件:创建加载条
如要在AnythingLLM中实现用于推理任务的多线程加载进度条,您需要使用Python的线程和队列模型。
1. 核心组件
- 主线程:负责处理UI更新和进度条渲染
- 工作线程:负责执行推理任务
- 进程队列:线程之间的线程安全通信通道
import threading
import queue
import time
from alive_progress import alive_bar # Optional for advanced animations
2. 线程安全进度追踪
def inference_task(progress_queue, query):
# Simulate inference processing
steps = ["Tokenizing", "Processing", "Generating"]
for i, step in enumerate(steps, 1):
time.sleep(1) # Replace with actual inference work
progress_queue.put((i/len(steps)*100, step))
progress_queue.put((100, "Complete"))
3. 加载条线程
def loading_bar(progress_queue):
with alive_bar(100, title='Processing Query') as bar:
while True:
try:
progress, status = progress_queue.get(timeout=0.1)
bar.title(f'[{status}]')
bar(progress - bar.current) # Increment by delta
if progress >= 100:
break
except queue.Empty:
continue
4. AnythingLLM集成
def run_inference_with_progress(query):
progress_queue = queue.Queue()
# Start inference thread
inference_thread = threading.Thread(
target=inference_task,
args=(progress_queue, query)
)
inference_thread.start()
# Start progress bar in main thread
loading_bar(progress_queue)
inference_thread.join()
print("\nInference complete")
实现注意事项
线程同步
- 使用queue.Queue进行安全的线程间通信
- 主线程每100毫秒轮询队列进行更新
- Worker发送完成百分比和状态信息
用户界面集成
# For web UI integration (Flask example)
@app.route('/chat', methods=['POST'])
def chat():
query = request.json['query']
thread = threading.Thread(target=run_inference_with_progress, args=(query,))
thread.start()
return jsonify({"status": "Processing started"})
高级功能
- 将alive-progress用于动态加载动画和吞吐量统计
- 添加预计完成时间计算:
bar.title(f'ETA: {bar.eta}s | {status}')
架构图
[User Input]
│
▼
[Main Thread] ─── starts ───▶ [Worker Thread]
│ ▲ │
│ └── progress updates ───┘
▼
[Loading Bar Render]
│
▼
[LLM Response]
该模式在显示实时推理进程的同时保持用户界面的响应性。
是否需要获得这一基本项目的其他扩展?请尝试使用文本流进行异步通信!
额外资源
您是编程新手吗?欢迎光临!学习如何编码有助于您获得可以在诸多不同领域转移的有用技能。
通过编码,您可以学会如何将问题分解为较为简单的任务,以及如何识别这些任务中重复出现的元素 – 通常都可以对其进行自动化处理,或至少您可以重复使用所编写的代码来执行该任务,而不是每次都要从头开始。
有大量适合于初学者的优质编程课程,您可能会觉得这些课程很有帮助:
- LearnPython.org提供了免费教程,包括初学者选项和高级选项。
- 如果您在解决特定问题方面的学习效果更好,您可能会喜欢Udemy针对具体项目的课程。这些课程并非免费提供,但Udemy经常有促销活动,您通常都可以获得优惠代码。
- 如果您是一位零基础的初学者,通过Free Code Camp的免费Python教程,您可以在大约4.5小时的时间内掌握基础内容并开始实践。
我们非常期待看到您构建的应用!请加入高通开发者Discord社区,向大家展示您的项目成果!
常见问题解答
问:什么是端侧AI?为什么要使用骁龙NPU?
答:端侧AI是指直接在本地设备(例如:搭载骁龙平台的硬件)上运行机器学习模型,减少延迟,最大限度地减少数据传输,并提高隐私性。
骁龙NPU加快了本地推理速度,从而确保聊天和AI应用在不依赖云的情况下运行得更快、响应得更灵敏。
问:什么是AnythingLLM?它是如何与骁龙平台集成的?
答:AnythingLLM是一种轻量级、支持多供应商的大语言模型运行框架。在本文中,我们以“高通QNN”为例展示NPU加速方案。这样可确保使用骁龙加速器硬件的模型(例如:Llama 3.1 8B)能够进行本地推理。
问:如何在推理过程中提高用户界面响应能力?
答:利用工作线程实现异步推理,并通过queue.Queue向主线程传达进程信息。利用alive-progress等工具,在NPU在后台处理数据的同时,您可以渲染加载条(或Gradio用户界面I),从而保持界面流畅。
在所发布内容中表达的观点仅为原作者的个人观点,并不代表高通技术公司或其子公司(以下简称为“高通技术公司”)的观点。所提供的内容仅供参考之用,而并不意味着高通技术公司或任何其他方的赞同或表述。本网站同样可以提供非高通技术公司网站和资源的链接或参考。高通技术公司对于可能通过本网站引用、访问、或链接的任何非高通技术公司网站或第三方资源并没有做出任何类型的任何声明、保证、或其他承诺。
骁龙与高通品牌产品均为高通技术公司和/或其子公司的产品。
关于作者
尼克·德布尔
高级机器学习工程师兼AI开发者倡导者
