lamindb.flow

lamindb.flow(uid=None, global_run='clear')

Use @flow() to track a function as a workflow.

You will be able to see inputs, outputs, and parameters of the function in the data lineage graph.

The decorator creates a Transform object that maps onto the file in which the function is defined. The function maps onto an entrypoint of the transform. A function execution creates a Run object that stores the function name in run.entrypoint.
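The mapping between file, function, and execution can be pictured with a toy model in plain Python. This is a sketch only, not lamindb's implementation: `ToyTransform`, `ToyRun`, `toy_flow`, `TRANSFORMS`, and `RUNS` are hypothetical names invented for illustration. It assumes one transform per source file, one entrypoint per decorated function, and one run per call.

```python
from dataclasses import dataclass, field
from functools import wraps


@dataclass
class ToyTransform:
    """Hypothetical stand-in for a Transform: one per source file."""
    source_file: str
    entrypoints: list = field(default_factory=list)


@dataclass
class ToyRun:
    """Hypothetical stand-in for a Run: one per function execution."""
    transform: ToyTransform
    entrypoint: str
    params: dict


TRANSFORMS = {}  # source file -> ToyTransform
RUNS = []        # every execution appends one ToyRun here


def toy_flow():
    """Toy decorator factory mirroring the transform/run mapping described above."""
    def decorator(func):
        # The transform maps onto the file in which the function is defined.
        source_file = func.__code__.co_filename
        transform = TRANSFORMS.setdefault(source_file, ToyTransform(source_file))
        # The function maps onto an entrypoint of that transform.
        transform.entrypoints.append(func.__name__)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Each call creates a new run that stores the function name.
            RUNS.append(ToyRun(transform, func.__name__, dict(kwargs)))
            return func(*args, **kwargs)

        return wrapper
    return decorator


@toy_flow()
def ingest_dataset(key: str) -> str:
    return key.upper()


@toy_flow()
def export_dataset(key: str) -> str:
    return key.lower()


ingest_dataset(key="a.parquet")
ingest_dataset(key="b.parquet")
```

In this model, two functions in the same file share one transform, and calling `ingest_dataset` twice yields two runs with the same entrypoint.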

By default, like ln.track(), the decorator creates a global run context that can be accessed with ln.context.run.

Parameters:
  • uid (str | None, default: None) – Persist the uid to identify a transform across renames.

  • global_run (Literal['memorize', 'clear', 'none'], default: 'clear') – If "clear", set the global run context ln.context.run and clear it after the function completes. If "memorize", set the global run context and keep it after the function completes. If "none", do not set the global run context; use this to track concurrent executions of a flow() in the same Python process.
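The three `global_run` modes can be compared side by side in a toy model. This is plain Python and not lamindb's code: `context` and `toy_flow` are hypothetical stand-ins, a dict stands in for the Run object, and the assumptions that "none" leaves the global context untouched and that "clear" also resets it when the function raises are this sketch's reading of the description above.

```python
from functools import wraps
from types import SimpleNamespace

# Hypothetical stand-in for ln.context; only the `run` attribute is modeled.
context = SimpleNamespace(run=None)


def toy_flow(global_run="clear"):
    """Toy decorator factory modeling the three documented global_run modes."""
    if global_run not in ("memorize", "clear", "none"):
        raise ValueError(f"unknown global_run: {global_run!r}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            run = {"entrypoint": func.__name__}  # stand-in for a Run object
            if global_run != "none":
                context.run = run  # "clear" and "memorize" set the global run
            try:
                return func(*args, **kwargs)
            finally:
                if global_run == "clear":
                    context.run = None  # reset once the function completes

        return wrapper
    return decorator


@toy_flow(global_run="clear")
def cleared():
    return context.run["entrypoint"]


@toy_flow(global_run="memorize")
def memorized():
    return context.run["entrypoint"]


@toy_flow(global_run="none")
def not_global():
    return context.run


seen_inside = cleared()        # the run is visible while the function executes
after_clear = context.run      # and gone afterwards
memorized()
after_memorize = context.run   # still set after the function completes
context.run = None
inside_none = not_global()     # the global context was never set
```

Because "none" never writes to shared state in this model, concurrent calls cannot overwrite each other's run, which is why that mode suits concurrent executions in one process.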

Return type:

Callable[[Callable[P, R]], Callable[P, R]], where P is a ParamSpec and R is a TypeVar

Examples

To sync a workflow with a file in a git repo, see: Sync code with git.

For an extensive guide, see: Manage workflows. A few examples follow.

my_workflow.py
import lamindb as ln


@ln.flow()
def ingest_dataset(key: str) -> ln.Artifact:
    df = ln.examples.datasets.mini_immuno.get_dataset1()
    artifact = ln.Artifact.from_dataframe(df, key=key).save()
    return artifact


if __name__ == "__main__":
    ingest_dataset(key="my_analysis/dataset.parquet")

my_workflow_with_step.py
import lamindb as ln


@ln.step()
def subset_dataframe(
    artifact: ln.Artifact,
    subset_rows: int = 2,
    subset_cols: int = 2,
) -> ln.Artifact:
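    df = artifact.load()
    # Keep only the first `subset_rows` rows and `subset_cols` columns.
    new_data = df.iloc[:subset_rows, :subset_cols]
    # Derive the output key from the input key.
    new_key = artifact.key.replace(".parquet", "_subsetted.parquet")
    return ln.Artifact.from_dataframe(new_data, key=new_key).save()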


@ln.flow()
def ingest_dataset(key: str, subset: bool = False) -> ln.Artifact:
    df = ln.examples.datasets.mini_immuno.get_dataset1()
    artifact = ln.Artifact.from_dataframe(df, key=key).save()
    if subset:
        artifact = subset_dataframe(artifact)
    return artifact


if __name__ == "__main__":
    ingest_dataset(key="my_analysis/dataset.parquet", subset=True)

my_workflow_with_click.py
import click
import lamindb as ln


# Decorators apply bottom-up: ln.flow() wraps main first, then click wraps the result.
@click.command()
@click.option("--key", required=True)
@ln.flow()
def main(key: str):
    df = ln.examples.datasets.mini_immuno.get_dataset2()
    ln.Artifact.from_dataframe(df, key=key).save()


if __name__ == "__main__":
    main()