AI

人工智能相关文章

机器学习实战指南:从零搭建企业级智能推荐系统

--- title: 机器学习实战指南:从零搭建企业级智能推荐系统 date: 2026-04-28 category: AI type_id: 1 guid: 13012ea3afd4c6548b2c79d013bc1633 keywords: [机器学习实战, 推荐系统, 协同过滤, 深度学习推荐, 特征工程, A/B测试, MLOps, 模型部署] summary: 本文以构建企业级智能推荐系统为实战案例,从数据处理、特征工程、模型选型、训练优化到线上部署,完整讲解机器学习项目的全生命周期。涵盖基于Spark的大规模数据处理、双塔召回模型、多目标排序模型、实时特征计算等技术方案,并提供可落地的代码示例和工程经验总结。 --- # 机器学习实战指南:从零搭建企业级智能推荐系统 ## 引言 推荐系统是机器学习技术在工业界最成功的应用之一。从电商平台的商品推荐到短视频平台的内容分发,推荐系统直接影响着用户留存和商业变现效率。本文将以一个完整的实战案例,讲解如何从零搭建一个企业级智能推荐系统,涵盖从数据工程到模型训练再到线上部署的完整链路。 ## 一、系统架构设计 ### 1.1 整体架构 企业级推荐系统通常采用分层架构,核心包括召回层、粗排层、精排层和重排层: ``` 用户请求 │ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ 召回层 │───→│ 粗排层 │───→│ 精排层 │───→│ 重排层 │ │ 候选集 │ │ 预排序 │ │ 精细排序 │ │ 业务规则 │ │ 万→千 │ │ 千→百 │ │ 百→十 │ │ 多样性 │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ ``` - **召回层**:从百万级物料库中快速筛选出千级别候选集 - **粗排层**:使用轻量级模型对候选集初步排序 - **精排层**:使用复杂模型精确预估CTR/CVR等指标 - **重排层**:融入业务规则(多样性、新鲜度、流量扶持等) ### 1.2 技术栈选型 | 层级 | 技术方案 | 说明 | |------|---------|------| | 数据存储 | MySQL + Redis + HBase | 用户画像/特征/实时数据 | | 数据处理 | Apache Spark | 离线特征计算与训练样本构建 | | 模型训练 | PyTorch + DeepSpeed | 分布式训练框架 | | 在线推理 | Triton Inference Server | GPU推理服务 | | 特征平台 | Feast | 在线/离线特征统一管理 | | 调度系统 | Apache Airflow | 训练与部署流水线编排 | ## 二、数据处理与特征工程 ### 2.1 数据采集与清洗 推荐系统需要处理的核心数据包括用户行为日志、物料属性和用户画像。 ```python import pandas as pd import numpy as np from pyspark.sql import SparkSession from pyspark.sql.functions import col, collect_list, size, udf from pyspark.sql.types import ArrayType, FloatType spark = SparkSession.builder \ .appName("RecSysDataPipeline") \ .config("spark.sql.shuffle.partitions", "200") \ .getOrCreate() # 读取原始行为日志 behavior_df = spark.read.parquet("hdfs:///data/raw/behaviors/") # 数据清洗:去重、过滤异常值 cleaned_df = behavior_df \ .filter(col("duration") > 0) \ .filter(col("duration") < 7200) \ .dropDuplicates(["user_id", "item_id", "timestamp"]) \ .na.fill({"category": "unknown"}) # 行为权重编码 def behavior_weight(behavior_type: str) -> float: weights = { "click": 1.0, "collect": 3.0, "cart": 5.0, "purchase": 10.0, "share": 8.0, } return weights.get(behavior_type, 0.5) weight_udf = udf(behavior_weight, FloatType()) scored_df = cleaned_df.withColumn( "score", weight_udf(col("behavior_type")) ) # 构建用户-物品交互矩阵 user_item_matrix = scored_df.groupBy("user_id").agg( collect_list("item_id").alias("interacted_items"), collect_list("score").alias("interaction_scores"), size("item_id").alias("interaction_count"), ) ``` ### 2.2 特征工程设计 推荐系统的特征通常分为四类: ```python # 1. 用户特征 user_features = { "user_id": "用户唯一标识", "age_bucket": "年龄段 (18-24/25-34/35-44/45+)", "gender": "性别", "city_level": "城市等级", "purchase_power": "消费能力等级", "active_days_7d": "近7天活跃天数", "active_days_30d": "近30天活跃天数", "category_preference_top5": "偏好类目TOP5", "price_preference_avg": "平均消费价格", "click_count_7d": "近7天点击次数", "purchase_count_30d": "近30天购买次数", } # 2. 物料特征 item_features = { "item_id": "物品唯一标识", "category_l1/l2/l3": "一级/二级/三级类目", "brand_id": "品牌ID", "price": "价格", "avg_rating": "平均评分", "comment_count": "评论数", "sales_volume_7d": "近7天销量", "ctr_7d": "近7天点击率", "cvr_7d": "近7天转化率", "create_days": "上架天数", } # 3. 交叉特征 cross_features = { "user_category_ctr": "用户对该类目的历史点击率", "user_brand_affinity": "用户对该品牌的偏好度", "item_popularity_user_group": "同人群热度", "price_match_score": "价格匹配度", } # 4. 上下文特征 context_features = { "hour_of_day": "当前小时 (0-23)", "day_of_week": "星期几 (0-6)", "is_weekend": "是否周末", "device_type": "设备类型", "network_type": "网络类型", "location": "用户位置", } ``` ### 2.3 离线特征计算流水线 ```python from pyspark.sql import Window from pyspark.sql.functions import ( rank, count, avg, sum as spark_sum, datediff, current_timestamp, lit ) # 用户统计特征 user_stats = cleaned_df.groupBy("user_id").agg( count("*").alias("total_actions"), spark_sum("score").alias("total_score"), avg("score").alias("avg_score"), countDistinct("item_id").alias("distinct_items"), countDistinct("category").alias("distinct_categories"), ) # 用户偏好类目(按频次TOP5) category_window = Window.partitionBy("user_id") \ .orderBy(col("freq").desc()) user_category_pref = cleaned_df.groupBy("user_id", "category") \ .agg(count("*").alias("freq")) \ .withColumn("rank", rank().over(category_window)) \ .filter(col("rank") <= 5) \ .groupBy("user_id") \ .agg(collect_list("category").alias("top5_categories")) # 物料热度特征 item_stats = cleaned_df.groupBy("item_id").agg( countDistinct("user_id").alias("unique_users"), spark_sum(col("duration")).alias("total_duration"), avg("score").alias("avg_interaction_score"), ) # 时间衰减因子 time_decay = udf(lambda ts: np.exp(-0.1 * datediff( current_timestamp(), ts ).cast("int")), FloatType()) scored_with_decay = scored_df.withColumn( "time_decay_score", col("score") * time_decay(col("timestamp")) ) ``` ## 三、召回模型:双塔模型 ### 3.1 模型架构 双塔模型(Two-Tower Model)是当前工业界最主流的召回方案,其核心思想是将用户和物品分别编码到同一向量空间,通过向量相似度进行快速召回。 ```python import torch import torch.nn as nn import torch.nn.functional as F class UserTower(nn.Module): """用户塔:将用户特征编码为稠密向量""" def __init__( self, user_id_dim: int = 100000, emb_dim: int = 64, hidden_dims: list = [256, 128], output_dim: int = 128, ): super().__init__() self.user_emb = nn.Embedding(user_id_dim, emb_dim) self.age_emb = nn.Embedding(6, 16) self.gender_emb = nn.Embedding(3, 8) self.city_emb = nn.Embedding(20, 16) # 数值特征 self.active_bn = nn.BatchNorm1d(2) self.purchase_bn = nn.BatchNorm1d(2) # 特征拼接后维度 input_dim = emb_dim + 16 + 8 + 16 + 2 + 2 layers = [] prev_dim = input_dim for h_dim in hidden_dims: layers.extend([ nn.Linear(prev_dim, h_dim), nn.BatchNorm1d(h_dim), nn.ReLU(), nn.Dropout(0.2), ]) prev_dim = h_dim self.mlp = nn.Sequential(*layers) self.output_layer = nn.Linear(prev_dim, output_dim) def forward(self, user_id, age, gender, city, active_7d, active_30d): u_emb = self.user_emb(user_id) a_emb = self.age_emb(age) g_emb = self.gender_emb(gender) c_emb = self.city_emb(city) numeric = torch.stack([active_7d, active_30d], dim=1) numeric = self.active_bn(numeric) x = torch.cat([ u_emb, a_emb, g_emb, c_emb, numeric ], dim=1) x = self.mlp(x) return F.normalize(self.output_layer(x), dim=1) class ItemTower(nn.Module): """物品塔:将物品特征编码为稠密向量""" def __init__( self, item_id_dim: int = 500000, category_dim: int = 1000, brand_dim: int = 5000, emb_dim: int = 64, hidden_dims: list = [256, 128], output_dim: int = 128, ): super().__init__() self.item_emb = nn.Embedding(item_id_dim, emb_dim) self.cat_emb = nn.Embedding(category_dim, 32) self.brand_emb = nn.Embedding(brand_dim, 32) self.price_bn = nn.BatchNorm1d(1) self.stats_bn = nn.BatchNorm1d(3) input_dim = emb_dim + 32 + 32 + 1 + 3 layers = [] prev_dim = input_dim for h_dim in hidden_dims: layers.extend([ nn.Linear(prev_dim, h_dim), nn.BatchNorm1d(h_dim), nn.ReLU(), nn.Dropout(0.2), ]) prev_dim = h_dim self.mlp = nn.Sequential(*layers) self.output_layer = nn.Linear(prev_dim, output_dim) def forward(self, item_id, category, brand, price, ctr_7d, cvr_7d, sales): i_emb = self.item_emb(item_id) c_emb = self.cat_emb(category) b_emb = self.brand_emb(brand) price = self.price_bn(price.unsqueeze(1)) stats = torch.stack([ctr_7d, cvr_7d, sales], dim=1) stats = self.stats_bn(stats) x = torch.cat([i_emb, c_emb, b_emb, price, stats], dim=1) x = self.mlp(x) return F.normalize(self.output_layer(x), dim=1) class TwoTowerModel(nn.Module): """双塔召回模型""" def __init__(self, user_config: dict, item_config: dict): super().__init__() self.user_tower = UserTower(**user_config) self.item_tower = ItemTower(**item_config) self.temperature = nn.Parameter(torch.ones(1) * 0.07) def forward(self, user_features, item_features): user_vec = self.user_tower(**user_features) item_vec = self.item_tower(**item_features) # InfoNCE对比学习损失 logits = torch.matmul(user_vec, item_vec.T) / self.temperature return logits def get_user_embedding(self, user_features): with torch.no_grad(): return self.user_tower(**user_features) def get_item_embedding(self, item_features): with torch.no_grad(): return self.item_tower(**item_features) ``` ### 3.2 训练与评估 ```python import torch.optim as optim from torch.utils.data import DataLoader, Dataset class RecSysDataset(Dataset): def __init__(self, data_path: str): self.data = torch.load(data_path) def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] def train_two_tower(model, train_loader, val_loader, epochs=10, lr=1e-3, device="cuda"): model = model.to(device) optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-5) scheduler = optim.lr_scheduler.CosineAnnealingLR( optimizer, T_max=epochs ) for epoch in range(epochs): model.train() total_loss = 0 for batch in train_loader: user_feat = { k: v.to(device) for k, v in batch["user"].items() } item_feat = { k: v.to(device) for k, v in batch["item"].items() } labels = batch["label"].to(device) logits = model(user_feat, item_feat) loss = F.cross_entropy(logits, labels) optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_( model.parameters(), max_norm=5.0 ) optimizer.step() total_loss += loss.item() scheduler.step() avg_loss = total_loss / len(train_loader) # 评估 recall, ndcg = evaluate_recall(model, val_loader, device) print( f"Epoch {epoch}: loss={avg_loss:.4f}, " f"recall@50={recall:.4f}, NDCG@50={ndcg:.4f}" ) def evaluate_recall(model, loader, device, k=50): """评估召回指标""" model.eval() recalls, ndcgs = [], [] with torch.no_grad(): for batch in loader: user_feat = { k: v.to(device) for k, v in batch["user"].items() } item_feat = { k: v.to(device) for k, v in batch["item"].items() } labels = batch["label"].to(device) user_vec = model.user_tower(**user_feat) item_vec = model.item_tower(**item_feat) scores = torch.matmul(user_vec, item_vec.T) _, topk_indices = torch.topk(scores, k, dim=1) for i in range(len(labels)): pos_indices = set( torch.where(labels[i] == 1)[0].tolist() ) pred_indices = set(topk_indices[i].tolist()) hit = len(pos_indices & pred_indices) recalls.append(hit / max(len(pos_indices), 1)) # NDCG计算 dcg = 0 for rank, idx in enumerate(topk_indices[i]): if idx.item() in pos_indices: dcg += 1 / np.log2(rank + 2) idcg = sum( 1 / np.log2(r + 2) for r in range(min(len(pos_indices), k)) ) ndcgs.append(dcg / max(idcg, 1e-10)) return np.mean(recalls), np.mean(ndcgs) ``` ## 四、排序模型:多任务学习 ### 4.1 多目标排序模型 ```python class MultiTaskRankModel(nn.Module): """多目标排序模型:同时预估CTR和CVR""" def __init__(self, feature_dim: int = 256): super().__init__() # 共享底层 self.shared_bottom = nn.Sequential( nn.Linear(feature_dim, 512), nn.BatchNorm1d(512), nn.ReLU(), nn.Dropout(0.3), nn.Linear(512, 256), nn.BatchNorm1d(256), nn.ReLU(), nn.Dropout(0.2), ) # CTR专家塔 self.ctr_tower = nn.Sequential( nn.Linear(256, 128), nn.ReLU(), nn.Dropout(0.1), nn.Linear(128, 1), nn.Sigmoid(), ) # CVR专家塔 self.cvr_tower = nn.Sequential( nn.Linear(256, 128), nn.ReLU(), nn.Dropout(0.1), nn.Linear(128, 1), nn.Sigmoid(), ) def forward(self, x): shared = self.shared_bottom(x) ctr_pred = self.ctr_tower(shared) cvr_pred = self.cvr_tower(shared) return ctr_pred, cvr_pred def multi_task_loss(ctr_pred, cvr_pred, ctr_label, cvr_label, alpha=0.7): """多任务损失:加权BCE""" ctr_loss = F.binary_cross_entropy( ctr_pred.squeeze(), ctr_label.float() ) cvr_loss = F.binary_cross_entropy( cvr_pred.squeeze(), cvr_label.float() ) return alpha * ctr_loss + (1 - alpha) * cvr_loss ``` ## 五、在线服务与部署 ### 5.1 基于Faiss的向量检索服务 ```python import faiss import numpy as np class RecallService: """基于Faiss的在线召回服务""" def __init__(self, dim: int = 128, index_type="IVF"): self.dim = dim if index_type == "IVF": # IVF-PQ:平衡精度和速度 quantizer = faiss.IndexFlatIP(dim) self.index = faiss.IndexIVFPQ( quantizer, dim, nlist=4096, # 聚类中心数 m=8, # PQ子向量数 nbits=8, # 每个子向量编码位数 ) elif index_type == "HNSW": self.index = faiss.IndexHNSWFlat(dim, M=32) def build(self, item_vectors: np.ndarray): """构建索引""" assert item_vectors.shape[1] == self.dim self.index.train(item_vectors) self.index.add(item_vectors) def search(self, query_vector: np.ndarray, top_k: int = 100): """搜索最相似的item""" query = query_vector.reshape(1, -1).astype("float32") distances, indices = self.index.search(query, top_k) return indices[0], distances[0] # 构建与保存索引 recall_service = RecallService(dim=128, index_type="IVF") item_vectors = np.random.randn(500000, 128).astype("float32") item_vectors = item_vectors / np.linalg.norm( item_vectors, axis=1, keepdims=True ) # L2归一化 recall_service.build(item_vectors) faiss.write_index(recall_service.index, "recall_index.faiss") ``` ### 5.2 FastAPI在线推理服务 ```python from fastapi import FastAPI, HTTPException from pydantic import BaseModel import httpx import asyncio app = FastAPI(title="Recommendation Service") class RecommendRequest(BaseModel): user_id: int context: dict = {} num_items: int = 20 class RecommendResponse(BaseModel): user_id: int items: list[dict] latency_ms: float # 全局加载模型和索引 model = TwoTowerModel(user_config={}, item_config={}) model.load_state_dict(torch.load("two_tower_best.pt")) model.eval() recall_index = faiss.read_index("recall_index.faiss") @app.post("/recommend", response_model=RecommendResponse) async def recommend(req: RecommendRequest): start = time.perf_counter() # 1. 获取用户特征 user_features = await fetch_user_features(req.user_id) # 2. 召回 user_vec = model.get_user_embedding(user_features).cpu().numpy() recall_ids, scores = recall_index.search(user_vec, top_k=500) # 3. 获取召回物料特征 item_features = await fetch_item_features(recall_ids.tolist()) # 4. 精排 rank_scores = rank_model.predict(item_features) # 5. 重排(多样性+业务规则) final_items = rerank(rank_scores, req.context) latency = (time.perf_counter() - start) * 1000 return RecommendResponse( user_id=req.user_id, items=final_items[:req.num_items], latency_ms=latency, ) ``` ## 六、A/B测试与效果评估 ### 6.1 分层实验框架 ```python class ABTestFramework: """推荐系统A/B测试框架""" def __init__(self, experiment_name: str): self.experiment_name = experiment_name self.group_metrics = {} def assign_group(self, user_id: int) -> str: """一致性哈希分桶""" bucket = int(hashlib.md5( f"{self.experiment_name}:{user_id}".encode() ).hexdigest(), 16) % 100 if bucket < 50: return "control" # 基线模型 elif bucket < 80: return "exp_v1" # 实验组1 else: return "exp_v2" # 实验组2 def record_metric(self, group: str, metric_name: str, value: float): """记录指标""" if group not in self.group_metrics: self.group_metrics[group] = {} if metric_name not in self.group_metrics[group]: self.group_metrics[group][metric_name] = [] self.group_metrics[group][metric_name].append(value) def report(self): """生成实验报告""" report_data = {} for group, metrics in self.group_metrics.items(): report_data[group] = {} for metric_name, values in metrics.items(): report_data[group][metric_name] = { "mean": np.mean(values), "std": np.std(values), "count": len(values), } return report_data ``` ## 七、总结 构建企业级推荐系统是一个复杂的系统工程,涉及数据处理、特征工程、模型训练和在线服务等多个环节。本文完整介绍了从零搭建推荐系统的技术方案,核心要点包括: 1. **分层架构**:召回→粗排→精排→重排的四层漏斗,平衡效果与性能 2. **特征工程**:用户、物品、交叉、上下文四维特征体系,是推荐效果的基础 3. **双塔召回**:向量检索方案,支持百万级候选集的毫秒级召回 4. **多目标排序**:同时优化CTR和CVR,最大化业务价值 5. **工程实践**:Faiss向量检索、FastAPI在线服务、A/B测试框架 在实际项目中,还需要关注特征实时性、模型更新频率、冷启动策略、AB实验平台等工程细节。推荐系统的迭代是一个持续优化的过程,需要数据、算法和工程的紧密配合。