Tutorial 101:¶

How to deploy a sequence of tasks to the cluster using the maestro-lightning package.

In [1]:
import os, json
from maestro_lightning import Flow, Task, Dataset, Image

1) Task parameters:¶

In [2]:
basepath         = os.getcwd()
input_path       = f"{basepath}/jobs"
number_of_events = 2
number_of_jobs   = 2
run_number       = 20251211
image_path       = '/mnt/shared/storage03/projects/cern/data/images/lorenzetti_latest.sif'
repo_build_path  = '/home/joao.pinto/git_repos/lorenzetti/build'
binds            = {"/mnt/shared/storage03" : "/mnt/shared/storage03"}

2) Create the job dataset:¶

In [4]:
os.makedirs(input_path, exist_ok=True)
for job_id in range(number_of_jobs):
    with open(f"{input_path}/job_{job_id}.json", 'w') as f:
        nov = int(number_of_events / number_of_jobs)
        d = {
            'run_number'        : run_number,
            'seed'              : int(128 * (1+job_id)),
            'number_of_events'  : nov,
            #'events_per_job'    : int(nov/4),
            #'number_of_threads' : 4
        }
        json.dump(d,f)
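To sanity-check the generated inputs, you can read one of the job files back. The sketch below repeats the loop above in a temporary directory (so it is safe to run anywhere) and then loads one descriptor to confirm its contents:

```python
import json
import tempfile

number_of_events = 2
number_of_jobs   = 2
run_number       = 20251211

with tempfile.TemporaryDirectory() as input_path:
    # Same loop as above: one JSON descriptor per job.
    for job_id in range(number_of_jobs):
        with open(f"{input_path}/job_{job_id}.json", "w") as f:
            json.dump({
                "run_number"       : run_number,
                "seed"             : int(128 * (1 + job_id)),  # unique seed per job
                "number_of_events" : number_of_events // number_of_jobs,
            }, f)

    # Read one back to confirm the contents.
    with open(f"{input_path}/job_1.json") as f:
        job = json.load(f)

print(job["seed"], job["number_of_events"])  # 256 1
```

Each job gets its own seed (`128 * (1 + job_id)`) so the generator cells produce statistically independent events.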

3) Create the flow:¶

Let's create a flow of five sequential tasks, from EVT generation through NTUPLE reconstruction.

In [5]:
with Flow(name="local_provider", path=f"{basepath}/local_tasks") as session:
    input_dataset    = Dataset(name="jobs", path=input_path)
    image            = Image(name="lorenzetti", path=image_path)


    pre_exec = f"source {repo_build_path}/lzt_setup.sh"

    command = f"{pre_exec} && gen_zee.py -o %OUT --job-file %IN -m"

    task_1 = Task(name="task_1.EVT",
                  image=image,
                  command=command,
                  input_data=input_dataset,
                  outputs={'OUT':'Zee.EVT.root'},
                  partition='cpu-large',
                  binds=binds)
    
    
    command = f"{pre_exec} && simu_trf.py -i %IN -o %OUT -nt 40"
    task_2 = Task(name="task_2.HIT",
                  image=image,
                  command=command,
                  input_data=task_1.output('OUT'),
                  outputs= {'OUT':'Zee.HIT.root'},
                  partition='cpu-large',
                  binds=binds)
    
    
    command = f"{pre_exec} && digit_trf.py -i %IN -o %OUT -nt 40"
    task_3 = Task(name="task_3.ESD",
                  image=image,
                  command=command,
                  input_data=task_2.output('OUT'),
                  outputs= {'OUT':'Zee.ESD.root'},
                  partition='cpu-large',
                  binds=binds)
    
    command = f"{pre_exec} && reco_trf.py -i %IN -o %OUT -nt 40"
    task_4 = Task(name="task_4.AOD",
                  image=image,
                  command=command,
                  input_data=task_3.output('OUT'),
                  outputs= {'OUT':'Zee.AOD.root'},
                  partition='cpu-large',
                  binds=binds)
    
    command = f"{pre_exec} && ntuple_trf.py -i %IN -o %OUT -nt 40"
    task_5 = Task(name="task_5.NTUPLE",
                  image=image,
                  command=command,
                  input_data=task_4.output('OUT'),
                  outputs= {'OUT':'Zee.NTUPLE.root'},
                  partition='cpu-large',
                  binds=binds)
   
   
    session.run()
15-Dec-2025 19:56:10 | Task task_1.EVT: creating output dataset 'Zee.EVT.root'.
15-Dec-2025 19:56:10 | Task task_2.HIT: looking for input dataset 'task_1.EVT.Zee.EVT.root'.
15-Dec-2025 19:56:10 | Task task_2.HIT: creating output dataset 'Zee.HIT.root'.
15-Dec-2025 19:56:10 | Task task_3.ESD: looking for input dataset 'task_2.HIT.Zee.HIT.root'.
15-Dec-2025 19:56:10 | Task task_3.ESD: creating output dataset 'Zee.ESD.root'.
15-Dec-2025 19:56:10 | Task task_4.AOD: looking for input dataset 'task_3.ESD.Zee.ESD.root'.
15-Dec-2025 19:56:10 | Task task_4.AOD: creating output dataset 'Zee.AOD.root'.
15-Dec-2025 19:56:10 | Task task_5.NTUPLE: looking for input dataset 'task_4.AOD.Zee.AOD.root'.
15-Dec-2025 19:56:10 | Task task_5.NTUPLE: creating output dataset 'Zee.NTUPLE.root'.
15-Dec-2025 19:56:10 | Running flow at /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks
15-Dec-2025 19:56:10 | No existing tasks found, initializing new flow.
15-Dec-2025 19:56:10 | Creating flow directory at /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks
15-Dec-2025 19:56:10 | Tasks saved to /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/flow.json
15-Dec-2025 19:56:10 | Task task_1.EVT: preparing job 0 for input file job_0.json.
{'binds': {'/mnt/shared/storage03': '/mnt/shared/storage03'},
 'command': 'source /home/joao.pinto/git_repos/lorenzetti/build/lzt_setup.sh '
            '&& gen_zee.py -o %OUT --job-file %IN -m',
 'envs': {},
 'image': {'name': 'lorenzetti',
           'path': '/mnt/shared/storage03/projects/cern/data/images/lorenzetti_latest.sif'},
 'input_file': '/home/joao.pinto/git_repos/maestro-lightning/docs/jobs/job_0.json',
 'job_id': 0,
 'outputs': {'OUT': ('Zee.EVT.root',
                     {'from_task': 'task_1.EVT',
                      'name': 'task_1.EVT.Zee.EVT.root',
                      'path': '/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/datasets/task_1.EVT.Zee.EVT.root'})},
 'secondary_data': {},
 'task_path': '/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT'}
15-Dec-2025 19:56:10 | Task task_1.EVT: preparing job 1 for input file job_1.json.
{'binds': {'/mnt/shared/storage03': '/mnt/shared/storage03'},
 'command': 'source /home/joao.pinto/git_repos/lorenzetti/build/lzt_setup.sh '
            '&& gen_zee.py -o %OUT --job-file %IN -m',
 'envs': {},
 'image': {'name': 'lorenzetti',
           'path': '/mnt/shared/storage03/projects/cern/data/images/lorenzetti_latest.sif'},
 'input_file': '/home/joao.pinto/git_repos/maestro-lightning/docs/jobs/job_1.json',
 'job_id': 1,
 'outputs': {'OUT': ('Zee.EVT.root',
                     {'from_task': 'task_1.EVT',
                      'name': 'task_1.EVT.Zee.EVT.root',
                      'path': '/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/datasets/task_1.EVT.Zee.EVT.root'})},
 'secondary_data': {},
 'task_path': '/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT'}
15-Dec-2025 19:56:10 | Preparing task task_1.EVT for execution.
maestro run task -t /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/flow.json -i 0
15-Dec-2025 19:56:12 | Initializing task with index 0.
15-Dec-2025 19:56:12 | Loading task file /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/flow.json.
15-Dec-2025 19:56:12 | Task task_1.EVT: looking for input dataset 'jobs'.
15-Dec-2025 19:56:12 | Task task_1.EVT: looking for image 'lorenzetti'.
15-Dec-2025 19:56:12 | Task task_1.EVT: creating output dataset 'Zee.EVT.root'.
15-Dec-2025 19:56:12 | Task task_1.EVT: loading existing job from /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/jobs/inputs/job_0.json.
15-Dec-2025 19:56:12 | Task task_1.EVT: loading existing job from /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/jobs/inputs/job_1.json.
15-Dec-2025 19:56:12 | Task task_2.HIT: looking for input dataset 'task_1.EVT.Zee.EVT.root'.
15-Dec-2025 19:56:12 | Task task_2.HIT: looking for image 'lorenzetti'.
15-Dec-2025 19:56:12 | Task task_2.HIT: creating output dataset 'Zee.HIT.root'.
15-Dec-2025 19:56:12 | Task task_3.ESD: looking for input dataset 'task_2.HIT.Zee.HIT.root'.
15-Dec-2025 19:56:12 | Task task_3.ESD: looking for image 'lorenzetti'.
15-Dec-2025 19:56:12 | Task task_3.ESD: creating output dataset 'Zee.ESD.root'.
15-Dec-2025 19:56:12 | Task task_4.AOD: looking for input dataset 'task_3.ESD.Zee.ESD.root'.
15-Dec-2025 19:56:12 | Task task_4.AOD: looking for image 'lorenzetti'.
15-Dec-2025 19:56:12 | Task task_4.AOD: creating output dataset 'Zee.AOD.root'.
15-Dec-2025 19:56:12 | Task task_5.NTUPLE: looking for input dataset 'task_4.AOD.Zee.AOD.root'.
15-Dec-2025 19:56:12 | Task task_5.NTUPLE: looking for image 'lorenzetti'.
15-Dec-2025 19:56:12 | Task task_5.NTUPLE: creating output dataset 'Zee.NTUPLE.root'.
15-Dec-2025 19:56:12 | Fetched task task_1.EVT for initialization.
15-Dec-2025 19:56:12 | Submitting main script for task task_1.EVT.
15-Dec-2025 19:56:12 | Adding SLURM option: ARRAY with value: --array=0,1
15-Dec-2025 19:56:12 | Adding SLURM option: OUTPUT_FILE with value: --output=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/works/job_%a/output.out
15-Dec-2025 19:56:12 | Adding SLURM option: ERROR_FILE with value: --error=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/works/job_%a/output.err
15-Dec-2025 19:56:12 | Adding SLURM option: PARTITION with value: --partition=cpu-large
15-Dec-2025 19:56:12 | Adding SLURM option: JOB_NAME with value: --job-name=run-0
maestro run job -i /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/jobs/inputs/job_$SLURM_ARRAY_TASK_ID.json -o /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/works/job_$SLURM_ARRAY_TASK_ID
15-Dec-2025 19:56:12 | Submitting job...
#!/bin/bash
#SBATCH --array=0,1
#SBATCH --output=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/works/job_%a/output.out
#SBATCH --error=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/works/job_%a/output.err
#SBATCH --partition=cpu-large
#SBATCH --job-name=run-0
source /home/joao.pinto/git_repos/maestro-lightning/maestro_lightning-env/bin/activate
source /home/joao.pinto/git_repos/maestro-lightning/maestro_lightning-env/bin/activate
maestro run job -i /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/jobs/inputs/job_$SLURM_ARRAY_TASK_ID.json -o /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/works/job_$SLURM_ARRAY_TASK_ID
15-Dec-2025 19:56:12 | Job submitted successfully with Job ID: 1348
15-Dec-2025 19:56:12 | Submitted task task_1.EVT with job ID 1348.
15-Dec-2025 19:56:12 | Creating closing script for task task_1.EVT.
15-Dec-2025 19:56:12 | Adding SLURM option: OUTPUT_FILE with value: --output=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/logs/task_end_0.out
15-Dec-2025 19:56:12 | Adding SLURM option: ERROR_FILE with value: --error=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/logs/task_end_0.err
15-Dec-2025 19:56:12 | Adding SLURM option: JOB_NAME with value: --job-name=next-0
15-Dec-2025 19:56:12 | Adding SLURM option: PARTITION with value: --partition=cpu-large
15-Dec-2025 19:56:12 | Adding SLURM option: DEPENDENCY with value: --dependency=afterok:1348
15-Dec-2025 19:56:12 | Submitting closing script for task task_1.EVT.
maestro run next -t /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/flow.json -i 0
15-Dec-2025 19:56:12 | Submitting job...
#!/bin/bash
#SBATCH --output=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/logs/task_end_0.out
#SBATCH --error=/home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/tasks/task_1.EVT/logs/task_end_0.err
#SBATCH --job-name=next-0
#SBATCH --partition=cpu-large
#SBATCH --dependency=afterok:1348
source /home/joao.pinto/git_repos/maestro-lightning/maestro_lightning-env/bin/activate
maestro run next -t /home/joao.pinto/git_repos/maestro-lightning/docs/local_tasks/flow.json -i 0
15-Dec-2025 19:56:12 | Job submitted successfully with Job ID: 1349
🚨 Please do not remove or move the flow directory or any dataset paths!
🚨 Any changes may break the program and lead to unexpected behavior.
15-Dec-2025 19:56:12 | Current images in the flow:
+------------+-----------------------------------------------------------------------+
| image      | path                                                                  |
|------------+-----------------------------------------------------------------------|
| lorenzetti | /mnt/shared/storage03/projects/cern/data/images/lorenzetti_latest.sif |
+------------+-----------------------------------------------------------------------+
15-Dec-2025 19:56:12 | Current datasets in the flow:
+-------------------------------+-------------+
| dataset                       |   num_files |
|-------------------------------+-------------|
| jobs                          |           2 |
| task_1.EVT.Zee.EVT.root       |           0 |
| task_2.HIT.Zee.HIT.root       |           0 |
| task_3.ESD.Zee.ESD.root       |           0 |
| task_4.AOD.Zee.AOD.root       |           0 |
| task_5.NTUPLE.Zee.NTUPLE.root |           0 |
+-------------------------------+-------------+
15-Dec-2025 19:56:12 | Current tasks in the flow:
+---------------+------------+-----------+-----------+-------------+----------+-------------+----------+
| taskname      |   assigned |   pending |   running |   completed |   failed |   finalized | status   |
|---------------+------------+-----------+-----------+-------------+----------+-------------+----------|
| task_1.EVT    |          2 |         0 |         0 |           0 |        0 |           0 | running  |
| task_2.HIT    |          0 |         0 |         0 |           0 |        0 |           0 | assigned |
| task_3.ESD    |          0 |         0 |         0 |           0 |        0 |           0 | assigned |
| task_4.AOD    |          0 |         0 |         0 |           0 |        0 |           0 | assigned |
| task_5.NTUPLE |          0 |         0 |         0 |           0 |        0 |           0 | assigned |
+---------------+------------+-----------+-----------+-------------+----------+-------------+----------+

NOTE: Don't remove or modify any content inside the local_tasks folder.
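As the prepared-job dump above shows, the `%IN` and `%OUT` placeholders in each command are resolved per job to the input file and the output dataset path. Conceptually, the substitution works like this (an illustrative sketch, not the actual maestro-lightning implementation):

```python
def resolve_command(command: str, input_file: str, output_path: str) -> str:
    """Replace the %IN/%OUT placeholders with the per-job paths."""
    return command.replace("%IN", input_file).replace("%OUT", output_path)

cmd = resolve_command(
    "gen_zee.py -o %OUT --job-file %IN -m",
    input_file="jobs/job_0.json",
    output_path="datasets/task_1.EVT.Zee.EVT.root",
)
print(cmd)
# gen_zee.py -o datasets/task_1.EVT.Zee.EVT.root --job-file jobs/job_0.json -m
```

This is also why the datasets table above names each output `<taskname>.<output>` (e.g. `task_1.EVT.Zee.EVT.root`): the next task in the chain looks up its input dataset by that fully qualified name.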

4) How to print the list of tasks?¶

In [6]:
!maestro task list -h
Usage: maestro list [-h] -i INPUT_FILE [--message-level MESSAGE_LEVEL]

Optional Arguments:
  -h, --help            show this help message and exit
  -i, --input INPUT_FILE
                        The job input file
  --message-level MESSAGE_LEVEL
                        Set the logging level (DEBUG, INFO, WARNING, ERROR,
                        CRITICAL). Default is INFO.
In [29]:
!maestro task list -i local_tasks
+---------------+------------+-----------+-----------+-------------+----------+-------------+-----------+
| taskname      |   assigned |   pending |   running |   completed |   failed |   finalized | status    |
|---------------+------------+-----------+-----------+-------------+----------+-------------+-----------|
| task_1.EVT    |          0 |         0 |         0 |           2 |        0 |           0 | completed |
| task_2.HIT    |          0 |         0 |         0 |           2 |        0 |           0 | completed |
| task_3.ESD    |          0 |         0 |         0 |           2 |        0 |           0 | completed |
| task_4.AOD    |          0 |         0 |         0 |           2 |        0 |           0 | completed |
| task_5.NTUPLE |          0 |         0 |         0 |           2 |        0 |           0 | completed |
+---------------+------------+-----------+-----------+-------------+----------+-------------+-----------+

5) How to retry?¶

If any jobs fail, use the retry command to resubmit them.

In [8]:
!maestro task retry -h
Usage: maestro retry [-h] -i INPUT_FILE [--message-level MESSAGE_LEVEL]
                     [--dry-run]

Optional Arguments:
  -h, --help            show this help message and exit
  -i, --input INPUT_FILE
                        The job input file
  --message-level MESSAGE_LEVEL
                        Set the logging level (DEBUG, INFO, WARNING, ERROR,
                        CRITICAL). Default is INFO.
  --dry-run             Perform a dry run without executing the task.
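Based on the help output above, a typical recovery looks like the following (run from a notebook cell with a leading `!`, as in the earlier examples). The `--dry-run` pass previews which failed jobs would be resubmitted without changing anything:

```shell
# Preview the failed jobs that would be resubmitted (no changes made):
maestro task retry -i local_tasks --dry-run

# Then resubmit them for real:
maestro task retry -i local_tasks
```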
In [ ]: