lamindb.flow
- lamindb.flow(uid=None, global_run='clear')
Use @flow() to track a function as a workflow.
You will be able to see inputs, outputs, and parameters of the function in the data lineage graph.
The decorator creates a Transform object that maps onto the file in which the function is defined. The function maps onto an entrypoint of the transform. A function execution creates a Run object that stores the function name in run.entrypoint.
By default, like ln.track(), the decorator creates a global run context that can be accessed with ln.context.run.
- Parameters:
uid (str | None, default: None) – Persist the uid to identify a transform across renames.
global_run (Literal['memorize', 'clear', 'none'], default: 'clear') – If "clear", set the global run context ln.context.run and clear after the function completes. If "memorize", set the global run context and do not clear after the function completes. Set this to "none" if you want to track concurrent executions of a flow() in the same Python process.
- Return type:
Callable[[Callable[[ParamSpec(P, bound=None)],TypeVar(R)]],Callable[[ParamSpec(P, bound=None)],TypeVar(R)]]
Examples
To sync a workflow with a file in a git repo, see: Sync code with git.
For an extensive guide, see: Manage workflows. Here follow some examples.
my_workflow.py

    import lamindb as ln


    @ln.flow()
    def ingest_dataset(key: str) -> ln.Artifact:
        df = ln.examples.datasets.mini_immuno.get_dataset1()
        artifact = ln.Artifact.from_dataframe(df, key=key).save()
        return artifact


    if __name__ == "__main__":
        ingest_dataset(key="my_analysis/dataset.parquet")

my_workflow_with_step.py

    import lamindb as ln


    @ln.step()
    def subset_dataframe(
        artifact: ln.Artifact,
        subset_rows: int = 2,
        subset_cols: int = 2,
    ) -> ln.Artifact:
        df = artifact.load()
        new_data = df.iloc[:subset_rows, :subset_cols]
        new_key = artifact.key.replace(".parquet", "_subsetted.parquet")
        return ln.Artifact.from_dataframe(new_data, key=new_key).save()


    @ln.flow()
    def ingest_dataset(key: str, subset: bool = False) -> ln.Artifact:
        df = ln.examples.datasets.mini_immuno.get_dataset1()
        artifact = ln.Artifact.from_dataframe(df, key=key).save()
        if subset:
            artifact = subset_dataframe(artifact)
        return artifact


    if __name__ == "__main__":
        ingest_dataset(key="my_analysis/dataset.parquet", subset=True)

my_workflow_with_click.py

    import click
    import lamindb as ln


    @click.command()
    @click.option("--key", required=True)
    @ln.flow()
    def main(key: str):
        df = ln.examples.datasets.mini_immuno.get_dataset2()
        ln.Artifact.from_dataframe(df, key=key).save()


    if __name__ == "__main__":
        main()