Memory Parser 开发者指南
本文面向 Memory Parser 的开发者,介绍其设计思路、内部接口约定以及扩展开发指南。用户侧快速入门见 Memory Parser Quickstart。
1. 设计概述
1.1 背景与目标
RL-Insight 已有时序分析 Parser(mstx / torch),基于 EventRow 数据模型输出算子级时间线事件。Memory Parser 新增内存分析能力,从 Ascend Profiler 输出的内存相关文件中提取内存分配信息,为 RL 训练的内存瓶颈分析提供数据支撑。
核心目标:在每条内存申请记录中补全以下信息:
调用栈(Call Stack)—— 定位内存申请源头
内存申请大小(Size)—— 量化内存消耗
内存申请时间(Allocation Time)—— 时序关联
内存占用时长(Duration)—— 生命周期分析
1.2 数据流
InputData (ASCEND_MEMORY)
│
├── DataChecker 校验
│
▼
MemoryClusterParser
│
├── allocate_prof_data() ← 扫描目录,构建 DataMap
│
├── mapper_func() ← 多进程并行调度
│ └── parse_analysis_data()
│ ├── _build_call_stack_index() ← 流式解析 trace_view.json
│ └── _parse_operator_memory() ← 解析 operator_memory.csv
│ └── _match_call_stack() ← 二分查找调用栈
│
├── reducer_func() ← 汇总排序
│
▼
OutputData (SUMMARY_MEMORY_EVENT) → pd.DataFrame
1.3 类结构
BaseClusterParser (rl_insight/parser/parser.py)
└── MemoryClusterParser (rl_insight/parser/memory_parser.py)
├── input_type = DataEnum.ASCEND_MEMORY
├── allocate_prof_data() → 扫描目录,构建 DataMap
├── parse_analysis_data() → 主解析流程
├── _build_call_stack_index() → 流式解析 trace_view.json,构建调用栈索引
├── _parse_operator_memory() → 解析 operator_memory.csv,输出 dict[str, Any]
├── _match_call_stack() → name + ts 匹配调用栈
├── _get_data_map() → 构建 (role, rank_id) → [path] 映射
├── _get_rank_path_with_role() → 生成 DataMap 列表
├── _get_profiler_data_path() → 拼接 ASCEND_PROFILER_OUTPUT 路径
├── _get_rank_id() → 从 profiler_info_*.json 提取 rank_id
├── _get_task_role() → 从 profiler_metadata.json 提取 role
└── _extract_timestamp_key() → 提取目录名中的时间戳排序键
2. 内部接口
2.1 注册与数据类型
@register_cluster_parser("memory")
class MemoryClusterParser(BaseClusterParser):
input_type: DataEnum = DataEnum.ASCEND_MEMORY
属性 |
值 |
说明 |
|---|---|---|
注册名 |
|
CLI |
|
|
输入数据类型 |
输出类型 |
|
Parser 输出 / Visualizer 输入 |
DataEnum 新增值(定义在 rl_insight/data/data_checker.py):
class DataEnum(Enum):
ASCEND_MEMORY = "ascend_memory" # Memory Parser 输入
SUMMARY_MEMORY_EVENT = "summary_memory_event" # Memory Parser 输出
2.2 MemoryEventRow
定义在 rl_insight/utils/schema.py:
class MemoryEventRow(TypedDict):
name: str # 算子名称
role: str # RL 角色
rank_id: int # 进程 rank
call_stack: str # 完整调用栈(";\r\n" 分隔)
call_stack_top: str # 调用栈顶层入口
size_kb: float # 内存大小(KB),正数=申请,负数=释放
start_time_ms: float # 内存申请时间(ms),与 BaseClusterParser.reducer_func 的排序键对齐
duration_ms: float # 内存占用时长(ms),0 表示未释放
total_allocated_mb: float # 申请时刻累计已分配内存(MB)
total_reserved_mb: float # 申请时刻累计预留内存(MB)
total_active_mb: float # 申请时刻累计活跃内存(MB)
device_type: str # 设备类型
设计决策:
size_kb:正数表示内存申请,负数表示内存释放,与operator_memory.csv中的Size(KB)语义一致duration_ms:若Duration(us)有值则转换,无值则为0.0(表示内存尚未释放)call_stack_top:取调用栈第一行(用户代码入口),便于快速定位,无需解析完整调用栈时间单位统一为毫秒(ms),与现有
EventRow保持一致
2.3 allocate_prof_data()
复用 MstxClusterParser 的目录扫描逻辑:
遍历
input_path,找到所有<date>_<time>_ascend_pt目录从
profiler_metadata.json提取role,从profiler_info_*.json提取rank_id按
_extract_timestamp_key提取的时间戳对同 (role, rank_id) 下的目录排序profiler_data_path指向ASCEND_PROFILER_OUTPUT目录(注意:与 MstxClusterParser 指向trace_view.json文件不同,Memory Parser 指向目录,因为需要同时访问trace_view.json和operator_memory.csv)
关键差异:
# MstxClusterParser
def _get_profiler_data_path(self, rank_id, data_path):
return os.path.join(data_path, Constant.ASCEND_PROFILER_OUTPUT, "trace_view.json")
# MemoryClusterParser
def _get_profiler_data_path(self, rank_id, data_path):
return os.path.join(data_path, Constant.ASCEND_PROFILER_OUTPUT)
2.4 parse_analysis_data()
主解析流程,接收单个 Rank 的数据路径,返回 list[dict[str, Any]]:
输入: profiler_data_path (ASCEND_PROFILER_OUTPUT 目录), rank_id, role
步骤1: 构建调用栈索引
_build_call_stack_index(trace_view_path)
→ 流式解析 trace_view.json (ijson)
→ 筛选 cat=="cpu_op" 且含 "Call stack" 的事件
→ 按 name 分组,组内按 ts 排序
→ 返回 dict[str, list[{ts, dur, call_stack}]]
步骤2: 解析 operator_memory.csv
_parse_operator_memory(csv_path, call_stack_index, rank_id, role)
→ 逐行构建 MemoryEventRow:
a. 时间转换: us → ms (÷ 1000)
b. 调用栈匹配: _match_call_stack(name, allocation_time, index)
未命中 → call_stack = "", call_stack_top = ""
c. 提取 call_stack_top: 取调用栈第一行
d. duration_ms: Duration(us) 有值则转换,无值则为 0
步骤3: 返回 list[dict[str, Any]]
2.5 _build_call_stack_index()
流式解析 trace_view.json,构建调用栈索引:
输入:
trace_view.json文件路径输出:
dict[str, dict],每个值包含"entries"(list[{ts, dur, call_stack}],按ts升序排序)和"ts_list"(预提取的ts列表,供二分查找直接使用,避免每次调用重建列表)过滤条件:仅保留
cat=="cpu_op"且args中含"Call stack"的事件流式解析:使用
ijson.items(f, "item")逐条读取,避免将整个 JSON 加载到内存排序:组内按
ts升序排序,确保后续二分查找的正确性
2.6 _match_call_stack()
调用栈匹配算法,基于二分查找:
输入:算子名
name、分配时间allocation_time_us、调用栈索引输出:
(call_stack, call_stack_top),未命中返回("", "")匹配策略:在同名算子组中,查找
ts ≤ allocation_time的最近一条记录匹配语义:
ts是算子开始执行时间,Allocation Time是算子内触发内存分配的时间,因此Allocation Time ≥ ts;取ts最接近Allocation Time的一条即为触发该次内存分配的算子调用允许多对一:一个算子可能分配多次内存,因此多条
operator_memory记录可以匹配到同一条trace_view记录
import bisect
idx = bisect.bisect_right(ts_list, allocation_time_us) - 1
if idx < 0:
return "", ""
2.7 DataChecker 校验
DataEnum.ASCEND_MEMORY 当前未注册校验规则(DataChecker.rules 中为空列表),输入校验由 Parser 内部的文件存在性检查承担。DataEnum.SUMMARY_MEMORY_EVENT 同样未注册校验规则。
如需增加校验,参考 DataRule 扩展说明。
3. 扩展指南
3.1 新增内存数据校验规则
适用于:为 ASCEND_MEMORY 或 SUMMARY_MEMORY_EVENT 增加输入/输出校验。
在
rl_insight/data/rules.py中继承ValidationRule,实现check()和error_message在
DataChecker.rules中为新类型挂载规则
示例——为 ASCEND_MEMORY 增加 operator_memory.csv 存在性校验:
# rl_insight/data/rules.py
class AscendMemoryFileExistsRule(ValidationRule):
"""校验 ASCEND_PROFILER_OUTPUT 下存在 operator_memory.csv"""
def check(self, data) -> bool:
if not isinstance(data, str):
self._error_message = "Data object is not a path"
return False
root_path = Path(data)
if not root_path.exists():
self._error_message = f"Source path does not exist: {data}"
return False
ascend_pt_pattern = str(root_path / "*" / "*_ascend_pt")
ascend_pt_folders = glob.glob(ascend_pt_pattern)
for folder in ascend_pt_folders:
csv_path = Path(folder) / "ASCEND_PROFILER_OUTPUT" / "operator_memory.csv"
if not csv_path.exists():
self._error_message = f"operator_memory.csv not found in {folder}"
return False
return True
@property
def error_message(self) -> str:
return self._error_message
# rl_insight/data/data_checker.py
class DataChecker:
rules: dict[DataEnum, List[ValidationRule]] = {
DataEnum.ASCEND_MEMORY: [PathExistsRule(), AscendMemoryFileExistsRule()],
...
}
3.2 新增 Memory Visualizer
适用于:为 MemoryEventRow 输出增加可视化能力。
新增模块
rl_insight/visualizer/memory_visualizer.py继承
BaseVisualizer,设置input_type = DataEnum.SUMMARY_MEMORY_EVENT实现
run()方法,消费pd.DataFrame生成图表使用
@register_cluster_visualizer("<name>")注册更新
main.py中--vis-type的 help 文本在
rl_insight/visualizer/__init__.py中导出新类
示例:
# rl_insight/visualizer/memory_visualizer.py
from .visualizer import BaseVisualizer, register_cluster_visualizer
from rl_insight.data import DataEnum
@register_cluster_visualizer("memory_heatmap")
class MemoryHeatmapVisualizer(BaseVisualizer):
input_type: DataEnum = DataEnum.SUMMARY_MEMORY_EVENT
def __init__(self, config: dict):
super().__init__(config)
self.output_path = config.get("output_path", "output")
def run(self, data):
# 实现内存热力图可视化
# data 为 pd.DataFrame,包含 MemoryEventRow 字段
...
3.3 支持 GPU(CUDA)内存分析
适用于:将 Memory Parser 扩展至支持 CUDA 内存 Profiling 数据。
在
DataEnum中新增CUDA_MEMORY = "cuda_memory"新增
CudaMemoryParser,继承BaseClusterParser,实现allocate_prof_data()和parse_analysis_data()使用
@register_cluster_parser("cuda_memory")注册定义
CudaMemoryEventRow(若字段与MemoryEventRow不同)或复用MemoryEventRow在
DataChecker.rules中为新类型挂载校验规则更新
main.py中--profiler-type的 help 文本在
docs/data/data_specification.md中补充数据形态说明
3.4 改进调用栈匹配精度
当前匹配策略基于算子名 + 时间戳二分查找,在极短时间内同一算子多次调用时可能匹配不精确。可能的改进方向:
增加
dur约束:匹配时额外要求allocation_time ≤ ts + dur,即分配时间必须在算子执行时间范围内增加
tid约束:若operator_memory.csv可提供线程信息,可按name + tid分组匹配最近邻匹配:在
ts ≤ allocation_time的候选中,选择allocation_time - ts最小的一条
4. 依赖
库 |
用途 |
说明 |
|---|---|---|
|
流式解析大 JSON |
需安装: |
|
解析 CSV |
标准库 |
|
二分查找调用栈 |
标准库 |
5. 测试
测试文件位于 tests/parser/test_memory_parser.py,覆盖以下场景:
Parser 注册(
"memory"在CLUSTER_PARSER_REGISTRY中)_build_call_stack_index:过滤cpu_op、过滤无调用栈事件、按name分组、按ts排序、空 JSON_match_call_stack:匹配成功、多条记录取最近、未命中、所有ts大于allocation_time_parse_operator_memory:正常解析、空 CSV、调用栈匹配/未匹配allocate_prof_data:目录扫描、rank_id 提取、role 提取E2E:完整解析流程