Skip to content

Ihre erste Pipeline

Erstellen Sie Ihre erste Datenverarbeitungspipeline mit dem Swiss AI Hub Pipeline (swiss_ai_hub.pipeline) SDK – eine vollständige Datentransformationspipeline mit mehreren verbundenen Assets.

Was Sie lernen werden

Dieser Quickstart behandelt die wesentlichen Bausteine:

  • Asset-Struktur: Wie Pipelines Daten durch verbundene Assets verarbeiten
  • Datenfluss: Wie Daten automatisch zwischen Assets fließen
  • Konfiguration: Einstellungen und Ressourcen, die das Verhalten der Pipeline steuern
  • Testen: Ausführen Ihrer Pipeline lokal und in der Dagster UI
  • Observability: Überwachung der Pipeline-Ausführung mit integrierten Tools

Voraussetzungen

Sie benötigen eine laufende Swiss AI Hub Entwicklungsumgebung. Bevor Sie beginnen, stellen Sie sicher, dass Sie die Schritte zur Einrichtung der Entwicklungsumgebung abgeschlossen haben.

Wie Pipelines funktionieren

Swiss AI Hub Pipelines sind Datenverarbeitungs-Workflows, die auf Dagster basieren und aus drei wesentlichen Teilen bestehen:

  • Assets: Funktionen, die Daten erstellen, transformieren oder konsumieren
  • Abhängigkeiten: Automatischer Datenfluss zwischen Assets basierend auf Funktionsparametern
  • Ressourcen: Gemeinsam genutzte Konfiguration und Services für externe Systeme

Erstellen Sie Ihre erste Pipeline

Lassen Sie uns eine Datenpipeline erstellen, die Benutzer-Feedback-Daten verarbeitet.

Beginnen Sie mit einer einfachen Pipeline

Zuerst wollen wir die Grundlagen der Pipeline anhand eines minimalen Beispiels verstehen:

1. Erstellen Sie Ihre grundlegenden Assets (simple_pipeline.py):

python
from dagster import AssetExecutionContext, Output, asset


@asset(description="Raw text data source")
def raw_feedback_data(context: AssetExecutionContext) -> Output[str]:
    """Source asset that provides raw user feedback data."""
    feedback = "The product is amazing but the documentation could be better!"
    context.log.info(f"Loaded raw feedback: {feedback}")
    return Output(feedback, metadata={"feedback": feedback})


@asset(description="Cleaned and processed feedback")
def cleaned_feedback(context: AssetExecutionContext, raw_feedback_data: str) -> Output[dict]:
    """Transform raw feedback into structured data."""
    # Simple processing: clean text and extract basic metrics
    text = raw_feedback_data.strip().lower()
    words = text.split()

    processed = {
        "original_text": raw_feedback_data,
        "cleaned_text": text,
        "word_count": len(words),
        "sentiment": "positive" if "amazing" in text else "neutral",
    }

    context.log.info(f"Processed feedback: {processed}")
    return Output(
        processed,
        metadata={
            "original_text": processed["original_text"],
            "cleaned_text": processed["cleaned_text"],
            "word_count": processed["word_count"],
            "sentiment": processed["sentiment"],
        },
    )

2. Fügen Sie die Pipeline-Definition hinzu (simple_pipeline.py):

python
from dagster import Definitions

## ... your asset definitions from above ...

# Basic pipeline definition
defs = Definitions(assets=[raw_feedback_data, cleaned_feedback])

3. Führen Sie Ihre grundlegende Pipeline aus:

bash
uv run dagster dev -f simple_pipeline.py

Öffnen Sie http://localhost:3000 und Sie werden sehen:

  • Asset-Abstammungsgraph: raw_feedback_data → cleaned_feedback
  • Materialisierungs-Buttons zur Ausführung von Assets
  • Asset-Details, die Inputs, Outputs und Ausführungs-Logs zeigen

Klicken Sie auf „Materialize all“, um die Pipeline auszuführen und den Datenfluss zu sehen!

Bauen Sie eine reale Swiss AI Hub Pipeline

Nun erstellen wir eine realistische Pipeline mit dem swiss_ai_hub.pipeline SDK, die Dokumentenverarbeitungsmuster demonstriert. Wir werden dies Schritt für Schritt aufschlüsseln, um jede Komponente zu verstehen.

1. Die Struktur der Swiss AI Hub Pipeline verstehen

Swiss AI Hub Pipelines folgen diesen Schlüsselmustern:

  • Asset-Fabriken: Wiederverwendbare Funktionen, die konfigurierte Assets erstellen
  • Ressourcen: Konfigurierte Services wie Parser, Stores und Embedding-Modelle
  • Dynamische Partitionen: Jedes Dokument wird zu einer separaten Partition für die parallele Verarbeitung

2. Richten Sie Ihre Pipeline-Konfiguration ein

Beginnen Sie mit der Erstellung der Basiskonfiguration und Importe (my_document_pipeline.py):

python
from swiss_ai_hub.core.generative_ai.resources.models.llm.embedding_model_config import EmbeddingModelConfig
from dagster import AssetKey, AssetSelection, Definitions, DynamicPartitionsDefinition

# Import Swiss AI Hub pipeline factories
from swiss_ai_hub.pipeline.assets.factories.data_lake_to_vector_store.documents_factory import documents_factory
from swiss_ai_hub.pipeline.assets.factories.data_lake_to_vector_store.nodes_factory import nodes_factory
from swiss_ai_hub.pipeline.assets.factories.data_lake_to_vector_store.observable_data_lake_factory import (
    observable_data_lake_factory,
)

# Import Swiss AI Hub resources and utilities
from swiss_ai_hub.pipeline.resources.factory import (
    default_io_manager_s3_datalake_resources,
    local_mongo_milvus_storage_context_resource,
    s3_data_lake_resources,
)
from swiss_ai_hub.pipeline.resources.llm.embedding_model_resource import EmbeddingModelResource
from swiss_ai_hub.pipeline.resources.parser.document_parser_resource import DocumentParserResource, LoaderType
from swiss_ai_hub.pipeline.resources.parser.markdown_structural_node_parser_resource import MarkdownStructuralNodeParserResource

# Pipeline configuration - defines where data flows between assets
DATA_LAKE_KEY = AssetKey(["playground", "data_lake"])      # Raw file storage 
DOCUMENT_KEY = AssetKey(["playground", "documents"])       # Parsed documents
NODES_KEY = AssetKey(["playground", "nodes"])              # Document chunks with embeddings

# Storage configuration
CONTAINER_NAME = "playground"    # S3 bucket/container name
DIRECTORY_NAME = "documents"     # Folder for documents
NAMESPACE_NAME = DIRECTORY_NAME  # Vector store namespace
STORE_NAME = CONTAINER_NAME      # Document store name

# Dynamic partitions allow parallel processing of individual documents
document_partitions = DynamicPartitionsDefinition(name="document_partitions")

3. Erstellen Sie Ihre Pipeline-Assets

Als Nächstes erstellen Sie die drei Haupt-Assets, die Ihre Verarbeitungspipeline bilden:

python
# Create the pipeline assets using Swiss AI Hub factories

# 1. Observable Data Lake - watches for new/changed files
observable_asset = observable_data_lake_factory(
    asset_key=DATA_LAKE_KEY, 
    partitions=document_partitions
)

# 2. Documents Asset - processes raw files into structured documents
documents_asset = documents_factory(
    asset_key=DOCUMENT_KEY,
    data_lake_key=DATA_LAKE_KEY,    # Depends on data lake 
    partitions=document_partitions   # One partition per document
)

# 3. Nodes Asset - chunks documents and creates embeddings
nodes_asset = nodes_factory(
    asset_key=NODES_KEY,
    document_key=DOCUMENT_KEY,       # Depends on documents
    partitions=document_partitions   # One partition per document
)

# Combine all assets
assets = [observable_asset, documents_asset, nodes_asset]

Die Asset-Fabriken verstehen:

  • observable_data_lake_factory: Erstellt ein Asset, das Dateiänderungen überwacht
  • documents_factory: Erstellt ein Asset, das Dateien in RefDoc-Objekte mit Metadaten parst
  • nodes_factory: Erstellt ein Asset, das Dokumente in Chunks zerlegt und Vektor-Embeddings generiert

4. Konfigurieren Sie Ihre Pipeline-Ressourcen

Konfigurieren Sie nun die Ressourcen (Services), die Ihre Pipeline benötigt:

python
# Resource configuration - split into logical groups for clarity

# A. Storage and I/O Resources
storage_resources = {
    # Data lake I/O managers for S3-compatible storage
    **default_io_manager_s3_datalake_resources(
        container_name=CONTAINER_NAME, 
        directory_name=DIRECTORY_NAME
    ),
    
    # Data lake resources for file management
    **s3_data_lake_resources(
        container_name=CONTAINER_NAME,
        directory_name=DIRECTORY_NAME,
        figures_directory_name="__figures__",  # For extracted images/figures
    ),
}

# B. Document Processing Resources  
processing_resources = {
    # Document parser - uses AI-powered MinerU for PDF/Word/etc
    "document_parser": DocumentParserResource(loader_type=LoaderType.MINERU),

    # Node parser - chunks documents using structural elements
    "node_parser": MarkdownStructuralNodeParserResource(),
}

# C. Database and Search Resources
database_resources = {
    # Vector store and document store (MongoDB + Milvus)
    **local_mongo_milvus_storage_context_resource(
        vector_store_uri="http://localhost:19530",  # Milvus connection
        store_name=STORE_NAME,
        namespace_name=NAMESPACE_NAME,
    ),
}

# D. AI Model Resources
ai_resources = {
    # Embedding model for creating vector representations
    "embedding_model": EmbeddingModelResource(
        embedding_config=EmbeddingModelConfig(
            model_name="azure/text-embedding-3-large"
        ),
    ),
}

# Combine all resources
all_resources = {
    **storage_resources,
    **processing_resources, 
    **database_resources,
    **ai_resources,
}

5. Definieren Sie Ihre vollständige Pipeline

Führen Sie abschließend alles in der Pipeline-Definition zusammen:

python
# Define the complete pipeline
defs = Definitions(
    assets=assets,           # The three processing assets
    resources=all_resources, # All configured services
)

Vollständige Pipeline-Datei (my_document_pipeline.py):

Hier ist die vollständige Datei mit allen Komponenten zusammen:

python
from swiss_ai_hub.core.generative_ai.resources.models.llm.embedding_model_config import EmbeddingModelConfig
from dagster import AssetKey, Definitions, DynamicPartitionsDefinition

from swiss_ai_hub.pipeline.assets.factories.data_lake_to_vector_store.documents_factory import documents_factory
from swiss_ai_hub.pipeline.assets.factories.data_lake_to_vector_store.nodes_factory import nodes_factory
from swiss_ai_hub.pipeline.assets.factories.data_lake_to_vector_store.observable_data_lake_factory import (
    observable_data_lake_factory,
)

from swiss_ai_hub.pipeline.resources.factory import (
    default_io_manager_s3_datalake_resources,
    local_mongo_milvus_storage_context_resource,
    s3_data_lake_resources,
)
from swiss_ai_hub.pipeline.resources.llm.embedding_model_resource import EmbeddingModelResource
from swiss_ai_hub.pipeline.resources.parser.document_parser_resource import DocumentParserResource, LoaderType
from swiss_ai_hub.pipeline.resources.parser.markdown_structural_node_parser_resource import MarkdownStructuralNodeParserResource

# Configuration
DATA_LAKE_KEY = AssetKey(["playground", "data_lake"])
DOCUMENT_KEY = AssetKey(["playground", "documents"])  
NODES_KEY = AssetKey(["playground", "nodes"])

CONTAINER_NAME = "playground"
DIRECTORY_NAME = "documents" 
NAMESPACE_NAME = DIRECTORY_NAME
STORE_NAME = CONTAINER_NAME

document_partitions = DynamicPartitionsDefinition(name="document_partitions")

# Assets
observable_asset = observable_data_lake_factory(DATA_LAKE_KEY, document_partitions)
documents_asset = documents_factory(DOCUMENT_KEY, data_lake_key=DATA_LAKE_KEY, partitions=document_partitions)
nodes_asset = nodes_factory(NODES_KEY, document_key=DOCUMENT_KEY, partitions=document_partitions)

assets = [observable_asset, documents_asset, nodes_asset]

# Resources
defs = Definitions(
    assets=assets,
    resources={
        **default_io_manager_s3_datalake_resources(CONTAINER_NAME, DIRECTORY_NAME),
        **s3_data_lake_resources(CONTAINER_NAME, DIRECTORY_NAME, "__figures__"),
        **local_mongo_milvus_storage_context_resource("http://localhost:19530", STORE_NAME, NAMESPACE_NAME),
        "document_parser": DocumentParserResource(loader_type=LoaderType.MINERU),
        "node_parser": MarkdownStructuralNodeParserResource(),
        "embedding_model": EmbeddingModelResource(
            embedding_config=EmbeddingModelConfig(model_name="azure/text-embedding-3-large")
        ),
    },
)

6. Führen Sie Ihre Swiss AI Hub Pipeline aus:

bash
uv run dagster dev -f my_document_pipeline.py

Sie werden die vollständige Dokumentenverarbeitungspipeline sehen:

data_lake (observable) → documents → nodes
                            ↓          ↓
                       (DocStore)  (VectorStore)

7. Den Datenfluss verstehen:

  1. Observable Data Lake: Überwacht neue PDF-, Word-, Markdown- usw. Dateien
  2. Documents: Parst Dateien mithilfe von KI-gestützter Dokumentenintelligenz (MinerU)
  3. Nodes: Zerlegt Dokumente mittels struktureller Analyse in Chunks und generiert Embeddings

8. Fügen Sie Jobs und Planung zu Ihrer Pipeline hinzu

Für Produktions-Pipelines möchten Sie Jobs und Planung hinzufügen. Erweitern wir die Pipeline:

python
# Add these imports to my_document_pipeline.py
from swiss_ai_hub.pipeline.jobs.factory import observe_source_job
from swiss_ai_hub.pipeline.schedules.factory import daily_schedule_at
from swiss_ai_hub.pipeline.sensors.factory import default_automation_sensor

# Create jobs for different operations
observe_job = observe_source_job(
    observable_asset=observable_asset,
    namespace_name=NAMESPACE_NAME,
)

# Update your pipeline definition to include jobs and schedules
defs = Definitions(
    assets=assets,
    resources={
        # ... your existing resources ...
    },
    
    # Add jobs for pipeline operations
    jobs=[observe_job],
    
    # Add scheduling - observe daily at midnight
    schedules=[daily_schedule_at(observe_job, hour=0, minute=0)],
    
    # Add sensors for automation
    sensors=[default_automation_sensor(assets)],
)

Jobs und Planung verstehen:

  • observe_job: Manuelle Auslösung der Überwachung des Data Lake
  • daily_schedule_at: Planung der automatischen Data Lake-Überwachung
  • default_automation_sensor: Automatische Auslösung der Asset-Verarbeitung, wenn sich Abhängigkeiten ändern

Ihre Pipeline unterstützt nun:

  • Manuelle Ausführung: Materialisieren Sie einzelne Assets in der Dagster UI
  • Geplante Überwachung: Tägliche Überprüfung auf neue Dokumente
  • Automatische Verarbeitung: Assets werden automatisch verarbeitet, wenn Upstream-Änderungen erkannt werden

9. Überwachen mit Swiss AI Hub Observability-Tools:

  • Dagster UI (http://localhost:3000): Asset-Abstammung, Ausführungs-Logs und Materialisierungs-Historie
  • MongoDB Compass: Inspektion des Dokumenten-Stores
  • Milvus (Attu): Überwachung der Vektor-Datenbank

SeaweedFS Filer

In Produktion ist die SeaweedFS Filer Web-UI unter datalake.${DOMAIN} zugänglich (OAuth2 geschützt, erfordert die Rolle AIHubDeveloper). Im Entwicklungsmodus ist sie unter http://localhost:8889 verfügbar, um hochgeladene Dateien zu durchsuchen und die Speicherung zu debuggen.

10. Die Swiss AI Hub Pipeline-Muster verstehen

Ihre Swiss AI Hub Pipeline demonstriert Schlüsselmuster:

  1. Observable Assets: Erkennen neue Dokumente automatisch ohne manuelles Eingreifen
  2. Dynamische Partitionen: Jedes Dokument wird unabhängig verarbeitet
  3. Ressourcenmanagement: Konfigurierbare Parser, Modelle und Speicher-Backends
  4. Automatisierungsrichtlinien: Eifrige Verarbeitung, wenn sich Upstream-Assets ändern

Was Sie gelernt haben

  • Swiss AI Hub SDK-Nutzung: Verwendung von Fabriken, Ressourcen und typisierten Datenobjekten aus swiss_ai_hub.pipeline
  • Dokumentenverarbeitungspipeline: Vollständiger Fluss von Rohdateien zu durchsuchbaren Embeddings
  • Asset-Fabrik-Nutzung: Verwendung bestehender Fabriken wie documents_factory und nodes_factory
  • Ressourcenkonfiguration: Einrichtung von Parsern, LLMs und Speichersystemen
  • Observability: Überwachung KI-gestützter Pipelines mit Dagster
  • Produktionsreife: Skalierbare, automatisierte und wartbare Pipeline-Architektur

Nächste Schritte

  • Pipelines bauen – Lernen Sie fortgeschrittene Swiss AI Hub Pipeline-Muster kennen

Gebaut mit ❤️ in der Schweiz 🇨🇭