# DataSlots
DataSlots are how PineXQ handles large inputs and outputs. Each DataSlot represents one or more WorkData (think: files) passed to ProCon. PineXQ makes sure that the files are properly passed along, e.g. between the workers of a workflow.
## DataSlot Types
- `@dataslot.input`: Creates an input data slot. The data from the slot is cast to the annotated type of the corresponding parameter, unless that type is explicitly `DataSlot`. If your input requires custom JSON parsing, a JSON reader can be passed as an argument; refer to DataSlot Arguments. If raw access is required, the type can be `DataSlot`, which gives the function a reference to the input; see Inline dataslots.
- `@dataslot.output`: Creates an output data slot, which can be passed on to other ProcessingSteps. If a type is specified, ProCon creates an instance of that type using an empty constructor, which must exist, and the model must be valid in this state. The instance is passed to the function, and an internal reference is kept so that ProCon can write the content after completion. To avoid invalidating this internal handle, do not replace (reassign) the instance in the function; only update its content. If raw access is required, or an empty constructor does not exist, the type can be `DataSlot`, which gives the function a reference to the output; see Inline dataslots. A custom writer can be passed as an argument even if a type is not explicitly supplied; refer to DataSlot Arguments.
- `@dataslot.input_collection`: Same as an input data slot, but allows multiple slots (files, URIs) as source. The order of the inputs is preserved.
- `@dataslot.output_collection`: Same as an output data slot, but allows multiple slots (files, URIs) as destination. The order of the outputs is preserved.
- `@dataslot.returns`: Indicates that the return value of the function should be written to this DataSlot.
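As an illustration, here is a minimal sketch combining these decorators. The step methods, the placeholder `MyModel` type, and the exact arguments of `@dataslot.returns` are assumptions for illustration, not taken from the library:

```python
from pinexq.procon.dataslots import dataslot

class MyModel:
    """Placeholder output type; @dataslot.output requires an empty constructor."""
    def __init__(self) -> None:
        self.values: list[str] = []

# Hypothetical step methods; names and types are illustrative.
@dataslot.input(name="raw_data")   # slot content is cast to the annotated type (dict)
@dataslot.output(name="result")    # ProCon instantiates MyModel() and writes it on completion
def my_step(self, raw_data: dict, result: MyModel) -> None:
    # Update the output instance in place; reassigning `result` would
    # invalidate ProCon's internal handle to it.
    result.values = list(raw_data.keys())

@dataslot.input(name="raw_data")
@dataslot.returns()                # route the return value into a DataSlot
def count_keys(self, raw_data: dict) -> int:
    return len(raw_data)
```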
## DataSlot Arguments
The data slot decorators are further parametrized by the following arguments:
- `name: str`: The name of the function parameter this data slot is bound to. This parameter is omitted for the `dataslot.returns` variant.
- `alias: str`: The name for this DataSlot presented in the manifest, essentially renaming it to the outside.
- `title: str`: Title for the DataSlot used in the manifest.
- `description: str`: Description for the DataSlot used in the manifest, e.g. output typing. This information replaces the parameter description in the docstring.
- `reader: Callable[[IO], Any]`: A callable accepting a file object as parameter and returning the datatype expected by the function's parameter. The intended usage is to provide a read-and-deserialize function for a specific datatype (e.g. `numpy.load`, `json.load`). Make sure the return type of the reader function matches the type in the function signature for the data slot.
- `writer: Callable[[IO, Any], None]`: A callable accepting a file object and the data to be written as parameters. The intended use is to provide a write function for a datatype (e.g. `numpy.save`, `json.dump`). If the writer function does not match the given type signature, use a `lambda` expression to adapt it, e.g. `writer=lambda f, d: json.dump(d, f)`.
- `media_type: str = "application/octet-stream"`: The IANA media type for this DataSlot, which determines the input/output file type. There is a `StrEnum` that holds common media types; import it with `from pinexq.procon.dataslots import MediaTypes`. In practice this argument is rarely optional: the default media type is for binary data, but JSON and plaintext data should be annotated as such. If workflow DataSlot media types do not match, the workflow will fail.
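For example, a step that reads a NumPy array and writes JSON statistics might combine `reader`, `writer`, and `media_type` as sketched below. The step and slot names are illustrative:

```python
import json
import numpy as np
from pinexq.procon.dataslots import dataslot, MediaTypes

# A sketch; slot and function names are illustrative.
@dataslot.input(name="matrix", reader=np.load)  # deserialize the slot content with numpy.load
@dataslot.output(
    name="stats",
    # json.dump expects (data, file), so a lambda adapts it to the (file, data) signature:
    writer=lambda f, d: json.dump(d, f),
    media_type=MediaTypes.JSON,  # the default would be binary; this output is JSON
)
def summarize(self, matrix: np.ndarray, stats: dict) -> None:
    # Update the output dict in place so ProCon's handle stays valid.
    stats["mean"] = float(matrix.mean())
    stats["shape"] = list(matrix.shape)
```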
Collection DataSlots can contain multiple files, and have additional parameters:
- `collection_reader: Callable[[list[IO]], Any]`: A callable accepting a list of file objects as parameter and returning the datatype expected by the function's input parameter. The intended usage is to provide a read-and-deserialize function for a specific datatype when using an input collection data slot. Make sure the return type of the reader function matches the type in the function signature for the data slot. Use-case examples:
  - `-> list[MyClass]`: each instance in the list is read from one slot of the data slot
  - `-> MyTable`: merge multiple files from a collection data slot into one table
- `collection_writer: Callable[[list[IO], Any], None]`: A callable accepting a list of file objects and the data to be written as parameters. The intended usage is to provide a write-and-serialize function for a specific datatype when using an output collection data slot. Make sure the accepted data type of the writer function matches the type in the function signature for the data slot (unless the type is not explicitly given, i.e. it is `DataSlot`). Use-case example:
  - `list[MyClass]`: each instance in the list is written to one slot of the data slot
- `min_slots: int = 1`: Specifies the minimal number of expected slots in a data slot. Default is 1; can be 0.
- `max_slots: int = 1`: Specifies the maximal number of expected slots in a data slot. Defaults to the same value as `min_slots`, thus specifying an exact amount. When running in remote mode (PineXQ driving ProCon), this number specifies the default work data generated for the function run. Unused (0 bytes) work data will be deleted on completion.
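A sketch of a collection input using these parameters; the reader helper and the slot bounds are illustrative:

```python
import json
from pinexq.procon.dataslots import dataslot, MediaTypes

def _read_all(files) -> list[dict]:
    # Hypothetical collection reader: parse each file as JSON,
    # preserving the slot order.
    return [json.load(f) for f in files]

# A sketch; name and bounds are illustrative.
@dataslot.input_collection(
    name="records",
    collection_reader=_read_all,
    media_type=MediaTypes.JSON,
    min_slots=1,
    max_slots=10,
)
def count_records(self, records: list[dict]) -> int:
    return len(records)
```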
## Default reader and writer implementations
For handling Pydantic classes, we offer default implementations for readers and writers, collected in `DefaultReaderWriter`. Import:
```python
from pinexq.procon.dataslots.default_reader_writer import DefaultReaderWriter
```

### pydantic_base_reader
Reads a given data slot into a given Pydantic class; expects JSON in the data slot:
```python
from pinexq.procon.dataslots import dataslot
from pinexq.procon.dataslots.default_reader_writer import DefaultReaderWriter

# MyBaseModel is your own Pydantic model (a pydantic.BaseModel subclass).
@dataslot.input(
    name="my_class",
    # lambda notation necessary; also pass the type of the desired class, MyBaseModel
    reader=lambda filehandle: DefaultReaderWriter.pydantic_base_reader(filehandle, MyBaseModel)
)
def my_function(self, my_class: MyBaseModel) -> int:
    ...
```

### pydantic_base_writer
Writes a given Pydantic class to the data slot as JSON:
```python
from pinexq.procon.dataslots import dataslot
from pinexq.procon.dataslots.default_reader_writer import DefaultReaderWriter

@dataslot.output(
    name="my_class",
    writer=DefaultReaderWriter.pydantic_base_writer
)
def my_function(self, input_data: dict, my_class: MyBaseModel) -> int:
    ...
```

### pydantic_list_base_reader
Reads given data slots into a list of a given Pydantic class; expects JSON in the data slots:
```python
from pinexq.procon.dataslots import dataslot
from pinexq.procon.dataslots.default_reader_writer import DefaultReaderWriter

@dataslot.input(
    name="my_classes",
    # lambda notation necessary; also pass the type of the desired classes, MyBaseModel
    reader=lambda filehandle: DefaultReaderWriter.pydantic_list_base_reader(filehandle, MyBaseModel)
)
def my_function(self, my_classes: list[MyBaseModel]) -> int:
    ...
```

### pydantic_list_base_writer
Writes given Pydantic classes as JSON to the slots of the data slot. Make sure there are enough slots specified in the data slot (see `max_slots`):
```python
from pinexq.procon.dataslots import dataslot
from pinexq.procon.dataslots.default_reader_writer import DefaultReaderWriter

@dataslot.output(
    name="my_classes",
    writer=DefaultReaderWriter.pydantic_list_base_writer
)
def my_function(self, my_classes: list[MyBaseModel]) -> int:
    ...
```

## Inline dataslots
If the function requires direct access to a DataSlot, it can declare a parameter of type `DataSlot` and use it as a file object.
It's recommended, but not required, to provide additional information with a decorator.
```python
from pinexq.procon.dataslots import dataslot, DataSlot

@dataslot.input('data_blob', title='In List 1')  # optional
def dataslot_read_data(self, data_blob: DataSlot) -> str:
    """Read data and return it as string"""
    with data_blob.slots[0] as f:
        data = f.read_data()
    return str(data)
```

Internally, a data slot can contain multiple file references.
In that case, you should define it as a "collection" by annotating it with `@dataslot.input_collection(...)`.
To access single data sources (e.g. files), use the `slots` attribute, a list that contains a `Slot` object for each URI or path provided.
The safest way to access the data in a `Slot` is a `with` context manager, which opens and closes it.
For backends with remote data storage, exiting the context also synchronizes the data with the remote source.
To open all slots in a DataSlot at once, you can use the context manager on the DataSlot object itself and then use the slots as if they were already open.
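A short sketch of the per-slot access pattern on a collection DataSlot, mirroring the read example above; the slot name and bounds are illustrative:

```python
from pinexq.procon.dataslots import dataslot, DataSlot

# A sketch; reads a collection DataSlot one slot at a time.
@dataslot.input_collection('parts', max_slots=5)
def read_all_parts(self, parts: DataSlot) -> list[str]:
    contents = []
    for slot in parts.slots:
        with slot as f:  # open and close each slot individually
            contents.append(str(f.read_data()))
    return contents
```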
An example use of inline DataSlots is shown below, using an external model class `em.Model` whose `write()` method takes a filename and returns values.
As the example also shows, output files can have tags and comments added to their metadata; this metadata is updated when the slot's context exits.
```python
import external_model as em
import json

from pinexq.procon.step import version
from pinexq.procon.dataslots import dataslot, DataSlot, MediaTypes
from pinexq.procon.jobmanagement import job_from_step_context
from pinexq.client.job_management.model import SetNameWorkDataParameters

# inside a step...
@version("0.1.0-dev1")
@dataslot.output('model_file', title='Generated model.')
@dataslot.output('map_file', title='Symbol map generated by model writer',
                 writer=lambda f, d: json.dump(d, f), media_type=MediaTypes.JSON)
def dataslot_generate_model(self, model_file: DataSlot, map_file: DataSlot) -> None:
    """Generate a model, write it to model_file, and write the symbol map to map_file"""
    my_model = em.Model()
    with model_file.slots[0] as slot:  # opening a single slot
        _, symbol_map = my_model.write(slot.file.name)
        slot.meta.tags += ["model"]
        slot.meta.comment = "A generated model, saved as JSON."
    with map_file:  # opening all slots at once
        map_file.write_data_to_slots(symbol_map)  # uses the configured writer

    # set a custom workdata name
    current_job = job_from_step_context(self)
    output_dataslots = current_job.get_output_data_slots()
    dataslot_hco = output_dataslots[0]
    output_workdata_hco = dataslot_hco.assigned_workdatas[0]
    output_workdata_hco.rename_action.execute(
        SetNameWorkDataParameters(NewName="test_model"))
```

By default, output WorkData has the same name as the DataSlot; if the DataSlot is a collection, the index is appended. As shown above, changing the name of output WorkData is more involved: similar to starting a sub-job, we must acquire the running Job as an object and then access the WorkData's HCO.
## Managing secret files
Since WorkData cannot be viewed by those without ownership, WorkData can also be used to pass secrets. One common pattern is to use a secret file to authenticate with an external API by setting an appropriate environment variable to point to the input file:
```python
import os

from procon.step import version
from procon.dataslots import dataslot, DataSlot, MediaTypes

# inside a step...
@version("0.1.0-dev1")
@dataslot.input(name="license_file", media_type=MediaTypes.TEXT,
                description="Your external credentials file.")
def sync_files(self, license_file: DataSlot) -> None:
    """
    Search for files at the data file path.
    If tags do not exist in workdata, upload the files.

    :param license_file: Your external credentials file.
    """
    with license_file.slots[0] as slot:
        os.environ["EXTERNAL_SHARED_CREDENTIALS_FILE"] = slot.file.name
        ...
    del os.environ["EXTERNAL_SHARED_CREDENTIALS_FILE"]
```

However, since the contents of WorkData are not logged, even when passed as e.g. strings, secrets can also be passed in the same way as normal `str` WorkData.
```python
from procon.step import version
from procon.dataslots import dataslot, MediaTypes
from external_api import ExternalLogin

# inside a step...
@version("0.1.0-dev1")
@dataslot.input(name="secret_access_key", media_type=MediaTypes.TEXT,
                description="Your external secret key.")
def sync_files(self, access_key_id: str, secret_access_key: str) -> None:
    """
    Search for files at the data file path.
    If tags do not exist in workdata, upload the files.

    :param access_key_id: Your external access key ID.
    :param secret_access_key: Your external secret key.
    """
    external_client = ExternalLogin(access_key_id, secret_access_key)
    ...
```

A convenient and secure way to clean up environment variables involves a function decorator:
```python
import functools
import os

from procon.step import version
from procon.dataslots import dataslot, DataSlot, MediaTypes

def _finally_unset(f):
    """
    A function wrapper that unsets the env var in a ``finally`` block.
    Will not raise an error if the env var does not exist.

    Args:
        f (Callable): Your function. Set credentials inside, and this
            wrapper will unset them automatically.

    Returns:
        Callable: the wrapped function.
    """
    @functools.wraps(f)
    def inner(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        finally:
            os.environ.pop("EXTERNAL_SHARED_CREDENTIALS_FILE", None)
    return inner

# inside a step...
@_finally_unset
@version("0.1.0-dev1")
@dataslot.input(name="license_file", media_type=MediaTypes.TEXT,
                description="Your external credentials file.")
def sync_files(self, license_file: DataSlot) -> None:
    """
    Search for files at the data file path.
    If tags do not exist in workdata, upload the files.

    :param license_file: Your external credentials file.
    """
    with license_file.slots[0] as slot:
        os.environ["EXTERNAL_SHARED_CREDENTIALS_FILE"] = slot.file.name
        ...
```