Memory 模块开发者指南
本文面向 Memory 模块(Parser + Visualizer)的开发者,介绍设计思路、内部接口约定以及扩展开发指南。用户侧快速入门见 Memory Quickstart。
1. 设计概述
1.1 背景与目标
RL-Insight 已有时序分析 Parser(mstx / torch),基于 EventRow 数据模型输出算子级时间线事件。Memory Parser 新增内存分析能力,从 Ascend Profiler 输出的内存相关文件中提取内存分配信息,为 RL 训练的内存瓶颈分析提供数据支撑。
核心目标:在每条内存申请记录中补全以下信息:
调用栈(Call Stack)—— 定位内存申请源头
内存申请大小(Size)—— 量化内存消耗
内存申请时间(Allocation Time)—— 时序关联
内存占用时长(Duration)—— 生命周期分析
1.2 数据流
InputData (ASCEND_MEMORY)
│
├── DataChecker 校验
│
▼
MemoryClusterParser
│
├── allocate_prof_data() ← 扫描目录,构建 DataMap
│
├── mapper_func() ← 多进程并行调度
│ └── parse_analysis_data()
│ ├── _build_call_stack_index() ← 流式解析 trace_view.json
│ └── _parse_operator_memory() ← 解析 operator_memory.csv
│ └── _match_call_stack() ← 二分查找调用栈
│
├── reducer_func() ← 汇总排序
│
▼
┌───────────────────────────────┐
│ OutputData (MEMORY_SUMMARY) │ → pd.DataFrame
│ DataChecker 校验 │
│ ├── ParserOutputValidator │ 列结构:5 个必需列
│ └── MemoryContentRule │ 内容:数值/非空/正值
└───────────────────────────────┘
│
▼
MemoryVisualizer
│
└── generate_memory_timeline() → HTML + detail_data.js
1.3 类结构
BaseClusterParser (recipe/parser/parser.py)
└── MemoryClusterParser (recipe/parser/memory_parser.py)
├── input_type = DataEnum.ASCEND_MEMORY
├── allocate_prof_data() → 扫描目录,构建 DataMap
├── parse_analysis_data() → 主解析流程
├── _build_call_stack_index() → 流式解析 trace_view.json,构建调用栈索引
├── _parse_operator_memory() → 解析 operator_memory.csv,输出 dict[str, Any]
├── _match_call_stack() → name + ts 匹配调用栈
├── _get_data_map() → 构建 (role, rank_id) → [path] 映射
├── _get_rank_path_with_role() → 生成 DataMap 列表
├── _get_profiler_data_path() → 拼接 ASCEND_PROFILER_OUTPUT 路径
├── _get_rank_id() → 从 profiler_info_*.json 提取 rank_id
├── _get_task_role() → 从 profiler_metadata.json 提取 role
└── _extract_timestamp_key() → 提取目录名中的时间戳排序键
BaseVisualizer (recipe/visualizer/visualizer.py)
└── MemoryVisualizer (recipe/visualizer/memory_visualizer.py)
├── input_type = DataEnum.MEMORY_SUMMARY
├── run(data) → 委派 generate_memory_timeline
├── generate_memory_timeline() → 主流程:过滤→向量化→输出
├── _build_chart1_data() → 双指针滑动窗口,Chart1 hover 数据
└── _build_memory_html() → JSON 序列化 + 模板占位符替换
2. 内部接口
2.1 注册与数据类型
@register_cluster_parser("memory")
class MemoryClusterParser(BaseClusterParser):
input_type: DataEnum = DataEnum.ASCEND_MEMORY
属性 |
值 |
说明 |
|---|---|---|
注册名 |
|
CLI |
|
|
输入数据类型 |
输出类型 |
|
Parser 输出 / Visualizer 输入 |
DataEnum 新增值(定义在 recipe/data/data_checker.py):
class DataEnum(Enum):
ASCEND_MEMORY = "ascend_memory" # Memory Parser 输入
MEMORY_SUMMARY = "memory_summary" # Memory Parser 输出 / Visualizer 输入
2.2 MemoryEventRow
定义在 recipe/utils/schema.py:
class MemoryEventRow(TypedDict):
name: str # 算子名称
role: str # RL 角色
rank_id: int # 进程 rank
call_stack: str # 完整调用栈(";\r\n" 分隔)
call_stack_top: str # 调用栈顶层入口
size_kb: float # 内存大小(KB),正数=申请,负数=释放
start_time_ms: float # 内存申请时间(ms),与 BaseClusterParser.reducer_func 的排序键对齐
duration_ms: float # 内存占用时长(ms),0 表示未释放
total_allocated_mb: float # 申请时刻累计已分配内存(MB)
total_reserved_mb: float # 申请时刻累计预留内存(MB)
total_active_mb: float # 申请时刻累计活跃内存(MB)
device_type: str # 设备类型
设计决策:
size_kb:正数表示内存申请,负数表示内存释放,与operator_memory.csv中的Size(KB)语义一致duration_ms:若Duration(us)有值则转换,无值则为0.0(表示内存尚未释放)call_stack_top:取调用栈第一行(用户代码入口),便于快速定位,无需解析完整调用栈时间单位统一为毫秒(ms),与现有
EventRow保持一致
2.3 allocate_prof_data()
复用 MstxClusterParser 的目录扫描逻辑:
遍历
input_path,找到所有<date>_<time>_ascend_pt目录从
profiler_metadata.json提取role,从profiler_info_*.json提取rank_id按
_extract_timestamp_key提取的时间戳对同 (role, rank_id) 下的目录排序profiler_data_path指向ASCEND_PROFILER_OUTPUT目录(注意:与 MstxClusterParser 指向trace_view.json文件不同,Memory Parser 指向目录,因为需要同时访问trace_view.json和operator_memory.csv)
关键差异:
# MstxClusterParser
def _get_profiler_data_path(self, rank_id, data_path):
return os.path.join(data_path, Constant.ASCEND_PROFILER_OUTPUT, "trace_view.json")
# MemoryClusterParser
def _get_profiler_data_path(self, rank_id, data_path):
return os.path.join(data_path, Constant.ASCEND_PROFILER_OUTPUT)
2.4 parse_analysis_data()
主解析流程,接收单个 Rank 的数据路径,返回 list[dict[str, Any]]:
输入: profiler_data_path (ASCEND_PROFILER_OUTPUT 目录), rank_id, role
步骤1: 构建调用栈索引
_build_call_stack_index(trace_view_path)
→ 流式解析 trace_view.json (ijson)
→ 筛选 cat=="cpu_op" 且含 "Call stack" 的事件
→ 按 name 分组,组内按 ts 排序
→ 返回 dict[str, list[{ts, dur, call_stack}]]
步骤2: 解析 operator_memory.csv
_parse_operator_memory(csv_path, call_stack_index, rank_id, role)
→ 逐行构建 MemoryEventRow:
a. 时间转换: us → ms (÷ 1000)
b. 调用栈匹配: _match_call_stack(name, allocation_time, index)
未命中 → call_stack = "", call_stack_top = ""
c. 提取 call_stack_top: 取调用栈第一行
d. duration_ms: Duration(us) 有值则转换,无值则为 0
步骤3: 返回 list[dict[str, Any]]
2.5 _build_call_stack_index()
流式解析 trace_view.json,构建调用栈索引:
输入:
trace_view.json文件路径输出:
dict[str, dict],每个值包含"entries"(list[{ts, dur, call_stack}],按ts升序排序)和"ts_list"(预提取的ts列表,供二分查找直接使用,避免每次调用重建列表)过滤条件:仅保留
cat=="cpu_op"且args中含"Call stack"的事件流式解析:使用
ijson.items(f, "item")逐条读取,避免将整个 JSON 加载到内存排序:组内按
ts升序排序,确保后续二分查找的正确性
2.6 _match_call_stack()
调用栈匹配算法,基于二分查找:
输入:算子名
name、分配时间allocation_time_us、调用栈索引输出:
(call_stack, call_stack_top),未命中返回("", "")匹配策略:在同名算子组中,查找
ts ≤ allocation_time的最近一条记录匹配语义:
ts是算子开始执行时间,Allocation Time是算子内触发内存分配的时间,因此Allocation Time ≥ ts;取ts最接近Allocation Time的一条即为触发该次内存分配的算子调用允许多对一:一个算子可能分配多次内存,因此多条
operator_memory记录可以匹配到同一条trace_view记录
import bisect
idx = bisect.bisect_right(ts_list, allocation_time_us) - 1
if idx < 0:
return "", ""
2.7 DataChecker 校验
输入侧:DataEnum.ASCEND_MEMORY 当前未注册校验规则(DataChecker.rules 中为空列表),输入校验由 Parser 内部的文件存在性检查承担。
输出侧:DataEnum.MEMORY_SUMMARY 已注册两条校验规则:
规则 |
类 |
校验内容 |
|---|---|---|
列结构 |
|
DataFrame 需包含 |
内容合法性 |
|
数值列可转 float; |
常量定义在 recipe/utils/schema.py(MEMORYKEYS),规则实现在 recipe/data/rules.py(MemoryContentRule)。
如需增加校验,参考 DataRule 扩展说明。
2.8 Visualizer 注册与数据类型
@register_cluster_visualizer("memory_html")
class MemoryVisualizer(BaseVisualizer):
input_type: DataEnum = DataEnum.MEMORY_SUMMARY
属性 |
值 |
说明 |
|---|---|---|
注册名 |
|
CLI |
|
|
接收 Parser 输出的 DataFrame |
渲染常量:
常量 |
值 |
说明 |
|---|---|---|
|
2000 |
Chart1 折线最大点数,超出均匀下采样 |
|
10 |
Chart1 hover 显示前 N 大活跃算子 |
2.9 generate_memory_timeline()
主流程,串联 5 个处理阶段:
输入: pd.DataFrame(MEMORYKEYS 5 列)
│
├── 阶段1: 过滤 size_kb > 0,计算 end_time_ms、size_mb
├── 阶段2: 向量化构建 Chart1 事件序列
│ └── np.concatenate([starts, ends]) → groupby("time") → cumsum()
├── 阶段3: Chart2 并行数组构建
│ └── 7 列 .tolist() 预提取,CS_POOL + CS_IDX 池化去重
├── 阶段4: _build_chart1_data() → tl_xy, tl_active
└── 阶段5: _build_memory_html() → 写入单文件
│
▼
输出: memory_timeline_{role}_rank{id}.html 路径
输入校验(直接返回 None):
data is None或data.empty过滤后全部
size_kb <= 0导致data.empty
2.10 _build_chart1_data()
静态方法,双指针滑动窗口算法,O(N+M):
active_intervals = []
interval_idx = 0
for point in memory_timeline:
# 入队:start <= t 的 interval
while interval_idx < n and intervals[interval_idx][0] <= t:
active_intervals.append(...)
# 惰性删除:end <= t 已结束
active_intervals = [x for x in active_intervals if x[1] > t]
# 按 size_kb 降序取 top-10 存入 tl_active
返回 (tl_xy, tl_active),tl_active 每点存 [活跃总数, [[算子名, size], ...]],供 Chart1 hover 使用。
2.11 _build_memory_html()
生成一对文件字符串:
detail_data.js:13 个 JS 变量声明,使用紧凑 JSON(
separators=(",", ":"))HTML:读取
memory_template.html,替换占位符:__DATA_FILE__→detail_data_{role}_rank{id}.js文件名
3. 扩展指南
3.1 新增内存数据校验规则
适用于:为 ASCEND_MEMORY 增加输入校验。
在
recipe/data/rules.py中继承ValidationRule,实现check()和error_message在
DataChecker.rules中为新类型挂载规则
示例——为 ASCEND_MEMORY 增加 operator_memory.csv 存在性校验:
# recipe/data/rules.py
class AscendMemoryFileExistsRule(ValidationRule):
"""校验 ASCEND_PROFILER_OUTPUT 下存在 operator_memory.csv"""
def check(self, data) -> bool:
if not isinstance(data, str):
self._error_message = "Data object is not a path"
return False
root_path = Path(data)
if not root_path.exists():
self._error_message = f"Source path does not exist: {data}"
return False
ascend_pt_pattern = str(root_path / "*" / "*_ascend_pt")
ascend_pt_folders = glob.glob(ascend_pt_pattern)
for folder in ascend_pt_folders:
csv_path = Path(folder) / "ASCEND_PROFILER_OUTPUT" / "operator_memory.csv"
if not csv_path.exists():
self._error_message = f"operator_memory.csv not found in {folder}"
return False
return True
@property
def error_message(self) -> str:
return self._error_message
# recipe/data/data_checker.py
class DataChecker:
rules: dict[DataEnum, List[ValidationRule]] = {
DataEnum.ASCEND_MEMORY: [PathExistsRule(), AscendMemoryFileExistsRule()],
...
}
3.2 已有 Memory Visualizer
Memory Visualizer 已完成实现,代码位于 recipe/visualizer/memory_visualizer.py。内部接口详见 2.8 - 2.11。
如需新增其他形式的 Memory Visualizer(如热力图),参考 Visualizer 扩展指南。
3.3 支持 GPU(CUDA)内存分析
适用于:将 Memory Parser 扩展至支持 CUDA 内存 Profiling 数据。
在
DataEnum中新增CUDA_MEMORY = "cuda_memory"新增
CudaMemoryParser,继承BaseClusterParser,实现allocate_prof_data()和parse_analysis_data()使用
@register_cluster_parser("cuda_memory")注册定义
CudaMemoryEventRow(若字段与MemoryEventRow不同)或复用MemoryEventRow在
DataChecker.rules中为新类型挂载校验规则更新
main.py中对应 parser type 的注册名在
docs/recipe/data/data_specification.md中补充数据形态说明
3.4 改进调用栈匹配精度
当前匹配策略基于算子名 + 时间戳二分查找,在极短时间内同一算子多次调用时可能匹配不精确。可能的改进方向:
增加
dur约束:匹配时额外要求allocation_time ≤ ts + dur,即分配时间必须在算子执行时间范围内增加
tid约束:若operator_memory.csv可提供线程信息,可按name + tid分组匹配最近邻匹配:在
ts ≤ allocation_time的候选中,选择allocation_time - ts最小的一条
4. 依赖
Parser:
库 |
用途 |
说明 |
|---|---|---|
|
流式解析大 JSON |
需安装: |
|
解析 CSV |
标准库 |
|
二分查找调用栈 |
标准库 |
Visualizer:无额外依赖(numpy、pandas、loguru、omegaconf 均为项目公共依赖;Plotly.js 通过 HTML 模板 CDN 加载)。
5. 测试
测试文件:
测试文件 |
覆盖范围 |
|---|---|
|
Parser 单元测试:注册、 |
|
E2E:完整 Pipeline(Parser + DataChecker + Visualizer),使用 |