“视界感知者”——基于飞桨大模型的智能视导杖
项目概述
全球视障人士总数达到4200万,其中中国视障人士人数接近1730万,占全球视障群体的40%。目前,导盲杖仍然是视障人士出行时的辅助用具首选项。
现如今,得益于人工智能时代的飞速发展,市面上涌现了大量智能盲杖,其主要分为两类:基于超声探测技术的避障盲杖以及基于图像识别、目标检测的智能盲杖。然而,这些设备仍存在显著缺陷,主要有以下不足:
- 在某些特定场景下效果表现不佳,如超声探测盲杖遇到玻璃、金属、毛绒物会受到限制。
- 现有智能盲杖多数仅能识别出物体类别,难以给出物体的品牌、色彩等信息。
我们项目开发团队在经过大量调研、走访实践后,认识到:视障人士不仅要导航、避障,还需要更多信息,更全面地感知、认知世界——
于是,项目开发团队利用大模型多模态能力,打造了一款大模型辅助视障人士“感知世界”的智能导盲杖产品。作品以百度飞桨大模型为基础,结合目标检测与图像分类模型完成标签提取任务,并提出了一套门控分类器,在不同场景下筛选提示词模版,增强对不同场景的泛用性,最终通过大模型生成描述文本,为视障人士提供直观的场景感知。
作品将软硬件相结合,部署模型到后端服务器,在树莓派、ESP开发板等硬件单元上进行调用,组合实现了相应功能,实际开发完成了实物产品原型,并在响应速度、处理效果上展现出优越的性能。
视界感知者”智能导盲杖,作为项目的核心成果,主要功能如下:
- 通过杖身摄像头拍摄前方图像,调用后端Dify模型工作流对图像进行分析,最终语音播报面前场景物体的描述文本。
- 通过语音交互实现随时与大模型对话效果,可针对面前图像进行详细提问,辅助视障者感知周遭环境的具体细节。
图1. 项目核心流程图
图1.展示了项目的核心工作流程。整体流程如下所示:
【硬件部署】:项目原型机利用树莓派ZERO 2W作为前端处理单元,以Aluminum Heat Sink散热片和NCR18650B可充锂电芯集成了效果良好的供电系统与散热系统。通过嵌入到盲杖上的小型摄像头以及麦克风设备,树莓派可以随时接收并处理摄像头捕获到的场景图片、并将用户的问题提供给后端。
【上传图像、音频】:在获取到场景图片与用户提问音频后,树莓派作为处理单元,可将图像Web传输到服务器云端。
【提取Label与Caption】:这时,云端部署的飞桨社区PP-HGNetV2 图像分类模型及识别模型对图像处理获得Label标签,并在后端通过使用MSDN网络来获取场景信息的Caption描述,输出传入后续工作流。
【分类器处理】:针对获取的标签及Caption描述,项目设计了一套提示词筛选分类器,该门控单元基于标签与Caption输入,对Prompts模版进行选取,从而获得合适的Prompts输入模版,再将其提供给大模型。
【大模型分析描述】:借助大语言模型,将prompts输入模版填充完成后,输入进大模型,从而得到其反馈的细致场景描述文本。最终文本会传输回到树莓派中,通过语音播报的方式传达给用户,为用户带来更细节化的场景感知体验。
我们的作品展示如图:
图2. 产品模型图、实拍图
项目宣传展示详见如下视频:
项目技术方案
硬件部分
在这里详细呈现组建整个盲杖产品使用的硬件配置,可以参考如下配置复现产品
- “视界感知者”一代版本的硬件配置
核心处理单元:“视界感知者”内核基于Raspberry Pi Zero 2 W硬件实现,采用低功耗Wifi通信,用户拍照后即可在线实现目标检测。
Raspberry Pi Zero 2 W小巧轻便、性能强大、功能完善,适合嵌入设备内部集成。且价格适中,能够降低整体产品制造成本。
树莓派服务端算法模块基于 Paddle Detection,可实现毫秒级计算,通信模块基于SpringBoot 和 Netty 异步通信框架,在主流服务 GPU(单块 Tesla P100)算力下整体实现亚秒级响应。
外壳模具:产品原型机采用未来8200 Pro树脂,在保证散热、防水、强度符合市场使用场景的要求下的同时减轻重量,也降低了成本。摄像头选用picamera v2,体积小但功能强大,耐用性好,维护成本低。
散热模块:散热片选用Aluminum Heat Sink for Raspberry Pi Zero 官方提供的铝合金散热片,价格低廉,足够使用,同时会优化外壳设计,考虑通风口和散热孔的布局,提高空气流通效果,有利于散热。
供电模块:选用Panasonic NCR18650B电池,这款电池电量在3500毫安左右,树莓派zero 2W树莓派 Zero 2W 的空闲功耗大约是 0.6 瓦到 1.3 瓦之间,满载功耗在 2.65 瓦左右。不考虑设备发热带来额外功耗的理想情况下,一块容量为 3500 毫安时的电池可以让其运行约 58.33 小时(约2天),在满载状态下,可以运行约13.75小时。
图3. Raspberry Pi Zero 2 W(左),Aluminum Heat Sink for Raspberry Pi(中), ZeroPanasonic NCR18650B(右)外观图
我们按上述硬件配置组装、焊接线材,完成了第一代“视界感知者”的相关设计(PS:也可使用香橙派作为核心处理单元)。
最初的“视界感知者”仍存在多数不足,如内部空间占用过大、扬声器声音过小、接触不稳定、散热难以保证等。
Raspberry Pi Zero 2 W虽然性能较强,但其也有较大的发热问题存在,仅用Aluminum Heat Sink for Raspberry Pi Zero构建的散热模块部分难以保证其有效散热。设备在连续使用时间超过1h至1.5h后,会出现明显的过热导致反应卡顿等问题。
此外,3D打印的模具也存在一些问题,在抓握手感等方面还有显著欠缺。
图4. “视界感知者”一代版本展示图
- “视界感知者”二代版本的硬件配置
针对以上问题,我们升级改良了一代“视界感知者”产品:
核心处理单元:更换ESP开发板作为核心处理单元。可使用ESP32 DEVKIT DOIT 板作为产品组件。ESP32具有双核处理器,内置了 Wi-Fi 和蓝牙,有多个引脚和可用的外设供我们开发配置。我们与AiSpea(北京市赛博创力科技有限公司)联合开发了这款新型ESP开发板。
外壳模具:我们重新建模产品,使用新一版的建模外观,并对把手进行了重新设计。在制作产品时也采用了更柔韧的树脂材质。围绕上一代存在的尾端触地容易磕绊问题,我们换用了球形滚轮式的尾端样式。
相比一代产品,二代“视界感知者”通过上述优化,大幅缩小了产品体积,增设了散热通风口与扬声器播放蜂窝口。此外,我们对充电模块重新设计,采用容量更大的可充锂电池,进一步增强了设备的续航能力。
图5. “视界感知者”二代版本展示图、ESP开发板展示图
软件部分
流程总述
经过实验验证,我们发现,当前大模型辅助盲人感知场景还存在一些较为显著的缺陷:
(1)难以找到适配多个场景的统一提示词模版,大模型在不同场景下的泛化能力较差。比如在与人交流时和在户外马路上时,会有较大的描述效果差异。
(2)难以判断场景中的重点部分,过度关注并描述场景整体。大模型往往偏向于将整张图片的每个部分都描述出来,导致对实际重要的地方关注性较差,细节描述冗余。
因此,我们借助飞桨星河平台的图像分类模型,将每张场景图像的类型作为标签输出,并借助Multi-level Scene Description Network(MSDN)提取图像Caption,将Label与Caption一并作为输入,构造分类器,从而选取适应当前场景图的提示词模版。
最后,将Caption、Label一并送入大模型,使大模型在描述场景前,对场景的主要物体有了提前感知,从而进一步提升其描述效果,减少上述缺陷带来的影响。
整个软件部分的流程结构图如下图所示:
图6. 软件功能流程示意图
图像分类模型
我们使用了百度飞桨AI Studio星河社区的通用图像分类模型。
选取了Top1 Acc(%)表现最优的PP-HGNetV2_B6完成分类任务,并在后端调用获取其输出Label结果,返回到Dify工作流中,Label标签可进一步作为后续输入,辅助大模型整合信息输出场景与物体描述。
具体的调用代码如下所示:
- imgClassify_api.py(调用模型功能):
from flask import Flask, request, jsonify
import os
import cv2
import numpy as np
from infer_class2 import classify_image
app = Flask(__name__)
app.config['JSON_ASCIi'] = False
app.json.ensure_ascii = False
@app.route('/process_image', methods=['POST'])
def process_image():
data = request.get_json()
# if not data or 'base64_image' not in data:
if not data:
return jsonify({"message": "无效的请求,缺少base64_image字段"}), 400
base_64 = data['base64_image']
#k = data['k']
k=2
print(k)
# label, confidence = classify_image(base_64)
# data_dict = {"message": "处理成功", "label": label, "confidence": confidence}
labels_dict = classify_image(base_64,k)
data_dict = {"message": "处理成功", "label": labels_dict}
return jsonify(data_dict)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
- infer.py(模型推理):
import fastdeploy as fd
import cv2
import os
import base64
import numpy as np
import argparse
def classify_image(base64_image):
# 解析参数(这里使用默认的固定参数,可根据需求调整)
manual_args = argparse.Namespace()
manual_args.model = "/home/zhuyifan/PaddleModelDeploy/FastDeploy_Linux_Python_SDK_v1.1.0_x86_64_PP-HGNetV2/model/PPHGNetV2_B6/PPHGNetV2_B6_ssld_infer/"
manual_args.serial_number = None
manual_args.update_license = False
manual_args.topk = 2
manual_args.device = 'cpu'
manual_args.device_id = 0
manual_args.backend = "default"
args = manual_args
# 配置runtime,加载模型
runtime_option = build_option(args)
model_file = os.path.join(args.model, "inference.pdmodel")
params_file = os.path.join(args.model, "inference.pdiparams")
config_file = os.path.join(args.model, "inference.yml")
model = fd.vision.classification.PaddleClasModel(
model_file, params_file, config_file, runtime_option=runtime_option)
# 解码base64图片
if 'base64,' in base64_image:
base64_image = base64_image.split('base64,')[-1]
img_data = base64.b64decode(base64_image)
nparr = np.frombuffer(img_data, np.uint8)
im = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# image = "/home/zhuyifan/PaddleModelDeploy/FastDeploy_Linux_Python_SDK_v1.1.0_x86_64_PP-HGNetV2/example/test.jpg"
# im = cv2.imread(image)
# 预测图片分类结果
result = model.predict(im, args.topk)
label_ids = result.label_ids
scores = result.scores
print(label_ids, scores)
label_list_path = "/home/zhuyifan/PaddleModelDeploy/FastDeploy_Linux_Python_SDK_v1.1.0_x86_64_PP-HGNetV2/example/imagenet1k_label_list_chinese .txt"
with open(label_list_path, 'r') as f:
labels = [line.strip().split(' ', 1) for line in f.readlines()]
label = None
confidence = None
for label_id, score in zip(label_ids, scores):
for id_str, l in labels:
if int(id_str) == label_id:
label = l
confidence = score
break
if label:
break
return label, confidence
def build_option(args):
option = fd.RuntimeOption()
if args.serial_number:
encryp_file_path = os.path.join(args.model, "encrypt_info.yaml")
option.set_authentication(args.serial_number, encryp_file_path, args.update_license)
if args.device.lower() == "gpu":
option.use_gpu(args.device_id)
if args.backend.lower() == "trt":
assert args.device.lower() == "gpu", "TensorRT backend require inference on device GPU."
option.use_trt_backend()
option.trt_option.set_shape("x", [1, 3, 224, 224], [1, 3, 224, 224], [1, 3, 224, 224])
elif args.backend.lower() == "pptrt":
assert args.device.lower() == "gpu", "Paddle - TensorRT backend require inference on device GPU."
option.use_paddle_infer_backend()
option.paddle_infer_option.enable_trt = True
option.paddle_infer_option.collect_trt_shape = True
option.paddle_infer_option.enable_log_info = False
option.trt_option.set_shape("x", [1, 3, 224, 224], [1, 3, 224, 224], [1, 3, 224, 224])
elif args.backend.lower() == "ort":
option.use_ort_backend()
elif args.backend.lower() == "paddle":
option.use_paddle_infer_backend()
option.paddle_infer_option.enable_log_info = False
elif args.backend.lower() == "openvino":
assert args.device.lower() == "cpu", "OpenVINO backend require inference on device CPU."
option.use_openvino_backend()
elif args.backend.lower() == "pplite":
assert args.device.lower() == "cpu", "Paddle Lite backend require inference on device CPU."
option.use_lite_backend()
return option
base_64 = ""
with open('PaddleModelDeploy/FastDeploy_Linux_Python_SDK_v1.1.0_x86_64_PP-HGNetV2/example/base64_example', 'rb') as f:
binary_content = f.read()
content = binary_content.decode('utf - 8')
base_64 = content
label, confidence = classify_image(base_64)
print(label)
Dify工作流整合
我们使用Dify平台整合各模型处理过程,搭建起整个项目流程工作流。
Dify工作流的示意如下所示(仅作展示demo,分类器构建了厕所、教室两个场景作为示意):
图8. Dify工作流示意图
可以参考如下代码进行Dify工作流的调用:
def difiAPI(input_image,query):
buffered = BytesIO()
input_image.save(buffered, format="PNG")
base64_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
base64_image = 'data:image/jpeg;base64,'+ base64_image
url = "https://api.dify.ai/v1/workflows/run"
headers = {
"Authorization": "Bearer app-0OW05boo9sfPeGNkMynyuhTp",
"Content-Type": "application/json"
}
data = {
"inputs": {"query": query, "base64": str(base64_image)},
"response_mode": "blocking",
"user": "abc-123"
}
response = requests.post(url, headers = headers, json = data)
print(type( response.content))
print(response.content)
string_data = response.content.decode('utf - 8') # 先将字节流解码为字符串
json_data = json.loads(string_data) # 然后进行JSON解析
text_value = json_data['data']['outputs']['text'] # 取出text字段的值
print(text_value)
return text_value
Demo演示
我们在飞桨AI Studio星河社区上线项目,以Gradio为前端搭建了我们功能的演示Demo。
由于无法直接在社区上呈现我们的盲杖产品本身,因此我们实际在星河社区部署的仅仅是盲杖的软件功能演示Demo。具体地,我们通过如下方式搭建了现有的Demo演示:
- 用户能够在我们的demo页面开启摄像头拍摄图像,或直接上传图像文件;
- 用户能够开启设备麦克风进行录音,或直接上传wav音频文件,输入自己想要提问大模型或与其对话的内容;
- 图像文件与音频作为输入,通过Dify进入我们的模型工作流。工作流在后端使用我们部署模型的服务器接口,通过我们设计的分类器等模块,最终经过飞桨文心大模型处理得到最后的场景描述文本。
- 场景描述文本会转为音频文件,您可以听到关于该场景图的语音描述。
具体的Demo演示画面如图所示:
图9. Demo演示界面示意图
注:由于项目在飞桨平台部署后,连接稳定性相比硬件设备可能有限(飞桨平台服务器的限制),有时会存在较长的等待延迟,如果等待最终结果的时间过长,这并非是后端模块处理流程太慢的缘故。
您可以使用如下代码构建我们的Demo演示前端:
import gradio as gr
import requests
from io import BytesIO
from PIL import Image, ExifTags
import json
import base64
import json
import base64
from io import BytesIO
from urllib.parse import urlencode, quote_plus
# python /home/aistudio/Gradio.app.py
title = "盲杖——飞桨演示版"
def vtt(file_path):
url = "https://vop.baidu.com/server_api"
# 读取音频文件
with open(file_path, 'rb') as audio_file:
audio_data = audio_file.read()
# 对音频数据进行Base64编码
base64_audio = base64.b64encode(audio_data).decode('utf-8')
# 定义请求的 payload
payload = {
"format": "wav", # 根据音频格式调整
"rate": 16000, # 根据音频采样率调整
"channel": 1,
"cuid": "xAxXo9S0RTzXRU1INrMioJOi7ta3zGBs",
"token": "24.b0acaa76ec97b7dcb69eda11b0220891.2592000.1730967744.282335-115786479",
"speech": base64_audio,
"len": len(audio_data)
}
headers = {
'Content - Type': 'application/json',
'Accept': 'application/json'
}
response = requests.post(url, headers = headers, json = payload)
#print(type(response.content))
#b'{"corpus_no":"7423668938825248267","err_msg":"success.","err_no":0,"result":["\xe5\xa4\xa7\xe7\x86\x8a\xe7\x8c\xab\xe5\x9c\xa8\xe5\x93\xaa\xef\xbc\x9f"],"sn":"728897213551728457617"}\n
string_data = response.content.decode('utf - 8') # 先将字节流解码为字符串
json_data = json.loads(string_data) # 然后进行JSON解析
text_value = json_data['result'] # 取出text字段的值
#print(text_value)
return text_value
def tts(text):
TOKEN = "24.b46eb2202b81ff94efa5603c3fddaba4.2592000.1731076355.282335-115786479"
TEXT = "欢迎使用百度语音合成。"
# 发音人选择
PER = 4
SPD = 5
PIT = 5
VOL = 5
AUE = 3
CUID = "xAxXo9S0RTzXRU1INrMioJOi7ta3zGBs" # 设置 CUID
TTS_URL = 'http://tsn.baidu.com/text2audio'
# 构造请求参数
tex = quote_plus(text)
params = {
'tok': TOKEN,
'tex': tex,
'per': PER,
'spd': SPD,
'pit': PIT,
'vol': VOL,
'aue': AUE,
'cuid': CUID,
'lan': 'zh',
'ctp': 1
}
data = urlencode(params)
# 发送请求
req = requests.post(TTS_URL, data=data.encode('utf-8'))
# 处理响应
if req.headers['content-type'].find('audio/') < 0:
with open('/home/zhuyifan/tts_error.txt', 'wb') as of:
of.write(req.content)
print("tts api error: " + req.text)
else:
with open("/home/aistudio/result.mp3", 'wb') as of:
of.write(req.content)
return "/home/aistudio/result.mp3"
def difiAPI(input_image,query):
#query = "hello"
# 根据图片格式进行转换(如果是PNG则转换为RGB以便后续压缩为JPEG)
# if input_image.format is not None:
# print(input_image.format)
# try:
# exif = input_image.getexif()
# for tag, value in exif.items():
# if tag in ExifTags.TAGS and ExifTags.TAGS[tag] == 'Format':
# print(value)
# except AttributeError:
# pass
buffered = BytesIO()
input_image.save(buffered, format="PNG")
base64_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
base64_image = 'data:image/jpeg;base64,'+ base64_image
# with open("/home/zhuyifan/PaddleModelDeploy/VisWorld_example/gradio_code/MangZhangPP/1/tmp", 'w') as f:
# f.write(str(base64_image))
url = "https://api.dify.ai/v1/workflows/run"
headers = {
"Authorization": "Bearer app-0OW05boo9sfPeGNkMynyuhTp",
"Content-Type": "application/json"
}
data = {
"inputs": {"query": query, "base64": str(base64_image)},
"response_mode": "blocking",
"user": "abc-123"
}
response = requests.post(url, headers = headers, json = data)
print(type( response.content))
print(response.content)
string_data = response.content.decode('utf - 8') # 先将字节流解码为字符串
json_data = json.loads(string_data) # 然后进行JSON解析
text_value = json_data['data']['outputs']['text'] # 取出text字段的值
print(text_value)
return text_value
def process(input_image, input_audio):
#print(type(input_audio))
query_text = vtt(input_audio)
print(query_text)
output_text = difiAPI(input_image=input_image, query=query_text)
output_audio = tts(output_text)
#print(output_text)
#output_audio = "/home/zhuyifan/PaddleModelDeploy/VisWorld_example/gradio_code/MangZhangPP/我的前面是什么.wav"
return output_text, output_audio
demo_image = "/home/aistudio/test.jpg"
demo_audio = "/home/aistudio/demo_whatinfrontofme.wav"
examples = [
[demo_image, demo_audio]
]
demo = gr.Interface(fn = process,
inputs = [gr.Image(type = "pil"), gr.Audio(type = "filepath")],
outputs = ["text", gr.Audio()],
title = title,examples=examples)
demo.launch(share = True,server_name="0.0.0.0",server_port=8080 )
# python /home/aistudio/work/Gradio.app.py
盲杖功能实现
我们在这里放出可部署在树莓派/香橙派上的盲杖功能构建代码,您可以参照如下代码和上述流程,使用同样的Dify工作流和模型搭建逻辑构建起整个导盲杖并实现对应功能。
我们的模型放在后端服务器处理,通过MQTT协议连接前端与后端服务器。您也可以参照MQTT客户端与服务器对接的处理方式,在服务器上搭建模型处理流程,并与Dify工作流相调用,整合在一起。
具体代码如下:
import time
import wave
import sys
import subprocess
from threading import Event
import threading
import queue
import signal
import numpy as np
import requests
import json
import cv2
import paho.mqtt.client as mqtt
import pvporcupine
import pvrecorder
from pvrecorder import PvRecorder
import os
import base64
import urllib.parse
from queue import Empty
import csv
from concurrent.futures import ThreadPoolExecutor, Future
import pandas as pd
import keyboard
# 设备ID和相关配置
DEV_ID = 'f0:f5:bd:49:f4:2e'
mindb = 6000 # 默认最小声音,小于则结束
waitTime = 2 # 开始时候等待3秒不说话则自动终止
delayTime = 0.5 # 小声1.3秒后自动终止
debounce_time = 1 # 去抖时间
CHUNK = 8192
RECORD_CHANNELS = 1
RECORD_RATE = 16000
WAVE_OUTPUT_FILENAME = "send.wav"
MQTT_SERVER = "150.158.77.109"
MQTT_PORT = 1883
TOPIC = "paho/test/" + DEV_ID
msgId = 0
image_id = ""
take_photo = False
# 初始化摄像头
camera_index = None
for i in range(10):
if cv2.VideoCapture(i).isOpened():
camera_index = i
break
if camera_index is None:
print("未检测到摄像头")
sys.exit(1)
print(f"找到摄像头,使用索引: {camera_index}")
# 初始化录音设备索引
usb_mic_index = None
# 全局变量用于存储扬声器设备
speaker_device = None
# 自动检测并设置 USB 录音设备索引
def detect_usb_mic_index():
print("Detecting USB mic index...")
devices = PvRecorder.get_available_devices()
for index, device in enumerate(devices):
if "Camera" in device: # 检测设备名称中是否包含“Camera”
print(f"USB Camera found at index {index}.")
return index
print("USB Camera not found.")
return None
# 设置 usb_mic_index 为自动检测的结果
usb_mic_index = detect_usb_mic_index()
if usb_mic_index is None:
print("No suitable USB mic found. Exiting.")
else:
print(f"Using USB mic at index {usb_mic_index}.")
# 获取录音设备索引的函数
def detect_usb_mic_index():
print("Detecting USB mic index...")
devices = PvRecorder.get_available_devices()
if not devices:
print("No audio devices found.")
return None
for index, device in enumerate(devices):
print(f"Device {index}: {device}")
if "Camera" in device: # 自动选择包含 "Camera" 的设备
print(f"USB Camera found at index {index}.")
return index
print("USB Camera not found.")
return None
def find_speaker_device():
global speaker_device
if speaker_device:
# 如果设备已经打开,直接返回设备
return speaker_device
result = subprocess.run(['aplay', '-l'], stdout=subprocess.PIPE, text=True)
output = result.stdout
# 检查 USB 音频设备
if "USB Audio Device" in output:
card_index = None
for line in output.splitlines():
if "USB Audio Device" in line:
card_index = line.split()[1].replace(':', '')
break
if card_index is not None:
speaker_device = f"plughw:{card_index},0"
print(f"Detected USB speaker device: {speaker_device}")
return speaker_device
# 检查 audiocodec 设备
if "audiocodec" in output:
card_index = None
for line in output.splitlines():
if "audiocodec" in line:
card_index = line.split()[1].replace(':', '')
break
if card_index is not None:
speaker_device = f"plughw:{card_index},0"
print(f"Detected audiocodec speaker device: {speaker_device}")
return speaker_device
# 未找到合适的设备
print("No valid speaker device found.")
return None
def initialize_device():
"""初始化设备并保持状态"""
global speaker_device
speaker_device = find_speaker_device()
if speaker_device:
try:
# 尝试打开设备以保持其状态
subprocess.run(['aplay', '-D', speaker_device, '-f', 'cd', '/dev/zero'], timeout=2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(f"Device {speaker_device} initialized.")
except subprocess.TimeoutExpired:
print("Device initialization timed out.")
except subprocess.CalledProcessError as e:
print(f"Error initializing device: {e}")
# 初始化设备
initialize_device()
# 动态检测环境音量并设置 mindb
def detect_environment_volume(duration=3, sample_rate=16000):
"""
检测当前环境音量,并根据环境音量调整 mindb 值。
:param duration: 检测持续时间(秒)
:param sample_rate: 采样率
:return: 动态计算得到的最小声音值
"""
recorder = PvRecorder(frame_length=512, device_index=usb_mic_index)
recorder.start()
frames = []
print("开始检测环境音量,请保持安静...")
for _ in range(int(duration * sample_rate / 512)):
frames.append(recorder.read())
recorder.stop()
audio_data = np.concatenate(frames)
rms = np.sqrt(np.mean(np.square(audio_data)))
min_volume = rms * 10 # 简单转换为适当的 dB 值(这里需要调整具体的转换逻辑)
print(f"环境音量检测完成,最小声音阈值为: {min_volume}")
return min_volume
mindb = detect_environment_volume()
try:
recorder = PvRecorder(frame_length=512, device_index=usb_mic_index)
recorder.start()
print("录音器已启动")
recorder.stop()
print("录音器已停止")
except Exception as e:
print(f"录音器启动失败: {e}")
sys.exit(1)
# 标志位
record_type = 0 # 0 未录音 1 按住录音 2 语音唤醒
flag = 0 # 是否开始录音 0未开始 1开始 2结束
button_timestamp = 0
mqtt_list = []
play_url_list = []
playing_audio = False # 是否正在播放音频
audio_play_event = threading.Event()
current_msgId = 0
new_msgId = 0
current_play_process = None
audio_queue = queue.Queue()
def stop_current_audio():
global current_play_process, audio_play_event
if current_play_process:
audio_play_event.set() # 设置事件来通知停止播放
current_play_process.terminate()
current_play_process.wait()
print("已终止当前音频播放")
audio_play_event.clear() # 重置事件
def download_audio_file(url, local_filename):
"""下载音频文件到本地"""
try:
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(local_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"下载完成: {local_filename}")
return local_filename
else:
print(f"下载音频失败,状态码: {response.status_code}")
return None
except Exception as e:
print(f"下载音频时发生错误: {e}")
return None
def play_audio_file(file_path, msgId=None):
global current_play_process, audio_play_event
audio_play_event.clear()
if msgId is not None:
try:
msgId = int(msgId)
except ValueError:
print(f"无效的 msgId 值: {msgId}")
return
print(f"播放音频: {file_path} (msgId: {msgId})")
try:
current_play_process = subprocess.Popen(['aplay', '-D', speaker_device, '-q', file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while current_play_process.poll() is None:
if audio_play_event.is_set():
current_play_process.terminate()
current_play_process.wait()
print("已终止当前音频播放")
return
time.sleep(0.1)
print(f"播放完成: {file_path} (msgId: {msgId})")
except subprocess.CalledProcessError as e:
print(f"播放音频时发生错误: {e}")
finally:
current_play_process = None
def speech_queue_handler():
"""处理音频播放队列,确保最新的消息音频优先播放"""
global play_url_list, current_msgId, new_msgId
while True:
if play_url_list:
audio_data = play_url_list.pop(0)
audio_url, msgId = audio_data['url'], audio_data['msgId']
local_file = download_audio_file(audio_url, 'downloaded_audio.wav')
if local_file:
if msgId >= current_msgId: # 检查新消息的 msgId
play_audio_file(local_file, msgId)
if os.path.exists(local_file):
os.remove(local_file)
else:
print(f"下载音频文件失败,跳过播放。")
else:
time.sleep(0.01)
handler_thread = threading.Thread(target=speech_queue_handler)
handler_thread.start()
mqtt_client = None # 全局 MQTT 客户端实例
def handle_mqtt_message(message):
"""自定义处理 MQTT 消息的函数"""
global msgId
cmd_type = message.get("cmd")
if cmd_type == "text_to_speech":
if int(message.get("msgId", -1)) == msgId:
audio_urls = message.get("data", [])
for url in audio_urls:
local_file = download_audio_file(url, 'downloaded_audio.wav')
if local_file:
play_audio_file(local_file, msgId=message.get("msgId"))
elif cmd_type == "multi_chat_config":
wav_url = message.get("data", {}).get("wav_url")
if wav_url:
local_file = download_audio_file(wav_url, 'downloaded_audio.wav')
if local_file:
play_audio_file(local_file, msgId=message.get("msgId"))
def on_connect(client, userdata, flags, rc):
"""连接回调函数"""
print("Connected with result code " + str(rc))
client.subscribe(TOPIC)
def on_message(client, userdata, msg):
"""消息处理回调函数"""
print(f"收到消息: {msg.topic} {str(msg.payload)}")
try:
message = json.loads(msg.payload.decode('utf-8'))
# 调用用户自定义的消息处理回调
if userdata:
userdata(message)
except Exception as e:
print(f"解析消息时发生错误: {e}")
def start_mqtt(on_message_callback):
"""启动 MQTT 客户端"""
global mqtt_client
if mqtt_client is None: # 确保只创建一个 MQTT 客户端实例
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.user_data_set(on_message_callback) # 设置用户数据
mqtt_client.connect(MQTT_SERVER, MQTT_PORT, 60)
mqtt_client.loop_start()
def mqtt_queue_handler():
"""启动 MQTT 队列处理程序"""
start_mqtt(handle_mqtt_message)
mqtt_queue_handler()
def record_handler():
global button_timestamp, record_type, image_id, take_photo, msgId
while True:
last_press_time = time.time() - button_timestamp
if last_press_time > 0.5:
continue
print("开始录音")
take_photo = True
print("调起拍照程序")
recorder.start()
frames = []
flag = 0
while True:
last_press_time = time.time() - button_timestamp
frame = recorder.read()
frames.append(frame)
if record_type == 1: # 按住录音模式
if last_press_time > debounce_time:
break
elif record_type == 2: # 语音唤醒模式
temp = np.max(frame)
print("当前音量:", temp)
if flag == 0:
if temp > mindb:
flag = 1
button_timestamp = time.time()
if last_press_time > waitTime:
print("等待超时,自动终止")
flag = 2
break
else:
if temp > mindb:
button_timestamp = time.time()
if last_press_time > delayTime:
break
button_timestamp = 0
if flag == 2:
print("等待超时,自动终止,不发送文件")
recorder.stop()
record_type = 0
continue
print("* done recording")
print(msgId)
recorder.stop()
wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
wf.setnchannels(RECORD_CHANNELS)
wf.setsampwidth(2)
wf.setframerate(RECORD_RATE)
for frame in frames:
wf.writeframes(np.array(frame, dtype=np.int16).tobytes())
wf.close()
record_type = 0
msgId += 1
responsePost = requests.post(
'http://openpin3.z33.fun/chatbyvoiceAsync22/',
files={'audio_file': open(WAVE_OUTPUT_FILENAME, 'rb')},
data={
'devId': DEV_ID,
'nfcId': '0',
'msgId': str(msgId),
'respond_format': 'wav',
'image_id': image_id
}
)
print(responsePost.json())
play_audio_file(file_path='di.wav', msgId=None)
def button_handler():
global button_timestamp, record_type
while True:
result = subprocess.run(['gpio', 'read', '27'], capture_output=True, text=True)
if int(result.stdout.strip()) == 0: # 按钮按下时为低电平
if record_type != 1:
print('刚按下')
# 停止当前音频播放
stop_current_audio()
button_timestamp = time.time()
record_type = 1
time.sleep(0.1)
def update_registration(chinese_name, english_name):
csv_file = '/home/orangepi/program/qiandao/stuinfo.csv'
updated = False
try:
with open(csv_file, 'r', encoding='utf-8', errors='ignore') as file:
reader = list(csv.reader(file))
header = reader[0]
rows = reader[1:]
except Exception as e:
print(f"读取文件时出错: {e}")
return
for row in rows:
if row[1] == chinese_name or row[2] == english_name: # 匹配 ChineseName 或 EnglishName
row[-1] = '已注册'
updated = True
if updated:
try:
with open(csv_file, 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(header)
writer.writerows(rows)
print(f"已更新 {chinese_name} / {english_name} 的注册状态为已注册。")
except Exception as e:
print(f"写入文件时出错: {e}")
else:
print(f"{chinese_name} / {english_name} 没有找到匹配的记录,未更新。")
def get_file_content_as_base64(path, urlencoded=False):
def task():
with open(path, "rb") as f:
content = base64.b64encode(f.read()).decode("utf8")
if urlencoded:
content = urllib.parse.quote_plus(content)
return content
return task
def upload_image_to_baidu(image_base64):
def task():
url = "https://aip.baidubce.com/rest/2.0/face/v3/multi-search?access_token=24.b6c23d3dcf5623492dfb639a408d9538.2592000.1727015570.282335-106648411"
payload = json.dumps({
"group_id_list": "group4",
"image": image_base64,
"image_type": "BASE64",
"max_face_num": 1,
"match_threshold": 80,
"quality_control": "NORMAL"
})
headers = {
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers, data=payload)
try:
result = response.json()
print("API Response:", result)
except json.JSONDecodeError:
print("无法解析 JSON 响应。")
print("响应文本:", response.text)
return
if result.get("error_code") == 0 and "face_list" in result["result"]:
for face in result["result"]["face_list"]:
if "user_list" in face and len(face["user_list"]) > 0:
for user in face["user_list"]:
user_info = json.loads(user["user_info"])
chinese_name = user_info.get("ChineseName", "未知")
english_name = user_info.get("EnglishName", "未知")
print(f"用户 ID: {user['user_id']}, 中文名: {chinese_name}, 英文名: {english_name}")
# 启动更新注册状态的任务
update_registration_task_thread = threading.Thread(target=update_registration, args=(chinese_name, english_name))
update_registration_task_thread.start()
else:
print(f"错误: {result.get('error_msg')}")
else:
print(f"错误: {result.get('error_msg')}")
return task
def capture_image_and_upload():
global image_id, take_photo
while True:
if not take_photo:
time.sleep(0.1)
continue
else:
cap = cv2.VideoCapture(camera_index)
ret, frame = cap.read()
if ret:
image_path = 'capture.jpg'
cv2.imwrite(image_path, frame)
cap.release()
responsePost = requests.post(
'http://openpin3.z33.fun/upload_image22',
files={'image': open(image_path, 'rb')},
data={'devId': DEV_ID}
).json()
image_id = responsePost.get('image_id')
print(responsePost)
# 获取图片的base64编码并上传到百度飞桨API
with ThreadPoolExecutor() as executor:
base64_future = executor.submit(get_file_content_as_base64(image_path))
image_base64 = base64_future.result()
upload_future = executor.submit(upload_image_to_baidu(image_base64))
upload_future.result()
take_photo = False
def wake_word_handler():
global record_type, button_timestamp, msgId, take_photo
porcupine_chinese = pvporcupine.create(
access_key='Slo6YfShQZ5PRQYx5xlUuPAd+M/rGa1UBtHqhL74s9C9GC+Jbu8kww==', # 替换为你的 Access Key
keyword_paths=[
'/home/orangepi/program/xiaosheng.ppn'
],
model_path='/home/orangepi/program/porcupine_params_zh.pv',
sensitivities=[0.5]
)
# porcupine_english = pvporcupine.create(
# access_key='srMJbguZzBwoUtxKuqmGmymN1Uz4YYwBGPSkv7yPoTkPTAA6nL0lhw==', # 替换为你的 Access Key
# keyword_paths=[
# '/home/orangepi/program/HelloNeo.ppn'
# ],
# sensitivities=[0.4]
# )
while True:
if record_type != 0:
time.sleep(0.1)
continue
recorder.start()
try:
print("Listening for wake word...")
while True:
if record_type != 0:
break
audio_frame = recorder.read()
keyword_index_chinese = porcupine_chinese.process(audio_frame)
if keyword_index_chinese >= 0:
print(f"Detected Chinese wake word: {keyword_index_chinese}")
recorder.stop()
stop_current_audio()
msgId += 1
play_audio_file(file_path='/home/orangepi/program/xiaosheng.wav', msgId=msgId)
break
# keyword_index_english = porcupine_english.process(audio_frame)
# if keyword_index_english >= 0:
# print(f"Detected English wake word: {keyword_index_english}")
# recorder.stop()
# play_audio_file(file_path='/home/orangepi/program/nio-wozaine.wav')
# break
except IOError as e:
print(f"Error reading from recorder: {e}")
finally:
button_timestamp = time.time()
record_type = 2
capture_task = threading.Thread(target=capture_image_and_upload, name="拍照程序")
mqtt_queue_handler_task = threading.Thread(target=mqtt_queue_handler, name="mqtt消息处理程序")
speech_queue_handler_task = threading.Thread(target=speech_queue_handler, name="播报程序")
button_handler_task = threading.Thread(target=button_handler, name="按键处理程序")
record_handler_task = threading.Thread(target=record_handler, name="录音程序")
wake_word_handler_task = threading.Thread(target=wake_word_handler, name="唤醒词监听程序")
capture_task.start()
mqtt_queue_handler_task.start()
speech_queue_handler_task.start()
button_handler_task.start()
record_handler_task.start()
wake_word_handler_task.start()
# 等待所有线程完成
capture_task.join()
mqtt_queue_handler_task.join()
speech_queue_handler_task.join()
button_handler_task.join()
record_handler_task.join()
wake_word_handler_task.join()