Starting a job

A job is an instance of a ProcessingStep and its parametrization. Once configuration is complete, the job can be scheduled for execution; when the job completes, the results are attached to it as well. A job is therefore a logical context for a computation. While the portal can be used to start single jobs, the Python API allows you to start large batches of jobs programmatically.

Using your httpx client (configured in the previous section), begin by creating a Job object and calling create(name) on it. The returned object is the newly created Job, with methods to configure it, start it, and so on. This Job object is a wrapper around the object sent by the server, which can be reached via job.job_hco and contains all information the server sent to the client.

from pinexq.client.job_management import Job
# httpx client is already configured with host and credentials
job = Job(client).create(name="MyJob")

To run the Job, we first need to select the desired ProcessingStep. This is simplest if you have either:

  1. The name and version of the desired ProcessingStep. If you do not specify the version number and multiple versions exist, the function throws an exception and suggests alternatives.
  2. The URL of the ProcessingStep.

Alternatively, you can query the system programmatically; see here for more.

# job from previous snippet
# by function name and version
job.select_processing(function_name="my_function", version="1.0.0")
# by instance obtained either by a ProcessingStep query or by creating a ProcessingStep object
job.select_processing(processing_step_instance=ps_instance)
# by link
job.select_processing(processing_step_link=ps_url)
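
The ps_instance used above can come from a ProcessingStep query or be created directly. A minimal sketch of the latter, using ProcessingStep.from_name (shown again at the end of this section); the step name and version are illustrative:

from pinexq.client.job_management import ProcessingStep
# resolve the ProcessingStep once by name and version, then reuse the instance
ps_instance = ProcessingStep.from_name(
    client=client,
    step_name="my_function",
    version="1.0.0")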

While not strictly necessary, tagging jobs allows you to organize your jobs more easily and find them in queries.

job.set_tags(["test", "my_function"])

ProcessingSteps can have parameters. If so, any parameter without a default value must be configured. Note that input data files are handled differently; for more information, see Working with WorkData.

job_parameters = {"x": 5}
job.configure_parameters(**job_parameters) # or just job.configure_parameters(x=5)

Once configuration is complete, the job enters the JobStates.ready_for_processing state and can be started by calling start().

# start the job
job.start()

After being started, the Job is scheduled for execution and changes its state to indicate progress.

  • JobStates.pending: Job is waiting for a Worker
  • JobStates.processing: Job is currently being executed by a Worker
  • JobStates.completed: Job execution is done

If the execution leads to an error on the Worker, the state changes to JobStates.error, and the job.error_description field contains some additional information.

To know whether a job has completed, we need to access its current state. This is done through events sent by the server; wait_for_completion blocks until the job is done or the timeout is reached.

job.wait_for_completion(timeout_s=60.0)
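
If the job ends in an error, job.error_description holds the details. A minimal sketch of checking for this after waiting; the import path for JobStates and reading the state via job.job_hco are assumptions about the client, so adapt them to your version:

from pinexq.client.job_management import JobStates  # import path is an assumption
# after waiting, inspect the state; reading it via job.job_hco is an assumption
if job.job_hco.state == JobStates.error:
    print(f"Job failed: {job.error_description}")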

When a job has completed, its result is split into two parts:

  • The return value of the ProcessingStep function is stored in job.result as a string
  • Any data written by the function is stored in the associated OutputDataSlots

# wait for completion first!
result = job.get_result()
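
job.get_result() returns the raw string exactly as produced by the ProcessingStep; any decoding is up to you. A minimal sketch, assuming (purely for illustration) that this particular function returned a JSON string:

import json
# assumption: my_function returns a JSON-encoded value
data = json.loads(result)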

Side note: while the job is processing, the data written to OutputDataSlots can be accessed, but may be incomplete.

To guarantee data lineage, a job cannot be deleted while data produced by it still exists; the job contains the information on how that data was produced.

A Job can be deleted if:

  1. It is not in a running state.
  2. It does not have any output WorkData. Delete the WorkData first if necessary.
  3. It does not have any sub jobs. Delete the sub jobs first if necessary.

Note that:

  • The deletion of a job and any related data is permanent. Only the event log is retained for future reference.
  • Once the job is deleted, the existence of any entity referred to in the event log cannot be guaranteed.
  • WorkData can be deleted if no job is using it as input.

job.delete()

For convenience, deleting a job and its associated outputs and sub jobs can be done in a single step:

job.delete_with_associated(
    delete_output_workdata=True,
    delete_input_workdata=False,
    delete_subjobs_with_data=True)

All Job functions are written in fluent style, so they can be chained:

from pinexq.client.job_management import Job
job = (Job(client).create("my_job")
    .select_processing(function_name="my_function", function_version="1.0.0")
    .set_tags(["test", "my_function"])
    .configure_parameters(x=5)
    .start()
    .wait_for_completion(timeout_s=60.0))
result = job.get_result()
job.delete()

Alternatively, configuring and starting your Job can be done in a single step, saving several HTTP calls. Note that in this form you must either specify the ProcessingStep by URL or create an instance from its name and version first. Use this if you need to create many jobs quickly or do not need the step-by-step configuration.

from pinexq.client.job_management import Job, ProcessingStep

processing_step = ProcessingStep.from_name(
    client=client,
    step_name="my_function",
    version="1.0.0")

job = Job(client=client).create_and_configure_rapidly(
    name="my_job",
    processing_step_instance=processing_step,  # or processing_step_url="..."
    tags=["test", "my_function"],
    start=True
)
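
As mentioned in the introduction, this form lends itself to starting large batches of jobs programmatically. A minimal sketch, reusing the processing_step resolved above; the job count, names, and tags are illustrative:

# create and start a batch of jobs against the same ProcessingStep
jobs = [
    Job(client=client).create_and_configure_rapidly(
        name=f"my_job_{i}",
        processing_step_instance=processing_step,
        tags=["batch", "my_function"],
        start=True)
    for i in range(10)
]
# wait for each job, then collect the results
for j in jobs:
    j.wait_for_completion(timeout_s=60.0)
results = [j.get_result() for j in jobs]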