如何让Agent在实时性要求高的场景(如股市交易)下运行?
定义
实时 Agent(Real-time Agent)是指在时间敏感场景下,Agent 能够在严格的时间约束内完成感知、决策和执行的闭环。在股市交易等高频场景中,延迟不仅影响体验,更直接导致经济损失。
原理
实时 Agent 的核心挑战在于“在有限时间内做出足够好的决策”,而非追求最优解。基于“时间感知推理”(Time-aware Reasoning)框架,主要手段包括:
- 时间预算分配:为每个决策阶段设定最大耗时预算
- 渐进式推理:先出快速初版,有时间再优化
- 预计算与缓存:静态信息预加载,动态信息重复利用
- 异步执行:不阻塞主流程的后台处理
实时架构
flowchart TD
subgraph "输入层"
A1[行情数据流<br/>WebSocket]
A2[用户指令<br/>低延迟通道]
end
subgraph "实时决策层 < 100ms"
B1[快速预处理器]
B2[缓存查询]
B3[规则引擎<br/>条件触发]
B4[LLM-Cache<br/>预生成策略]
end
subgraph "深度分析层 > 100ms"
C1[LLM推理<br/>异步执行]
C2[趋势预测<br/>后台线程]
C3[风险评估<br/>周期性执行]
end
subgraph "执行层"
D1[交易执行<br/>原子操作]
D2[状态更新]
D3[日志记录]
end
A1 --> B1
A2 --> B3
B1 --> B2
B2 --> B3
B3 -->|需要LLM| C1
B3 -->|规则匹配| D1
C1 -->|结果| D1
C2 --> C1
C3 --> C1
subgraph "时间监控"
E1[执行时间跟踪]
E2[超时熔断]
E3[质量回退]
end
E1 --> B1
E1 --> C1
E1 --> D1
代码示例
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import time
import json
import threading
class TimeBudget(Enum):
    """Latency budgets, in milliseconds, for each class of operation."""

    URGENT = 50    # urgent operations: 50 ms
    FAST = 200     # fast operations: 200 ms
    NORMAL = 1000  # normal operations: 1 s
    SLOW = 5000    # deep analysis: 5 s
@dataclass
class TimeConstraint:
    """Time constraint attached to a single decision request."""

    budget_ms: int  # time budget, in milliseconds
    fallback_action: str  # degraded action to take once the budget is exceeded
    quality_threshold: float = 0.6  # lowest acceptable decision quality
    hard_deadline_ms: int = 0  # hard deadline, in milliseconds
class TimeMonitor:
    """Context manager that tracks elapsed time against a millisecond budget.

    Entering the context starts the clock. On exit, a warning string is
    appended to ``warnings`` when the elapsed time exceeded ``budget_ms``.
    The monitor only observes time; it never interrupts the wrapped code.
    """

    def __init__(self, budget_ms: int):
        self.budget_ms = budget_ms
        # perf_counter() reading taken on __enter__; None until then.
        self.start_time: Optional[float] = None
        self.warnings: List[str] = []

    def __enter__(self):
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, *args):
        elapsed = self.elapsed_ms
        if elapsed > self.budget_ms:
            self.warnings.append(f"超时: {elapsed:.0f}ms > 预算{self.budget_ms}ms")
        # Implicitly returns None, so exceptions raised inside the block propagate.

    @property
    def elapsed_ms(self) -> float:
        """Milliseconds since the context was entered, or 0 if not started."""
        # Compare against None explicitly: a start reading of 0.0 is a valid
        # timestamp and must not be mistaken for "not started".
        if self.start_time is None:
            return 0.0
        return (time.perf_counter() - self.start_time) * 1000

    @property
    def is_over_budget(self) -> bool:
        """True once the elapsed time is strictly greater than the budget."""
        return self.elapsed_ms > self.budget_ms

    def remaining_ms(self) -> float:
        """Milliseconds left in the budget, clamped at zero."""
        return max(0.0, self.budget_ms - self.elapsed_ms)
class CacheLayer:
    """In-memory key/value cache with a per-group time-to-live in milliseconds.

    ``ttl_ms`` maps a cache-group name to its TTL; groups that are not listed
    fall back to 1000 ms. Expiry is measured on the monotonic clock so that
    wall-clock adjustments cannot extend or cut short an entry's lifetime.
    """

    def __init__(self, ttl_ms: Optional[Dict[str, int]] = None):
        # key -> {"value": ..., "expires": monotonic timestamp in ms}
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.ttl = ttl_ms or {"market_data": 100, "analysis": 5000}

    @staticmethod
    def _now_ms() -> float:
        """Current monotonic time in milliseconds."""
        return time.monotonic() * 1000

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None if missing or expired."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        if self._now_ms() < entry["expires"]:
            return entry["value"]
        # Drop the stale entry so the dict does not grow without bound, but
        # only if it has not been replaced by a newer one in the meantime.
        if self.cache.get(key) is entry:
            self.cache.pop(key, None)
        return None

    def set(self, key: str, value: Any, cache_group: str = "default"):
        """Store ``value`` under ``key`` using the TTL of ``cache_group``."""
        ttl = self.ttl.get(cache_group, 1000)
        self.cache[key] = {
            "value": value,
            "expires": self._now_ms() + ttl,
        }
class RealTimeDecisionEngine:
    """Real-time decision engine: cache first, then rules, then a quick LLM call.

    ``cache`` must provide ``get(key)`` and ``set(key, value, cache_group)``;
    ``llm`` must provide ``generate(prompt, max_tokens=...)``.
    """

    def __init__(self, cache: "CacheLayer", llm):
        self.cache = cache
        self.llm = llm
        self.rule_engine = self._build_rule_engine()

    def _build_rule_engine(self) -> Dict:
        """Build the static rule table used by the fast path."""
        return {
            "price_spike": {
                "condition": "price_change > threshold",
                "action": "alert_or_trade",
                "threshold": 0.03,  # trigger on a 3% move in either direction
            },
            "volume_surge": {
                "condition": "volume > avg_volume * 3",
                "action": "check_news",
            },
        }

    def decide(self, market_data: Dict, constraint: "TimeConstraint") -> Dict:
        """Return a decision for ``market_data`` within ``constraint.budget_ms``.

        Order of attempts:
          1. a cached decision for the symbol;
          2. the rule engine, when a rule matched and the budget is urgent;
          3. a quick LLM call, when the budget is not yet exhausted;
          4. fallback to the rule-engine result.

        Returns a dict with a ``source`` and a ``decision`` key (or whatever
        was cached for the symbol).
        """
        with TimeMonitor(constraint.budget_ms) as monitor:
            symbol = market_data.get("symbol", "")
            cache_key = f"decision_{symbol}"

            # 1. Cache lookup.
            cached = self.cache.get(cache_key)
            if cached is not None:
                return cached

            # 2. Rule engine. _apply_rules reports hits through "matches" /
            #    "count" (it has no "action" key), so test the match count.
            rule_result = self._apply_rules(market_data)
            is_urgent = constraint.budget_ms <= TimeBudget.URGENT.value
            if rule_result["count"] > 0 and is_urgent:
                result = {"source": "rule_engine", "decision": rule_result}
                self.cache.set(cache_key, result, "market_data")
                return result

            # 3. Use the LLM only if there is budget left.
            if not monitor.is_over_budget:
                llm_result = self._quick_llm_decision(
                    market_data, monitor.remaining_ms()
                )
                if llm_result:
                    result = {"source": "llm_fast", "decision": llm_result}
                    self.cache.set(cache_key, result, "market_data")
                    return result

            # 4. Degrade: hand back whatever the rule engine produced.
            return {"source": "fallback_rules", "decision": rule_result}

    def _apply_rules(self, data: Dict) -> Dict:
        """Evaluate every rule against ``data``.

        Returns ``{"matches": [{"rule": ..., "action": ...}, ...], "count": n}``.
        """
        matches = [
            {"rule": rule_name, "action": rule["action"]}
            for rule_name, rule in self.rule_engine.items()
            if self._evaluate_condition(rule["condition"], data, rule)
        ]
        return {"matches": matches, "count": len(matches)}

    def _evaluate_condition(self, condition: str, data: Dict,
                            rule: Optional[Dict] = None) -> bool:
        """Evaluate one rule condition against ``data``.

        ``rule`` is the rule's config dict; when given, its ``threshold``
        overrides the default of 0.03 for price-change conditions.
        """
        if "price_change" in condition:
            threshold = (rule or {}).get("threshold", 0.03)
            # Treat a missing or None change as "no movement".
            change = data.get("price_change_pct", 0) or 0
            # NOTE(review): the threshold is a fraction (0.03 == 3%) while the
            # LLM prompt prints price_change_pct followed by "%" - confirm the
            # unit the data feed actually uses.
            return abs(change) > threshold
        if "avg_volume" in condition:
            # assumes the feed supplies "avg_volume" next to "volume", as the
            # condition string suggests - TODO confirm against the data source
            volume = data.get("volume")
            avg_volume = data.get("avg_volume")
            if volume is None or avg_volume is None:
                return False
            return volume > avg_volume * 3
        return False

    def _quick_llm_decision(self, data: Dict, time_left_ms: float) -> Optional[Dict]:
        """Ask the LLM for a short buy/sell/hold call.

        Returns None when fewer than 50 ms remain or when the LLM call fails.
        NOTE: ``time_left_ms`` only gates whether the call is attempted; the
        call itself is synchronous and is not interrupted if it overruns.
        """
        if time_left_ms < 50:  # not enough time left to attempt a call
            return None
        prompt = (
            "快速分析市场数据并给出交易建议(50字以内):\n"
            f"股票: {data.get('symbol')}\n"
            f"当前价: {data.get('price')}\n"
            f"涨跌幅: {data.get('price_change_pct')}%\n"
            f"成交量: {data.get('volume')}\n"
            f"MA5: {data.get('ma5')}\n"
            "请只输出: buy/sell/hold 和原因(5字内)"
        )
        try:
            response = self.llm.generate(prompt, max_tokens=30)
            return {"decision": response.strip(), "fast": True}
        except Exception:
            # Best effort: any LLM failure degrades to the rule-engine result.
            # KeyboardInterrupt / SystemExit are deliberately not swallowed.
            return None
class RealTimeAgent:
    """Real-time agent: fast decision on the hot path, deep analysis in the background."""

    def __init__(self, llm, decision_engine: "RealTimeDecisionEngine"):
        self.llm = llm
        self.decision_engine = decision_engine
        # Share the engine's cache so precomputed and background results are
        # visible to decide(); the agent has no cache of its own.
        self.cache = decision_engine.cache
        self.execution_log = []
        self.background_tasks = []

    def handle_market_event(self, event: Dict) -> Dict:
        """Decide on a market event and log the decision latency.

        The event's ``type`` selects the time budget: ``price_alert`` and
        ``stop_loss`` get the urgent budget, ``signal`` the fast budget, and
        everything else the normal budget. Returns the engine's decision.
        """
        started = time.perf_counter()
        symbol = event.get("symbol", "")
        event_type = event.get("type", "")

        # Pick the time budget for this kind of event.
        if event_type in ("price_alert", "stop_loss"):
            budget = TimeBudget.URGENT.value  # 50 ms
        elif event_type == "signal":
            budget = TimeBudget.FAST.value  # 200 ms
        else:
            budget = TimeBudget.NORMAL.value  # 1 s
        constraint = TimeConstraint(
            budget_ms=budget,
            fallback_action="do_nothing",
            hard_deadline_ms=budget + 100,
        )

        # Fast decision on the hot path.
        decision = self.decision_engine.decide(event, constraint)
        # Record elapsed time, not the absolute counter value.
        latency_ms = (time.perf_counter() - started) * 1000

        # Deep analysis runs in the background and does not block the caller.
        self._async_deep_analysis(event)

        self.execution_log.append({
            "time": datetime.now().isoformat(),
            "symbol": symbol,
            "event": event_type,
            "decision": decision,
            "latency_ms": latency_ms,
        })
        return decision

    def _async_deep_analysis(self, event: Dict):
        """Start a daemon thread that asks the LLM for a deep analysis.

        The result is stored in the shared cache under
        ``deep_analysis_<symbol>`` in the ``analysis`` group. Does nothing
        when no LLM is configured.
        """
        if not self.llm:
            return

        def _analyze():
            prompt = f"深度分析: {event.get('symbol')} 近期走势和技术指标..."
            result = self.llm.generate(prompt)
            self.cache.set(f"deep_analysis_{event.get('symbol')}",
                           result, "analysis")

        # Forget threads that have already finished so the list stays bounded.
        self.background_tasks[:] = [
            task for task in self.background_tasks if task.is_alive()
        ]
        thread = threading.Thread(target=_analyze, daemon=True)
        thread.start()
        self.background_tasks.append(thread)

    def precompute(self, symbols: List[str]):
        """Warm the cache with a placeholder decision for each symbol."""
        print(f"🔄 预计算 {len(symbols)} 个股票的基本指标...")
        for symbol in symbols:
            self.cache.set(f"decision_{symbol}",
                           {"precomputed": True}, "market_data")

    def get_latency_report(self) -> str:
        """Return average and maximum latency over the last 100 log entries."""
        if not self.execution_log:
            return "暂无执行记录"
        recent = self.execution_log[-100:]
        latencies = [entry.get("latency_ms", 0) for entry in recent]
        avg_latency = sum(latencies) / len(latencies)
        max_latency = max(latencies)
        return "\n".join([
            "",
            "📊 延迟报告",
            "━━━━━━━━━━━━━━━",
            f"最近{len(recent)}次操作:",
            f"平均延迟: {avg_latency:.0f}ms",
            f"最大延迟: {max_latency:.0f}ms",
            "",
        ])

    def set_speed_mode(self, mode: str):
        """Return the settings for ``mode``; unknown modes fall back to ``balanced``.

        NOTE: the settings are only returned, not stored on the agent; the
        caller is responsible for applying them.
        """
        modes = {
            "turbo": {  # rules only, no LLM
                "llm_enabled": False,
                "rules_only": True,
                "cache_ttl": 50,
            },
            "fast": {  # quick LLM calls only
                "llm_enabled": True,
                "llm_quick_only": True,
                "cache_ttl": 100,
            },
            "balanced": {  # LLM enabled, no deep analysis
                "llm_enabled": True,
                "allow_deep_analysis": False,
                "cache_ttl": 500,
            },
            "deep": {  # LLM enabled with deep analysis
                "llm_enabled": True,
                "allow_deep_analysis": True,
                "cache_ttl": 2000,
            },
        }
        print(f"⚡ 切换为 {mode} 模式")
        return modes.get(mode, modes["balanced"])
要点总结
- 时间预算管理:不同事件分配不同的时间预算,紧急事件 50ms 内给出初判
- 缓存为王:预计算 + 实时缓存,避免重复计算
- 渐进式推理:规则引擎先出初版结果,有时间再用 LLM 优化
- 异步深分析:紧急决策走快速通道,深度分析在后台异步运行
- 优雅降级:时间不够时自动使用规则引擎结果,绝不卡死
面试常见问题
Q1:LLM 在实时场景下太慢怎么办?
A:核心策略是不依赖 LLM 做实时判断。LLM 仅用于非实时的策略生成和风险分析。实时决策完全基于规则引擎 + 缓存 + 预计算。LLM 结果作为“建议”纳入考虑,但不是实时通道的必要环节。
Q2:50ms 内做出交易决策,准确性如何保证?
A:50ms 内只能做“条件反射式”决策——基于预设规则的快速判断。准确性依赖规则的质量,而非推理质量。可以通过离线回测评估规则引擎的准确率,确保其在高频场景中可靠。
Q3:实时 Agent 的重试策略和普通 Agent 有什么不同?
A:实时场景中重试成本极高(时间窗口可能已经关闭)。策略是“单次失败即降级”——主通道超时后立即切换到缓存结果或规则引擎结果,不等待重试。重试放在异步后台进行,其结果用于下一次决策。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END


暂无评论内容