Dagster与LLM:数据感知的工作流编排
---
title: "Dagster与LLM:数据感知的工作流编排"
description: "使用Dagster构建数据感知的LLM工作流,实现类型安全的数据管道"
tags: ["Dagster", "LLM", "数据管道", "工作流", "MLOps"]
category: "llm"
icon: "💎"
---
Dagster与LLM:数据感知的工作流编排
Dagster概述
Dagster是一个数据感知的工作流编排平台,通过资产(Asset)和操作(Op)两个核心概念构建可靠的数据管道,因此特别适合构建数据驱动的LLM应用。
核心概念
1. 定义Op(操作)
from dagster import op, Out, In
from openai import OpenAI
@op(out=Out(str))
def fetch_article(article_id: str) -> str:
    """Fetch the content of a single article (placeholder implementation).

    Args:
        article_id: Identifier of the article to fetch.

    Returns:
        A stand-in content string built from ``article_id``.
    """
    print(f"获取文章: {article_id}")
    content = f"文章 {article_id} 的内容..."
    return content
@op(out=Out(str))
def summarize_article(context, article: str) -> str:
    """Summarize an article's text with the chat model.

    Args:
        context: Dagster op context; used here only for logging.
        article: The article text to summarize.

    Returns:
        The summary text returned by the model.
    """
    prompt = f"总结: {article}"
    reply = OpenAI().chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    summary = reply.choices[0].message.content
    # Log only a short prefix so long summaries don't flood the run log.
    context.log.info(f"总结完成: {summary[:50]}...")
    return summary
import json


def _parse_json_object(text):
    """Parse the span from the first ``{`` to the last ``}`` of *text* as JSON.

    Chat models frequently wrap the requested JSON in Markdown code fences or
    surround it with a sentence of explanation; a bare ``json.loads`` on the
    whole reply fails in those cases.

    Raises:
        ValueError: If *text* contains no ``{...}`` span, or (as the
            ``json.JSONDecodeError`` subclass) if that span is not valid JSON.
    """
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        raise ValueError(f"LLM response contains no JSON object: {text!r}")
    return json.loads(text[start : end + 1])


@op(out=Out(dict))
def analyze_sentiment(context, text: str) -> dict:
    """Ask the chat model for a sentiment label and score for *text*.

    Args:
        context: Dagster op context (unused here).
        text: The text to analyze.

    Returns:
        The JSON object parsed from the model reply; the prompt requests
        ``sentiment`` and ``score`` keys, but the model output is not validated.
    """
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"分析以下文本的情感,返回JSON: {{\"sentiment\": \"positive/negative/neutral\", \"score\": 0-1}}\n{text}",
        }],
    )
    # NOTE(review): message content can presumably be None (e.g. refusals);
    # fall back to "" so the parser raises a clear ValueError instead.
    return _parse_json_object(response.choices[0].message.content or "")
2. 定义Job(作业)
from dagster import job
@job
def article_processing_job():
    """Fetch an article, summarize it, then analyze the summary's sentiment."""
    article = fetch_article()
    summary = summarize_article(article)
    analyze_sentiment(summary)


if __name__ == "__main__":
    # Run the job locally. Guarded so that loading this module (e.g. when Dagster
    # imports the code location for `defs`) does not trigger real LLM calls.
    # fetch_article's `article_id` input has no upstream op, so it must be
    # supplied through run config under `inputs`.
    result = article_processing_job.execute_in_process(
        run_config={"ops": {"fetch_article": {"inputs": {"article_id": {"value": "1"}}}}}
    )
资产式开发
1. 定义Asset(资产)
from dagster import asset, AssetIn, Output
import json
@asset
def raw_articles():
    """Source asset: a hard-coded list of sample articles.

    Returns:
        A list of dicts, each with an ``id`` and a ``content`` string.
    """
    samples = [
        ("1", "AI正在改变世界..."),
        ("2", "LLM技术快速发展..."),
    ]
    return [{"id": article_id, "content": text} for article_id, text in samples]
@asset
def processed_articles(raw_articles):
    """Run every raw article through the chat model, one request per article.

    Args:
        raw_articles: Upstream asset; a list of dicts with ``id`` and ``content``.

    Returns:
        A list of dicts with the article ``id`` and the model's
        ``processed_content``, in the same order as ``raw_articles``.
    """
    client = OpenAI()

    def _process(article):
        # One chat completion per article; the reply text becomes the new content.
        completion = client.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"处理文章: {article['content']}",
            }],
        )
        return {
            "id": article["id"],
            "processed_content": completion.choices[0].message.content,
        }

    return [_process(article) for article in raw_articles]
@asset
def article_analytics(processed_articles):
    """Compute simple statistics over the processed articles.

    Args:
        processed_articles: Upstream asset; a list of dicts with a
            ``processed_content`` entry.

    Returns:
        ``{"total": <article count>, "avg_length": <mean processed_content length>}``.
        ``avg_length`` is ``0.0`` when there are no articles, instead of raising
        ZeroDivisionError.
    """
    total = len(processed_articles)
    # A missing (None) model reply counts as empty text rather than crashing len().
    total_chars = sum(len(a["processed_content"] or "") for a in processed_articles)
    return {
        "total": total,
        "avg_length": total_chars / total if total else 0.0,
    }
2. 传感器(Sensor):检测新文章并触发作业
from dagster import sensor, RunRequest
@sensor(job=article_processing_job)
def new_article_sensor(context):
    """Request one article_processing_job run for every newly discovered article.

    The sensor cursor holds the id of the most recently requested article and is
    passed to ``check_new_articles`` as the "since" marker ("0" on first run).

    Yields:
        One RunRequest per new article id.
    """
    last_check = context.cursor or "0"
    new_articles = check_new_articles(last_check)
    for article_id in new_articles:
        yield RunRequest(
            # Keying the run on the article id means Dagster dedupes re-submissions
            # of the same article but never skips a different one. The original key
            # was the batch size, which collided across evaluations.
            run_key=str(article_id),
            # fetch_article takes article_id as an op *input* (it has no config
            # schema), so it is supplied under `inputs`, not `config`.
            run_config={
                "ops": {
                    "fetch_article": {
                        "inputs": {"article_id": {"value": str(article_id)}}
                    }
                }
            },
        )
    if new_articles:
        # NOTE(review): assumes check_new_articles returns ids oldest-first,
        # so the last one is the newest — confirm.
        context.update_cursor(str(new_articles[-1]))
高级模式
1. 分支逻辑
from dagster import Out, Output, job, op


@op(
    out={
        # Both outputs are optional: exactly one is emitted per run, and the ops
        # downstream of the other output are skipped.
        "success": Out(dict, is_required=False),
        "failure": Out(dict, is_required=False),
    }
)
def quality_check(context, article: dict):
    """Route an article to the "success" or "failure" output.

    Args:
        context: Dagster op context, used for logging.
        article: Article dict; its ``content`` (default "") is length-checked.

    Yields:
        ``Output(article, "success")`` when the content is longer than 100
        characters, otherwise ``Output(article, "failure")``.
    """
    if len(article.get("content", "")) > 100:
        context.log.info("质量检查通过")
        yield Output(article, output_name="success")
    else:
        context.log.warning("质量检查失败")
        yield Output(article, output_name="failure")


@op
def process_valid(article: dict):
    """Handle an article that passed the quality check (first 50 chars shown)."""
    return f"处理: {article['content'][:50]}..."


@op
def handle_invalid(article: dict):
    """Handle an article that failed the quality check."""
    return f"无效文章: {article}"


@job
def branched_job():
    """Quality-check an article, then run only the branch that received output."""
    # A multi-output op returns one handle per output, in declaration order.
    success, failure = quality_check()
    process_valid(success)
    handle_invalid(failure)
2. 重试与错误处理
from dagster import Field, RetryPolicy, RetryRequested, op


@op(
    # Fallback policy for failures that are not converted into RetryRequested
    # below (e.g. an error while constructing the client). RetryPolicy.delay is
    # a number of seconds, not a timedelta.
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    # Given a default so run config may omit it; config that sets it still works.
    config_schema={"max_retries": Field(int, default_value=3)},
)
def resilient_llm_call(context, prompt: str) -> str:
    """Send *prompt* to the chat model, asking Dagster to retry on failure.

    Args:
        context: Dagster op context; provides logging and ``op_config``.
        prompt: The user prompt to send.

    Returns:
        The model's reply text.

    Raises:
        RetryRequested: When the completion request fails. It uses the configured
            ``max_retries`` and waits 10 seconds, and chains the original error.
    """
    client = OpenAI()
    try:
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
        )
    except Exception as e:
        context.log.error(f"LLM调用失败: {e}")
        # A bare RetryRequested() defaults to max_retries=1; pass the configured
        # limit explicitly and keep the original exception as the cause.
        raise RetryRequested(
            max_retries=context.op_config["max_retries"], seconds_to_wait=10
        ) from e
    return response.choices[0].message.content
部署与监控
from dagster import Definitions
# Register the assets, jobs and sensors defined above with Dagster in a single
# Definitions object (this example defines no resources).
defs = Definitions(
    assets=[raw_articles, processed_articles, article_analytics],
    jobs=[article_processing_job, branched_job],
    sensors=[new_article_sensor],
)
# Start locally with `dagster dev -f <this_file>.py`, which launches both the
# webserver UI and the dagster-daemon. Sensors such as new_article_sensor are
# only evaluated while the daemon is running.
最佳实践
- 类型安全:使用Dagster的类型系统确保数据正确性
- 资产优先:优先使用资产式开发,便于追踪数据血缘
- 测试友好:每个Op和Asset都可以独立测试
- 配置管理:使用配置系统管理环境差异
总结
Dagster为LLM工作流提供了数据感知的编排方案。通过资产和操作的抽象,可以构建类型安全、可测试、可追踪的AI数据管道。