一、HyperLogLog:亿级UV统计
统计每天的独立用户数(UV),如果用Set存储用户ID,1亿用户需要约800MB。HyperLogLog用12KB内存就能统计,误差率约0.81%:
import redis
r = redis.Redis()
# 用户每次访问时记录
def record_visit(user_id: str, page: str, date: str):
    """Record one visit of ``user_id`` to ``page`` on ``date``.

    The user id is added to the HyperLogLog stored at ``uv:{page}:{date}``.
    PFADD ignores ids it has already seen, so repeated visits by the same
    user do not inflate the estimate.
    """
    r.pfadd(f"uv:{page}:{date}", user_id)
# 统计当天UV
def get_uv(page: str, date: str) -> int:
    """Return the estimated unique-visitor count of ``page`` on ``date``."""
    daily_key = f"uv:{page}:{date}"
    return r.pfcount(daily_key)
# 合并多天UV(跨天统计)
def get_weekly_uv(page: str, start_date: str, end_date: str) -> int:
    """Return the estimated unique visitors of ``page`` across a date range.

    The per-day HyperLogLogs ``uv:{page}:{date}`` for every date produced by
    ``date_range(start_date, end_date)`` are unioned by one multi-key
    PFCOUNT, so a user seen on several days is counted once.

    Nothing is written back to Redis.  Merging into a fixed
    ``uv:{page}:weekly`` key with PFMERGE would fold that key's previous
    contents into every later result, because PFMERGE treats an existing
    destination key as one more source.

    Args:
        page: Page identifier used in the key.
        start_date: First date of the range, passed to ``date_range``.
        end_date: Last date of the range, passed to ``date_range``.

    Returns:
        The estimated cardinality of the union, or 0 for an empty range.
    """
    keys = [f"uv:{page}:{day}" for day in date_range(start_date, end_date)]
    if not keys:
        # PFCOUNT needs at least one key; an empty range has no visitors.
        return 0
    return r.pfcount(*keys)
# 压测:写入100万用户,HLL仅占约12KB内存(对比:用Set存储1亿用户约需800MB)
import time
# Load test: add one million distinct ids to a single HyperLogLog.
# Commands are flushed in batches so the client never buffers the whole run
# in memory, and no MULTI/EXEC wrapper is used because the adds do not need
# to be atomic with respect to each other.
HLL_BATCH_SIZE = 10_000
pipe = r.pipeline(transaction=False)
for i in range(1_000_000):
    pipe.pfadd("test_hll", f"user_{i}")
    if (i + 1) % HLL_BATCH_SIZE == 0:
        pipe.execute()
# Flush whatever is left when the total is not a multiple of the batch size.
pipe.execute()
print(f"HLL统计结果: {r.pfcount('test_hll')}")  # roughly 1,000,000, error under 1%
# 二、Sorted Set:实时排行榜
# 游戏积分排行榜
class LeaderBoard:
    """Score leaderboard stored in one Redis sorted set.

    Members are user ids and scores are their points; the set lives at
    ``leaderboard:{name}``.  Ranks reported by this class are 1-based, with
    the highest score at rank 1.
    """

    def __init__(self, name: str, client=None):
        """Bind the leaderboard to its sorted-set key.

        Args:
            name: Leaderboard name, used as the key suffix.
            client: Redis client to use.  Defaults to the module-level ``r``.
        """
        self.key = f"leaderboard:{name}"
        # Every method goes through self.r, so it has to be assigned here.
        self.r = client if client is not None else r

    @staticmethod
    def _as_text(member):
        """Return ``member`` as ``str`` whether the client yields bytes or str."""
        return member.decode() if isinstance(member, bytes) else member

    def add_score(self, user_id: str, score: float):
        """Set ``user_id``'s score, adding the user if not yet present."""
        self.r.zadd(self.key, {user_id: score})

    def increment_score(self, user_id: str, delta: float) -> float:
        """Atomically add ``delta`` to the user's score and return the new score."""
        return self.r.zincrby(self.key, delta, user_id)

    def get_rank(self, user_id: str) -> "int | None":
        """Return the user's 1-based rank, or ``None`` if the user is not ranked."""
        rank = self.r.zrevrank(self.key, user_id)
        return rank + 1 if rank is not None else None

    def get_top_n(self, n: int = 10) -> list:
        """Return the top ``n`` users, best first.

        Each entry is a dict with ``user_id``, ``score`` and 1-based ``rank``.
        A non-positive ``n`` yields an empty list.
        """
        if n <= 0:
            # A stop index of -1 would make ZREVRANGE return the whole set.
            return []
        results = self.r.zrevrange(self.key, 0, n - 1, withscores=True)
        return [
            {"user_id": self._as_text(uid), "score": score, "rank": position}
            for position, (uid, score) in enumerate(results, start=1)
        ]

    def get_around_me(self, user_id: str, range_count: int = 5) -> list:
        """Return the entries ranked within ``range_count`` places of the user.

        The result is the raw ZREVRANGE reply (member/score pairs), covering
        up to ``range_count`` users above and below ``user_id`` plus the user
        itself.  An unranked user yields an empty list.
        """
        rank = self.r.zrevrank(self.key, user_id)
        if rank is None:
            return []
        start = max(0, rank - range_count)
        stop = rank + range_count
        return self.r.zrevrange(self.key, start, stop, withscores=True)
# 三、Bitmap:用户签到统计
from datetime import date, timedelta
class CheckInSystem:
    """Daily check-in tracking backed by one Redis bitmap per calendar month.

    Key ``checkin:YYYYMM`` holds every user's check-ins for that month.  Each
    user owns a 32-bit window starting at ``user_id * 32``; bit ``day``
    (1-31) inside that window is set when the user checked in on that day,
    and bit 0 of the window is unused.
    """

    # Width of each user's window inside the monthly bitmap.
    BITS_PER_USER = 32

    @classmethod
    def _bit_offset(cls, user_id: int, day: int) -> int:
        """Return the bitmap offset of ``day`` inside ``user_id``'s window."""
        return user_id * cls.BITS_PER_USER + day

    def check_in(self, user_id: int, check_date: "date | None" = None):
        """Mark ``user_id`` as checked in on ``check_date`` (default: today)."""
        if check_date is None:
            check_date = date.today()
        key = f"checkin:{check_date.strftime('%Y%m')}"
        r.setbit(key, self._bit_offset(user_id, check_date.day), 1)

    def get_monthly_checkin(self, user_id: int, year: int, month: int) -> list:
        """Return the days of the given month on which the user checked in.

        Args:
            user_id: Numeric user id.
            year: Calendar year of the month to inspect.
            month: Calendar month, 1-12.

        Returns:
            Day-of-month numbers in ascending order; empty when ``month`` is
            outside 1-12.
        """
        import calendar  # local import keeps the class self-contained

        if not 1 <= month <= 12:
            # check_in derives its key from a real date, so no such key exists.
            return []
        key = f"checkin:{year:04d}{month:02d}"
        days_in_month = calendar.monthrange(year, month)[1]
        # Queue one GETBIT per day and send them all in a single round trip.
        pipe = r.pipeline(transaction=False)
        for day in range(1, days_in_month + 1):
            pipe.getbit(key, self._bit_offset(user_id, day))
        flags = pipe.execute()
        return [day for day, flag in enumerate(flags, start=1) if flag]

    def get_continuous_checkin_days(self, user_id: int) -> int:
        """Return the length of the user's current check-in streak.

        Counts backwards from today, one day at a time, until the first day
        without a check-in.  If the user has not checked in today the streak
        is 0.
        """
        streak = 0
        current = date.today()
        while r.getbit(
            f"checkin:{current.strftime('%Y%m')}",
            self._bit_offset(user_id, current.day),
        ):
            streak += 1
            current -= timedelta(days=1)
        return streak
# 四、Stream:轻量级消息队列
# Stream比List更适合做消息队列:
# - 支持消费者组(多消费者协作,不重复消费)
# - 消息持久化,消费失败可重试
# - 支持消息确认(ACK)
# 生产者
def publish_order_event(order_id: str, event_type: str, data: dict):
    """Append one order event to the ``orders:events`` stream.

    ``data`` is JSON-encoded into the entry's ``data`` field, and the stream
    is capped at 10000 entries through ``maxlen``.
    """
    event = {
        "order_id": order_id,
        "event_type": event_type,
        "data": json.dumps(data),
    }
    r.xadd("orders:events", event, maxlen=10000)
# 消费者组(初始化一次)
try:
    r.xgroup_create("orders:events", "order-processor", id="0", mkstream=True)
except redis.ResponseError as exc:
    # Redis replies BUSYGROUP when the group already exists, which is the
    # normal case on every start after the first; anything else is a real error.
    if "BUSYGROUP" not in str(exc):
        raise
# 消费者
def consume_orders():
    """Consume ``orders:events`` forever as a member of ``order-processor``.

    The consumer is named after the current process id.  Each iteration reads
    up to 10 new entries, blocking for at most one second, hands every entry
    to ``process_order_event`` and acknowledges it on success.  An entry whose
    processing raises is logged and left unacknowledged, so it stays in the
    group's pending entries list.
    """
    consumer_name = f"worker-{os.getpid()}"
    while True:
        # ">" asks only for entries never delivered to this group before.
        batch = r.xreadgroup(
            groupname="order-processor",
            consumername=consumer_name,
            streams={"orders:events": ">"},
            count=10,
            block=1000,
        )
        # An empty reply means the blocking read timed out; just poll again.
        for _stream, entries in batch or ():
            for msg_id, data in entries:
                try:
                    process_order_event(data)
                    # Acknowledge only after the handler finished successfully.
                    r.xack("orders:events", "order-processor", msg_id)
                except Exception as e:
                    log.error(f"处理消息{msg_id}失败: {e}")
# 处理超时未确认的消息(重新分配给其他消费者)
def reprocess_stuck_messages():
    """Reassign long-pending entries to the ``recovery-worker`` consumer.

    Looks up at most 100 pending entries of the ``order-processor`` group on
    ``orders:events`` that have been idle for 60 seconds or more, and claims
    all of them for ``recovery-worker`` with a single XCLAIM call.

    Returns:
        The XCLAIM reply for the claimed entries, or an empty list when
        nothing was stuck.
    """
    min_idle_ms = 60000  # entries unacknowledged for over 60 seconds
    stuck = r.xpending_range(
        "orders:events",
        "order-processor",
        min="-",
        max="+",
        count=100,
        idle=min_idle_ms,
    )
    stuck_ids = [msg["message_id"] for msg in stuck]
    if not stuck_ids:
        # XCLAIM requires at least one id.
        return []
    return r.xclaim(
        "orders:events",
        "order-processor",
        "recovery-worker",
        min_idle_ms,
        stuck_ids,
    )
# 五、Lua脚本:原子操作
-- Flash-sale stock deduction, run atomically inside Redis to prevent overselling.
-- Result codes: 1 success, 0 insufficient stock, -1 user already purchased,
-- -2 requested quantity is not a positive integer.
local key = KEYS[1]                 -- stock counter key
local user_key = KEYS[2]            -- set of users who already purchased
local user_id = ARGV[1]             -- buyer's user id
local quantity = tonumber(ARGV[2])  -- requested quantity
-- Reject a missing, non-numeric, fractional or non-positive quantity: a
-- negative value would pass the stock check below and DECRBY would then
-- increase the stock instead of reducing it.
if quantity == nil or quantity <= 0 or quantity ~= math.floor(quantity) then
    return -2
end
-- One purchase per user.
if redis.call('SISMEMBER', user_key, user_id) == 1 then
    return -1
end
-- A missing stock key counts as sold out.
local stock = tonumber(redis.call('GET', key))
if stock == nil or stock < quantity then
    return 0
end
-- Deduct the stock and remember the buyer.
redis.call('DECRBY', key, quantity)
redis.call('SADD', user_key, user_id)
return 1 -- 成功
# Python调用Lua脚本
# Wrap the Lua source in a callable script object.
script = r.register_script(lua_script)
# KEYS[1] is the stock counter and KEYS[2] the buyers set of the product;
# ARGV carries the user id and the requested quantity.
result = script(
    keys=[f"stock:{product_id}", f"buyers:{product_id}"],
    args=[user_id, quantity],
)
# Redis不只是缓存,合理使用高级数据结构,很多业务功能可以在Redis层直接完成,避免复杂的数据库操作。