MediaPipe: Introduction and Usage
MediaPipe is a cross-platform framework developed by Google for building machine learning pipelines over multimodal data such as video and audio. It ships with ready-made solutions (face detection, hand tracking, pose estimation, and more) as well as tools for building custom pipelines.
Table of Contents
- Installation and Environment Setup
- Architecture and Core Concepts
- Hand Tracking
- Face Detection and Face Mesh
- Pose Estimation
- Object Detection and Tracking
- Custom Machine Learning Models
- Interactive Application Development
- Performance Optimization
- Hands-On Projects
Installation and Environment Setup
Basic Installation
pip install mediapipe
Optional Dependencies
# For webcam capture support
pip install opencv-python
# For 3D visualization
pip install matplotlib pyopengl
Verify the Installation
import mediapipe as mp
print(mp.__version__)
Architecture and Core Concepts
MediaPipe Core Concepts
- Calculator Graph: the basic building block of a MediaPipe application
- Packet: the unit of data that flows through a graph
- Stream: a time-ordered sequence of packets
- Solution: a prebuilt end-to-end pipeline (a minimal graph sketch follows this list)
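To make these terms concrete, here is a minimal sketch that closely follows the MediaPipe Python framework documentation: a graph containing a single built-in PassThroughCalculator, fed one string packet and observed on its output stream.

import mediapipe as mp

# A trivial graph: one PassThroughCalculator between input and output
config_text = """
  input_stream: 'in_stream'
  output_stream: 'out_stream'
  node {
    calculator: 'PassThroughCalculator'
    input_stream: 'in_stream'
    output_stream: 'out_stream'
  }
"""

graph = mp.CalculatorGraph(graph_config=config_text)
received = []
# Collect packets arriving on the output stream
graph.observe_output_stream(
    'out_stream',
    lambda stream_name, packet: received.append(mp.packet_getter.get_str(packet)))
graph.start_run()
# Send one string packet at timestamp 0
graph.add_packet_to_input_stream(
    'in_stream', mp.packet_creator.create_string('hello').at(0))
graph.close()
print(received)  # ['hello']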
Basic Workflow
import cv2
import mediapipe as mp

# Initialize the solution helpers
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands

# Create a processing instance
with mp_hands.Hands(
    static_image_mode=True,  # True: treat each input as an independent photo
    max_num_hands=2,
    min_detection_confidence=0.5) as hands:
    # Process a frame (MediaPipe expects RGB input)
    image = cv2.imread("image.jpg")
    results = hands.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    # Draw the results
    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            mp_drawing.draw_landmarks(
                image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
    cv2.imshow("MediaPipe Hands", image)
    cv2.waitKey(0)
Hand Tracking
Basic Hand Detection
import cv2
import mediapipe as mp

mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands

cap = cv2.VideoCapture(0)
with mp_hands.Hands(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as hands:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        # Mirror the frame and convert BGR -> RGB before processing
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        results = hands.process(image)
        # Convert back to BGR for OpenCV rendering
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                mp_drawing.draw_landmarks(
                    image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
        cv2.imshow('Hand Tracking', image)
        if cv2.waitKey(5) & 0xFF == 27:  # ESC to quit
            break
cap.release()
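Landmark coordinates are normalized to [0, 1] relative to the image width and height. A small helper (to_pixels is our own name, not a MediaPipe API) converts a landmark to pixel coordinates:

def to_pixels(landmark, image):
    """Convert a normalized MediaPipe landmark to integer pixel coordinates."""
    h, w = image.shape[:2]
    return int(landmark.x * w), int(landmark.y * h)

# Example: the index fingertip is landmark 8
# x, y = to_pixels(hand_landmarks.landmark[8], image)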
Gesture Recognition
def count_fingers(hand_landmarks):
    tip_ids = [4, 8, 12, 16, 20]  # Landmark IDs of the five fingertips
    fingers = []
    # Thumb: compare the tip's x with the joint below it.
    # Note: this heuristic assumes a right hand in a mirrored frame.
    if hand_landmarks.landmark[tip_ids[0]].x < hand_landmarks.landmark[tip_ids[0] - 1].x:
        fingers.append(1)
    else:
        fingers.append(0)
    # Other four fingers: the tip should sit above the PIP joint (smaller y)
    for id in range(1, 5):
        if hand_landmarks.landmark[tip_ids[id]].y < hand_landmarks.landmark[tip_ids[id] - 2].y:
            fingers.append(1)
        else:
            fingers.append(0)
    return fingers

# Inside the main loop
if results.multi_hand_landmarks:
    for hand_landmarks in results.multi_hand_landmarks:
        fingers = count_fingers(hand_landmarks)
        total_fingers = sum(fingers)
        cv2.putText(image, f'Fingers: {total_fingers}', (10, 70),
                    cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 0), 3)
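The thumb test above flips direction for the other hand. MediaPipe also reports per-hand handedness, which can make the check robust; a sketch, assuming the usual convention that results.multi_handedness[i] corresponds to results.multi_hand_landmarks[i]:

# Handedness-aware thumb check (sketch)
if results.multi_hand_landmarks:
    for hand_landmarks, handedness in zip(results.multi_hand_landmarks,
                                          results.multi_handedness):
        label = handedness.classification[0].label  # 'Left' or 'Right'
        thumb_tip = hand_landmarks.landmark[4]
        thumb_ip = hand_landmarks.landmark[3]
        # In a mirrored frame the comparison direction depends on the hand
        thumb_open = (thumb_tip.x < thumb_ip.x) if label == 'Right' \
            else (thumb_tip.x > thumb_ip.x)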
Face Detection and Face Mesh
Face Detection
mp_face_detection = mp.solutions.face_detection

with mp_face_detection.FaceDetection(
    model_selection=0,  # 0 = short-range model (~2 m), 1 = full-range model (~5 m)
    min_detection_confidence=0.5) as face_detection:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        results = face_detection.process(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.detections:
            for detection in results.detections:
                mp_drawing.draw_detection(image, detection)
        cv2.imshow('Face Detection', image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
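draw_detection handles rendering, but each detection also exposes a relative bounding box (normalized to [0, 1]) that you can convert to pixels yourself, for example to crop the face:

# Extract a pixel-space bounding box from a detection
h, w = image.shape[:2]
if results.detections:
    for detection in results.detections:
        box = detection.location_data.relative_bounding_box
        x1, y1 = int(box.xmin * w), int(box.ymin * h)
        x2, y2 = x1 + int(box.width * w), y1 + int(box.height * h)
        cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)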
Face Mesh (468 landmarks)
mp_face_mesh = mp.solutions.face_mesh

with mp_face_mesh.FaceMesh(
    max_num_faces=1,
    refine_landmarks=True,  # also estimate iris landmarks
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as face_mesh:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        results = face_mesh.process(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                mp_drawing.draw_landmarks(
                    image=image,
                    landmark_list=face_landmarks,
                    connections=mp_face_mesh.FACEMESH_TESSELATION,
                    landmark_drawing_spec=None,
                    connection_drawing_spec=mp_drawing.DrawingSpec(
                        color=(0, 255, 0), thickness=1, circle_radius=1))
        cv2.imshow('Face Mesh', image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
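A common use of individual mesh landmarks is blink detection: compare the eyelid gap to the eye width. The indices below (159/145 for the upper/lower lid, 33/133 for the corners of one eye) are community conventions rather than named API constants, so treat them as assumptions:

import math

def eye_openness(landmarks):
    """Heuristic ratio of eyelid gap to eye width for one eye."""
    def dist(i, j):
        a, b = landmarks[i], landmarks[j]
        return math.hypot(a.x - b.x, a.y - b.y)
    return dist(159, 145) / dist(33, 133)  # approaches 0 when the eye closes

# Example: if eye_openness(face_landmarks.landmark) < 0.1, treat it as a blink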
Gaze Tracking
import numpy as np  # needed for np.array below

def draw_eye_contours(image, landmarks, indices):
    points = []
    for index in indices:
        point = landmarks[index]
        x = int(point.x * image.shape[1])
        y = int(point.y * image.shape[0])
        points.append((x, y))
    cv2.polylines(image, [np.array(points)], True, (0, 255, 0), 1)

# Iris landmark indices (available when refine_landmarks=True)
LEFT_IRIS = [474, 475, 476, 477]
RIGHT_IRIS = [469, 470, 471, 472]

# Inside the main loop
if results.multi_face_landmarks:
    for face_landmarks in results.multi_face_landmarks:
        draw_eye_contours(image, face_landmarks.landmark, LEFT_IRIS)
        draw_eye_contours(image, face_landmarks.landmark, RIGHT_IRIS)
Pose Estimation
Full-Body Pose Detection (33 landmarks)
mp_pose = mp.solutions.pose

with mp_pose.Pose(
    static_image_mode=False,
    model_complexity=1,  # 0 = lite, 1 = full, 2 = heavy
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        results = pose.process(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.pose_landmarks:
            mp_drawing.draw_landmarks(
                image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        cv2.imshow('Pose Estimation', image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
Pose Classification
import numpy as np

def calculate_angle(a, b, c):
    """Return the angle in degrees at point b, formed by the points a-b-c."""
    a = np.array(a)  # first point
    b = np.array(b)  # mid point (the vertex)
    c = np.array(c)  # end point
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    if angle > 180.0:
        angle = 360 - angle
    return angle

# Detect a squat
if results.pose_landmarks:
    landmarks = results.pose_landmarks.landmark
    # Grab the relevant landmark coordinates
    left_hip = [landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x,
                landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y]
    left_knee = [landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x,
                 landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y]
    left_ankle = [landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].x,
                  landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].y]
    # Knee angle
    angle = calculate_angle(left_hip, left_knee, left_ankle)
    # Visualize
    cv2.putText(image, f'Angle: {int(angle)}', (10, 70),
                cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 0), 3)
    # Squat check
    if angle < 90:
        cv2.putText(image, "SQUAT DOWN", (10, 140),
                    cv2.FONT_HERSHEY_PLAIN, 3, (0, 255, 0), 3)
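A quick sanity check confirms the geometry: a right angle at the vertex should come out as 90 degrees, and a straight line through the vertex as 180.

# Sanity checks for calculate_angle
assert round(calculate_angle((0, 1), (0, 0), (1, 0))) == 90   # right angle at b
assert round(calculate_angle((-1, 0), (0, 0), (1, 0))) == 180  # straight line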
Object Detection and Tracking
3D Object Detection (Objectron)
mp_objectron = mp.solutions.objectron

with mp_objectron.Objectron(
    static_image_mode=False,
    max_num_objects=5,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.8,
    model_name='Cup') as objectron:  # other options: 'Shoe', 'Chair', 'Camera'
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        results = objectron.process(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.detected_objects:
            for detected_object in results.detected_objects:
                # Draw the 3D bounding box and the object's pose axes
                mp_drawing.draw_landmarks(
                    image, detected_object.landmarks_2d, mp_objectron.BOX_CONNECTIONS)
                mp_drawing.draw_axis(image, detected_object.rotation, detected_object.translation)
        cv2.imshow('Object Detection', image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
Palm Detection (for gesture recognition)
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils

with mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=2,
    min_detection_confidence=0.7) as hands:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        results = hands.process(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                # Compute the hand's bounding box from its landmarks
                h, w, _ = image.shape
                x_min, y_min = w, h
                x_max, y_max = 0, 0
                for lm in hand_landmarks.landmark:
                    x, y = int(lm.x * w), int(lm.y * h)
                    x_min = min(x_min, x)
                    x_max = max(x_max, x)
                    y_min = min(y_min, y)
                    y_max = max(y_max, y)
                # Draw the padded bounding box
                cv2.rectangle(image, (x_min - 20, y_min - 20),
                              (x_max + 20, y_max + 20), (0, 255, 0), 2)
                # Draw the hand landmarks
                mp_drawing.draw_landmarks(image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
        cv2.imshow('Hand Detection', image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
Custom Machine Learning Models
Integrating a TensorFlow Model
Note: MediaPipe calculators are implemented in C++ inside the framework itself; the pip-installed Python package does not expose a calculator-authoring API such as mp.Calculator or mp.register_calculator. The snippet below is therefore an illustrative sketch of the concept, not runnable code.
import numpy as np
import tensorflow as tf
import mediapipe as mp

# Load a custom TensorFlow model
model = tf.keras.models.load_model('custom_model.h5')

# Define a custom calculator (pseudocode, see the note above)
class CustomModelCalculator(mp.Calculator):
    def __init__(self):
        super().__init__()
        self.model = model

    def Process(self, input_packets):
        image_packet = input_packets['image']
        image = image_packet.get()
        # Preprocess the image (preprocess() is a user-supplied function)
        processed_image = preprocess(image)
        # Run inference
        predictions = self.model.predict(np.expand_dims(processed_image, axis=0))
        # Build the output packets
        output_packets = {'predictions': mp.Packet(predictions)}
        return output_packets

# Register the custom calculator (pseudocode)
mp.register_calculator('CustomModelCalculator', CustomModelCalculator)
Building a Custom Pipeline
graph_config = """
input_stream: "input_video"
output_stream: "output_video"

node {
  calculator: "FlowLimiterCalculator"
  input_stream: "input_video"
  input_stream: "FINISHED:output_video"
  input_stream_info: {
    tag_index: "FINISHED"
    back_edge: true
  }
  output_stream: "throttled_input_video"
}

node {
  calculator: "CustomModelCalculator"
  input_stream: "IMAGE:throttled_input_video"
  output_stream: "PREDICTIONS:predictions"
}

node {
  calculator: "RenderAnnotationCalculator"
  input_stream: "IMAGE:throttled_input_video"
  input_stream: "predictions"
  output_stream: "IMAGE:output_video"
}
"""

# Create and run the pipeline
graph = mp.CalculatorGraph(graph_config=graph_config)
graph.start_run()

# Feed video frames (assumed RGB numpy arrays); each packet carries
# a monotonically increasing timestamp
for ts, frame in enumerate(video_frames):
    graph.add_packet_to_input_stream(
        'input_video',
        mp.packet_creator.create_image_frame(
            image_format=mp.ImageFormat.SRGB, data=frame).at(ts))
graph.close()
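To get frames back out of the graph, register an observer on output_video before calling start_run, using the same observe_output_stream mechanism as in the core-concepts example. This sketch assumes the stream carries ImageFrame packets:

# Register the observer before graph.start_run()
output_frames = []
graph.observe_output_stream(
    'output_video',
    lambda name, packet: output_frames.append(
        mp.packet_getter.get_image_frame(packet).numpy_view()))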
Interactive Application Development
Virtual Painting App
import numpy as np
import cv2
import mediapipe as mp

mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands

# Initialize the canvas (must match the camera frame size)
canvas = np.zeros((480, 640, 3), dtype=np.uint8)
prev_point = None

with mp_hands.Hands(
    min_detection_confidence=0.7,
    min_tracking_confidence=0.7) as hands:
    cap = cv2.VideoCapture(0)
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        results = hands.process(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                # Index fingertip position in pixels (landmark 8)
                x = int(hand_landmarks.landmark[8].x * image.shape[1])
                y = int(hand_landmarks.landmark[8].y * image.shape[0])
                # Drawing mode is on while the thumb is folded
                thumb_tip = hand_landmarks.landmark[4]
                thumb_mcp = hand_landmarks.landmark[2]
                if thumb_tip.x < thumb_mcp.x:  # thumb folded
                    if prev_point:
                        cv2.line(canvas, prev_point, (x, y), (255, 255, 255), 5)
                    prev_point = (x, y)
                else:
                    prev_point = None
                # Draw the hand landmarks
                mp_drawing.draw_landmarks(image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
        # Blend the camera image with the canvas
        image = cv2.addWeighted(image, 0.7, canvas, 0.3, 0)
        cv2.imshow('Virtual Drawing', image)
        key = cv2.waitKey(5) & 0xFF
        if key == 27:  # ESC quits
            break
        elif key == ord('c'):  # 'c' clears the canvas
            canvas.fill(0)
cap.release()
Augmented Reality Filter
def apply_sunglasses_filter(image, face_landmarks):
    # Eye-corner coordinates (FaceMesh indices 33/133 and 362/263)
    left_eye = [
        (int(face_landmarks.landmark[33].x * image.shape[1]),
         int(face_landmarks.landmark[33].y * image.shape[0])),
        (int(face_landmarks.landmark[133].x * image.shape[1]),
         int(face_landmarks.landmark[133].y * image.shape[0]))
    ]
    right_eye = [
        (int(face_landmarks.landmark[362].x * image.shape[1]),
         int(face_landmarks.landmark[362].y * image.shape[0])),
        (int(face_landmarks.landmark[263].x * image.shape[1]),
         int(face_landmarks.landmark[263].y * image.shape[0]))
    ]
    # Glasses position and size
    center_left = np.mean(left_eye, axis=0).astype(int)
    center_right = np.mean(right_eye, axis=0).astype(int)
    width = int(np.linalg.norm(center_right - center_left) * 2)
    height = int(width * 0.3)
    # Build the glasses image (a filled black rectangle with an alpha channel)
    sunglasses = np.zeros((height, width, 4), dtype=np.uint8)
    cv2.rectangle(sunglasses, (0, 0), (width, height), (0, 0, 0, 255), -1)
    # Rotation angle between the two eye centers
    angle = np.degrees(np.arctan2(
        center_right[1] - center_left[1],
        center_right[0] - center_left[0]))
    # Rotate the glasses
    M = cv2.getRotationMatrix2D((width // 2, height // 2), angle, 1)
    rotated = cv2.warpAffine(sunglasses, M, (width, height))
    # Placement (note: no bounds check; clamp x/y for faces near the frame edge)
    x = center_left[0] - width // 3
    y = center_left[1] - height // 2
    # Alpha blending
    overlay = image.copy()
    alpha_s = rotated[:, :, 3] / 255.0
    alpha_l = 1.0 - alpha_s
    for c in range(0, 3):
        overlay[y:y + height, x:x + width, c] = (
            alpha_s * rotated[:, :, c] +
            alpha_l * overlay[y:y + height, x:x + width, c])
    return overlay

# Inside the main loop
if results.multi_face_landmarks:
    for face_landmarks in results.multi_face_landmarks:
        image = apply_sunglasses_filter(image, face_landmarks)
Performance Optimization
Multithreaded Processing
import queue
import threading

class ProcessingThread(threading.Thread):
    def __init__(self, frame_queue, result_queue):
        threading.Thread.__init__(self)
        self.frame_queue = frame_queue
        self.result_queue = result_queue
        self.detector = mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,
            min_detection_confidence=0.5)

    def run(self):
        while True:
            frame = self.frame_queue.get()
            if frame is None:  # sentinel value stops the worker
                break
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = self.detector.process(frame_rgb)
            self.result_queue.put((frame, results))

# Main thread
frame_queue = queue.Queue(maxsize=1)
result_queue = queue.Queue(maxsize=1)
worker = ProcessingThread(frame_queue, result_queue)
worker.start()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        continue
    if frame_queue.empty():
        frame_queue.put(frame)
    if not result_queue.empty():
        processed_frame, results = result_queue.get()
        # Handle the results...
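When the capture loop exits, stop the worker through the None sentinel it checks for, then join it:

# Shut the worker down cleanly once the loop exits
frame_queue.put(None)  # sentinel checked in ProcessingThread.run
worker.join()
cap.release()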
Adjusting Resolution and Model Complexity
# Lower the capture resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

# Use the lightweight model
with mp_pose.Pose(
    model_complexity=0,  # 0 = lite, 1 = full, 2 = heavy
    min_detection_confidence=0.5) as pose:
    # Processing code...
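To judge whether such tweaks actually help, measure throughput. A minimal FPS counter based on time.time():

import time

prev_time = time.time()
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        continue
    # ... run detection here ...
    now = time.time()
    fps = 1.0 / max(now - prev_time, 1e-6)  # guard against division by zero
    prev_time = now
    cv2.putText(frame, f'FPS: {fps:.1f}', (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)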
Frame-Skipping Strategy
frame_counter = 0
skip_frames = 2  # process 1 out of every 3 frames

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        continue
    frame_counter += 1
    if frame_counter % (skip_frames + 1) != 0:
        continue
    # Process the frame...
Hands-On Projects
1. Gesture-Controlled Media Player (a pyautogui virtual mouse)
import cv2
import numpy as np
import mediapipe as mp
import pyautogui

mp_hands = mp.solutions.hands
hands = mp_hands.Hands(max_num_hands=1)
cap = cv2.VideoCapture(0)
screen_w, screen_h = pyautogui.size()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        continue
    frame = cv2.flip(frame, 1)
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = hands.process(frame_rgb)
    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            # Index fingertip position
            x = int(hand_landmarks.landmark[8].x * frame.shape[1])
            y = int(hand_landmarks.landmark[8].y * frame.shape[0])
            # Map frame coordinates to screen coordinates and move the mouse
            screen_x = np.interp(x, [0, frame.shape[1]], [0, screen_w])
            screen_y = np.interp(y, [0, frame.shape[0]], [0, screen_h])
            pyautogui.moveTo(screen_x, screen_y)
            # Click gesture: thumb tip touches index tip
            # (fires every frame while pinched; a real app should debounce)
            thumb_tip = hand_landmarks.landmark[4]
            index_tip = hand_landmarks.landmark[8]
            if abs(thumb_tip.x - index_tip.x) < 0.05 and abs(thumb_tip.y - index_tip.y) < 0.05:
                pyautogui.click()
            cv2.circle(frame, (x, y), 10, (0, 255, 0), -1)
    cv2.imshow('Gesture Control', frame)
    if cv2.waitKey(10) & 0xFF == 27:
        break
cap.release()
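Raw fingertip positions jitter, which makes the cursor shaky. A simple exponential moving average steadies it; the factor 0.3 below is an arbitrary starting point:

# Exponential smoothing for the cursor (sketch)
smooth_x, smooth_y = 0.0, 0.0
alpha = 0.3  # smoothing factor: lower = smoother but laggier

def smooth(x, y):
    global smooth_x, smooth_y
    smooth_x = alpha * x + (1 - alpha) * smooth_x
    smooth_y = alpha * y + (1 - alpha) * smooth_y
    return smooth_x, smooth_y

# Usage: pyautogui.moveTo(*smooth(screen_x, screen_y))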
2. Fitness Rep Counter
import cv2
import mediapipe as mp
import numpy as np

mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
cap = cv2.VideoCapture(0)

counter = 0
stage = None  # "up" or "down"

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        continue
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)
    if results.pose_landmarks:
        landmarks = results.pose_landmarks.landmark
        # Shoulder, elbow, and wrist coordinates
        shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                    landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
        elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                 landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
        wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x,
                 landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y]
        # Elbow angle
        angle = np.degrees(np.arctan2(wrist[1] - elbow[1], wrist[0] - elbow[0]) -
                           np.arctan2(shoulder[1] - elbow[1], shoulder[0] - elbow[0]))
        angle = np.abs(angle)
        if angle > 180:
            angle = 360 - angle
        # Rep-counting state machine
        if angle > 160:
            stage = "down"
        if angle < 30 and stage == "down":
            stage = "up"
            counter += 1
        # Show the count
        cv2.putText(frame, f'Reps: {counter}', (10, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.putText(frame, f'Angle: {int(angle)}', (10, 100),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        # Draw the pose landmarks
        mp.solutions.drawing_utils.draw_landmarks(
            frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
    cv2.imshow('Fitness Tracker', frame)
    if cv2.waitKey(10) & 0xFF == 27:
        break
cap.release()
3. Real-Time Emotion Recognition
import cv2
import mediapipe as mp
import numpy as np
from tensorflow.keras.models import load_model

# Load the emotion recognition model (a user-provided Keras model,
# e.g. one trained on FER-2013-style 48x48 grayscale faces)
emotion_model = load_model('emotion_model.h5')
emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise']

# Initialize face mesh detection
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(max_num_faces=1)
cap = cv2.VideoCapture(0)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        continue
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(frame_rgb)
    if results.multi_face_landmarks:
        for face_landmarks in results.multi_face_landmarks:
            # Compute the face bounding box from the landmarks
            h, w, _ = frame.shape
            x_min, y_min = w, h
            x_max, y_max = 0, 0
            for lm in face_landmarks.landmark:
                x, y = int(lm.x * w), int(lm.y * h)
                x_min = min(x_min, x)
                x_max = max(x_max, x)
                y_min = min(y_min, y)
                y_max = max(y_max, y)
            # Expand the bounding box by a margin
            margin = 30
            x_min = max(0, x_min - margin)
            y_min = max(0, y_min - margin)
            x_max = min(w, x_max + margin)
            y_max = min(h, y_max + margin)
            # Extract the face ROI
            face_roi = frame[y_min:y_max, x_min:x_max]
            if face_roi.size != 0:
                # Preprocess the face image
                face_gray = cv2.cvtColor(face_roi, cv2.COLOR_BGR2GRAY)
                face_resized = cv2.resize(face_gray, (48, 48))
                face_normalized = face_resized / 255.0
                face_input = np.expand_dims(np.expand_dims(face_normalized, -1), 0)
                # Predict the emotion
                predictions = emotion_model.predict(face_input)
                emotion_index = np.argmax(predictions)
                emotion = emotion_labels[emotion_index]
                confidence = np.max(predictions)
                # Show the result
                cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
                cv2.putText(frame, f'{emotion} ({confidence:.2f})',
                            (x_min, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
    cv2.imshow('Emotion Recognition', frame)
    if cv2.waitKey(10) & 0xFF == 27:
        break
cap.release()
Summary
MediaPipe is a powerful and flexible framework. This tutorial covered:
- Basic installation and core concepts
- Hand, face, and pose detection
- Object detection and tracking
- Custom model integration
- Interactive application development
- Performance optimization techniques
- Hands-on project examples
To go deeper with MediaPipe:
- Read the official documentation and example code
- Modify and extend the examples provided here
- Explore more prebuilt solutions (hair segmentation, 3D object detection, and more)
- Learn to build custom calculator graphs and pipelines
- Join the MediaPipe open-source community
MediaPipe is still evolving quickly; watch its GitHub repository and official blog for the latest features and improvements.