数据流 3.0 元素用于触发平台侧 Dagster 编排的数据流作业,并同步或异步获取运行结果。同步类为 DeepPipeline,异步类为 AsyncDeepPipeline,对应 API deepfos.api.deep_pipeline.DeepPipelineAPI(module_type = DPL)。
更多说明可参考在线文档:
from deepfos.element.deep_pipeline import DeepPipeline
pipe = DeepPipeline(
element_name='PIPE_DEMO',
folder_id='10086'
)
run(...)一次性提交并阻塞直到拿到结果(内部先 run_async 再 result)。
from deepfos.lib.constant import UNSET
# 无参数运行
out = pipe.run()
# 带参数(可为 dict 或与后端约定一致的结构)
out = pipe.run(parameter={"year": "2024", "period": "3"}, timeout=600)
# in_process:兼容参数;同步路径固定为同进程不带启停,详见源码说明
out = pipe.run(in_process=True)
run_async + result适合长耗时作业:先取运行 ID,再轮询结果。
run_id = pipe.run_async(
parameter={"batch": "A01"},
in_process=False # False:服务端带启停启动;True:同进程
)
result = pipe.result(run_id, timeout=1200)
result 会在部分错误码(如超时等待)下按 SDK 策略重试;最终失败可能映射为 RunFailedError、RunTerminated、ReleaseFlowTimeout 等(见 deepfos.exceptions)。
run_batch对多组参数并发发起多次 run_async,返回 运行 ID 列表。
run_ids = pipe.run_batch(
parameters=[
{"year": "2024", "period": "1"},
{"year": "2024", "period": "2"},
],
in_process=False
)
for rid in run_ids:
print(pipe.result(rid, timeout=600))
参数类型:parameter 可为任意可 JSON 序列化的结构,需与数据流发布版中定义的入参一致。
in_process:run_async / run_batch 中 False 表示默认的带启停启动;同步 run 固定走同进程逻辑(详见源码 warnings)。
超时:timeout 为整段等待上限(秒);底层单次 HTTP 仍受全局 OPTION.api.timeout 约束。
运行前务必确认 has_approved_release,否则会直接报错。
大批量并行 run_batch 时注意服务端资源与并发限制。
与 REST 文档对照时,运行 ID 即接口 POST .../run 返回的任务标识,GET .../run/{id}/result 与之对应。
回到顶部
咨询热线
