← 返回首页
🧠

Dagster与LLM:数据感知的工作流编排

📂 llm ⏱ 2 min 353 words

---
title: "Dagster与LLM:数据感知的工作流编排"
description: "使用Dagster构建数据感知的LLM工作流,实现类型安全的数据管道"
tags: ["Dagster", "LLM", "数据管道", "工作流", "MLOps"]
category: "llm"
icon: "💎"
---

Dagster与LLM:数据感知的工作流编排

Dagster概述

Dagster是一个数据感知(data-aware)的工作流编排平台,通过资产(Asset)和操作(Op)两种抽象构建可靠的数据管道,因此特别适合构建数据驱动的LLM应用。

核心概念

1. 定义Op(操作)

from dagster import op, Out, In
from openai import OpenAI

@op(out=Out(str))
def fetch_article(article_id: str) -> str:
    """Fetch the content of one article (placeholder implementation).

    Args:
        article_id: Identifier of the article to fetch.

    Returns:
        The article text (a stub string in this example).
    """
    from dagster import get_dagster_logger

    # get_dagster_logger() records the message in Dagster's event log instead of
    # only writing to stdout, and does not require adding a `context` parameter.
    get_dagster_logger().info(f"获取文章: {article_id}")
    return f"文章 {article_id} 的内容..."

@op(out=Out(str))
def summarize_article(context, article: str) -> str:
    """Summarize an article with the OpenAI chat-completions API.

    Args:
        context: Dagster op context, used here only for logging.
        article: Article text to summarize.

    Returns:
        The model's reply text (the summary).
    """
    prompt = f"总结: {article}"
    completion = OpenAI().chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    summary = completion.choices[0].message.content
    # Log only a short preview of the summary.
    context.log.info(f"总结完成: {summary[:50]}...")
    return summary

@op(out=Out(dict))
def analyze_sentiment(context, text: str) -> dict:
    """Ask the LLM for the sentiment of *text* and parse its JSON reply.

    Args:
        context: Dagster op context, used for logging.
        text: Text to analyze.

    Returns:
        The parsed JSON object. The prompt asks for ``sentiment`` and ``score``
        keys, but the presence of those keys is not verified here.

    Raises:
        ValueError: if the reply is empty, is not valid JSON, or is not a JSON
            object.
    """
    import json
    import re

    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"分析以下文本的情感,返回JSON: {{\"sentiment\": \"positive/negative/neutral\", \"score\": 0-1}}\n{text}"
        }]
    )
    # The SDK types message.content as optional; normalise None to "" so the
    # failure below is an explicit ValueError rather than a TypeError.
    raw = (response.choices[0].message.content or "").strip()
    # Chat models often wrap JSON in a ```json ... ``` fence; unwrap it first.
    fenced = re.fullmatch(
        r"```(?:json)?\s*(.*?)\s*```", raw, flags=re.DOTALL | re.IGNORECASE
    )
    if fenced:
        raw = fenced.group(1)

    try:
        result = json.loads(raw)
    except json.JSONDecodeError as exc:
        context.log.error(f"情感分析结果不是合法的JSON: {raw[:200]}")
        raise ValueError("analyze_sentiment: LLM reply is not valid JSON") from exc
    if not isinstance(result, dict):
        raise ValueError(
            f"analyze_sentiment: expected a JSON object, got {type(result).__name__}"
        )
    return result

2. 定义Job(作业)

Job 将多个 Op 连接成依赖图。没有被上游 Op 连接的输入(例如 fetch_article 的 article_id)必须在运行时通过 run_config 提供。

from dagster import job

@job
def article_processing_job():
    """Article pipeline: fetch -> summarize -> sentiment analysis.

    fetch_article is invoked without an upstream op, so its ``article_id``
    input must be supplied through run config when the job is launched.
    """
    article = fetch_article()
    summary = summarize_article(article)
    analyze_sentiment(summary)

# Run the job in-process, providing the unconnected `article_id` input.
result = article_processing_job.execute_in_process(
    run_config={"ops": {"fetch_article": {"inputs": {"article_id": {"value": "1"}}}}}
)

资产式开发

1. 定义Asset(资产)

资产之间的依赖由函数参数名推断:processed_articles 的参数 raw_articles 声明了它依赖同名资产,Dagster 会据此构建数据血缘。

from dagster import asset, AssetIn, Output
import json

@asset
def raw_articles():
    """Source asset: a small, hard-coded list of sample articles.

    Each article is a dict with ``id`` and ``content`` keys.
    """
    samples = [
        ("1", "AI正在改变世界..."),
        ("2", "LLM技术快速发展..."),
    ]
    return [{"id": article_id, "content": body} for article_id, body in samples]

@asset
def processed_articles(raw_articles):
    """Run every raw article through the LLM.

    Args:
        raw_articles: Upstream asset; iterable of dicts with ``id`` and
            ``content`` keys.

    Returns:
        List of dicts with ``id`` and ``processed_content`` (the model reply),
        in the same order as the input.
    """
    llm = OpenAI()

    def _process_one(item):
        # One chat-completion request per article.
        reply = llm.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"处理文章: {item['content']}",
            }],
        )
        return {
            "id": item["id"],
            "processed_content": reply.choices[0].message.content,
        }

    return [_process_one(item) for item in raw_articles]

@asset
def article_analytics(processed_articles):
    """Compute simple statistics over the processed articles.

    Args:
        processed_articles: Upstream asset; list of dicts with a
            ``processed_content`` string.

    Returns:
        dict with ``total`` (number of articles) and ``avg_length`` (mean
        ``len()`` of ``processed_content``; 0.0 when there are no articles).
    """
    total = len(processed_articles)
    total_length = sum(len(a["processed_content"]) for a in processed_articles)
    # Guard against ZeroDivisionError when the upstream asset is empty.
    avg_length = total_length / total if total else 0.0
    return {"total": total, "avg_length": avg_length}

2. 传感器(Sensor)

传感器会周期性地检查外部状态,在发现新数据时为作业发起运行(RunRequest);带有相同 run_key 的请求只会触发一次运行。

from dagster import sensor, RunRequest

@sensor(job=article_processing_job)
def new_article_sensor(context):
    """Request one run of article_processing_job per newly detected article.

    The sensor cursor stores the marker of the last article already requested,
    so each tick only looks at articles after it.

    Returns:
        A list of RunRequests (one per new article), or a SkipReason when
        there is nothing new.
    """
    from dagster import SkipReason

    # NOTE(review): check_new_articles is defined elsewhere; assumed to return
    # the ids of articles newer than the given marker, oldest first — confirm.
    last_check = context.cursor or "0"
    new_articles = check_new_articles(last_check)

    if not new_articles:
        return SkipReason("没有新文章")

    # Advance the cursor past this batch so the next tick does not re-scan it.
    context.update_cursor(str(new_articles[-1]))
    return [
        RunRequest(
            # run_key de-duplicates runs, so key on the article itself rather
            # than on something shared between batches (such as batch size).
            run_key=str(article_id),
            # article_id is an unconnected *input* of fetch_article, not op config.
            run_config={
                "ops": {
                    "fetch_article": {
                        "inputs": {"article_id": {"value": str(article_id)}}
                    }
                }
            },
        )
        for article_id in new_articles
    ]

高级模式

1. 分支逻辑

将输出声明为可选(is_required=False)并只产出其中一个,即可实现条件分支:未被产出的输出,其下游 Op 会被跳过。

from dagster import job, op, Out, Output

@op(
    out={
        # Both outputs are optional; exactly one is emitted per run, and only
        # the ops downstream of the emitted output execute.
        "success": Out(dict, is_required=False),
        "failure": Out(dict, is_required=False),
    }
)
def quality_check(context, article: dict):
    """Route an article to the "success" or "failure" output.

    An article passes when its ``content`` is longer than 100 characters.
    """
    if len(article.get("content", "")) > 100:
        context.log.info("质量检查通过")
        yield Output(article, "success")
    else:
        context.log.warning("质量检查失败")
        yield Output(article, "failure")

@op
def process_valid(article: dict):
    """Handle an article that passed the quality check."""
    return f"处理: {article['content'][:50]}..."

@op
def handle_invalid(article: dict):
    """Handle an article that failed the quality check."""
    return f"无效文章: {article}"

@job
def branched_job():
    """Quality-check an article and process it on the matching branch.

    quality_check's ``article`` input has no upstream op, so it must be
    supplied through run config when the job is launched.
    """
    # A multi-output op returns a tuple of outputs in declaration order.
    valid, invalid = quality_check()
    process_valid(valid)
    handle_invalid(invalid)

2. 重试与错误处理

Dagster 提供两种重试方式:在 @op 上声明 RetryPolicy,或在 Op 内部抛出 RetryRequested 主动请求重试。

from dagster import op, Field, RetryRequested

@op(
    config_schema={
        # Optional, so runs without config validate; runs that already set it
        # keep working. The value is used as the retry budget below.
        "max_retries": Field(int, default_value=3, is_required=False),
    }
)
def resilient_llm_call(context, prompt: str) -> str:
    """Call the LLM and request a retry of this op when the call fails.

    Args:
        context: Dagster op context; reads ``op_config["max_retries"]``.
        prompt: User prompt sent to the model.

    Returns:
        The model's reply text.

    Raises:
        RetryRequested: on any error from the client, asking Dagster to retry
            up to ``max_retries`` times, waiting 10 seconds between attempts.
    """
    try:
        client = OpenAI()
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
    except Exception as e:
        # Broad catch is deliberate here: any failure of the remote call is
        # treated as retryable. Chain the cause so it shows in the event log.
        context.log.error(f"LLM调用失败: {e}")
        raise RetryRequested(
            max_retries=context.op_config["max_retries"],
            seconds_to_wait=10,
        ) from e
    return response.choices[0].message.content

部署与监控

from dagster import Definitions

# Register every asset, job and sensor in one Definitions object so Dagster's
# tooling (e.g. the webserver) can discover them from this module.
defs = Definitions(
    sensors=[new_article_sensor],
    jobs=[article_processing_job, branched_job],
    assets=[raw_articles, processed_articles, article_analytics],
)

# 本地开发时使用 dagster dev 启动 UI(同时运行 webserver 与 daemon):
# dagster dev -f <本文件>.py

最佳实践

  1. 类型安全:使用Dagster的类型系统确保数据正确性
  2. 资产优先:优先使用资产式开发,便于追踪数据血缘
  3. 测试友好:每个Op和Asset都可以独立测试
  4. 配置管理:使用配置系统管理环境差异

总结

Dagster为LLM工作流提供了数据感知的编排方案。通过资产和操作的抽象,可以构建类型安全、可测试、可追踪的AI数据管道。