PyTorch

Inference Runner

To easily capture model metadata, you can use our inference runners. These runners will run a dataset through the model and log the metadata katiML needs, without having to call upload_to_lake() or a REST API.

from dioptra.inference.torch.torch_runner import TorchInferenceRunner
class TorchInferenceRunner(          
    model: Model,
    model_type: str,
    model_name: str,
    embeddings_layers: Optional[List[str]],
    logits_layer: Optional[str],
    datapoint_ids: Optional[List[str]],
    datapoints_metadata: Optional[List[object]],
    dataset_metadata: Optional[object],
    data_transform: Optional[transforms],
    mc_dropout_samples: Optional[int],
    device: Optional[str],
    class_names: List[str]
)
Arguments
Description
model

a pytorch model

model_type

the type of model to be used CLASSIFICATION or SEGMENTATION

model_name

the name of the model. should incliude version number if running more than once

embeddings_layers

a list of names of embeddings layers. It can combine names and indexes. For example: [0].embeddings

logits_layer

the name of the logits layer

datapoints_ids

list of datapoints ids to be updated. The list should be in the same order as the dataset

datapoints_metadata

list of metadata to be added to each datapoints. The list should be in the same order as the dataset

dataset_metadata

a metadata object to be added to all datapoints

data_transform

an optional transform method to be applied to each batch of the dataset. Could be useful to remove the groundtruth from the dataset iterator.

mc_dropout_samples

how many monte carlo dropout samples to take. if > 0, then this is a monte carlo dropout experiment, the model will be put in train mode before running inference

device

a string to control where to run the inference. cpu or cuda

class_names

a list of class names corresponding to the logits indexes

def run(
    self,
    dataset: Dataset
)
Arguments
Description
dataset

an iterable dataset to run the model inference on. The iterator should only return the features, not the groundtruth. Groundtruth should be passed as a metadata. It should not be shuffled is used with metadata

def wait_for_uploads(self): -> [object]
# Waits on all metadata generated during inference to be uploaded to Dioptra.
# Returns the list of uploads generated by the runner during the inference.

Example usage

import os

os.environ['DIOPTRA_API_KEY'] = 'my_api_key'
os.environ['DIOPTRA_UPLOAD_BUCKET'] = 'my_upload_bucket'

my_model = ... 
my_dataset = ... # some dataset that we can iterate over
my_classes = ...

my_runner = ClassifierRunner(
    model=model, 
    embeddings_layers=['embeddings'],
    logits_layer='logits',
    class_names=my_classes,
    metadata=metadata
)

my_runner.run(my_dataset)

my_runner.wait_for_uploads()

print('Metadata generated and uploaded!')

You can also send metadata like groundtruth, image uris or tags. Here is a complete example.

import os

os.environ['DIOPTRA_API_KEY'] = 'my_api_key'
os.environ['DIOPTRA_UPLOAD_BUCKET'] = 'my_upload_bucket'

from datasets import load_dataset
from torchvision import transforms
import uuid
from dioptra.inference.torch.torch_runner import TorchInferenceRunner

preprocess = transforms.Compose([
    transforms.Lambda(lambda x:x.numpy()),
    transforms.ToPILImage(),
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def my_transform(batch):
    processed = preprocess(batch['image']).unsqueeze(0)
    return processed

dataset = load_dataset('cats_vs_dogs').with_format('torch')['train'].select(range(10))

metadata = [{
    'tags': {
        'datapoint_id': str(uuid.uuid4())
        'model_id': 'test_model_torch'
    }
} for i in range(10)]

my_runner = TorchInferenceRunner(
    model=model, 
    embeddings_layers=['layer4'],
    logits_layer='fc',
    class_names=categories,
    data_transform=my_transform,
    device='cuda',
    metadata=metadata
)

my_runner.run(dataset)

my_runner.wait_for_uploads()

print('Metadata generated and uploaded!')

Object Store Dataset

To easily use katiML in your training pipeline, we provide a wrapper around torch.utils.data.Dataset. This dataset can download object store images into a local cache to optimize loading time.

If self.use_caching is True, the dataset will use the caching dir defined by DIOPTRA_CACHE_DIR (default is ~.dioptra) to pull cache the image using the path stored DataFrame under metadata.uri . The images will be loaded using smart_open so proper credentials should be configured. the resulting image will be a PIL image stored in the image field of the returned row.

Other pre processing will also happen for SEGMENTATION to decompress and load the mask under groundtruths[0].encoded_segmentation_class_mask

The dataset can prefetch images using a multi processing thread pool using the prefetch_images method or can be wrapped in a torch.utils.data.DataLoader with several workers to stream the data to the notebook. and still feed the GPU.

from dioptra.lake.torch.object_store_datasets import ImageDataset
class ImageDataset(
    dataframe: DataFrame,
    transform: torchvision.transform
)
Arguments
Description
dataframe

the data frame to be used as a data source. Can come from download_from_lake() or dataset.download()

transform

a torchsivison.transform method to be applied to each item when iterating over the dataset

def prefetch_images(
    self,
    num_workers: int -> 1
)
Arguments
Description
num_workers

the number of workers to be used to pre fetch the data. Default is 1

Example usage

from dioptra.lake.utils import select_datapoints
from dioptra.lake.torch.object_store_datasets import ImageDataset

unlabeled_df = select_datapoints(filters=[{
    'left': 'tags.data_split',
    'op': '=',
    'right': 'train'
}], fields=['image_metadata.uri', 'tags.datapoint_id', 'request_id'])

unlabeled_dataset = ImageDataset(unlabeled_df)

Example in a pipeline

import torch
from torchvision import transforms
from torch.utils.data import DataLoader

from dioptra.inference.torch.torch_runner import TorchInferenceRunner
from dioptra.lake.utils import select_datapoints
from dioptra.lake.torch.object_store_datasets import ImageDataset

unlabeled_df = select_datapoints(filters=[{
    'left': 'tags.data_split',
    'op': '=',
    'right': 'train'
}], fields=['image_metadata.uri', 'tags.datapoint_id', 'request_id'])

unlabeled_dataset = ImageDataset(unlabeled_df)

transform_pipe = transforms.Compose([
    transforms.Lambda(lambda x: x.convert('RGB')),
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.shape[0] == 1 else x),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def transform(row):
    return transform_pipe(row['image'])
    
unlabeled_dataset.transform = transform
unlabeled_dataset.load_images = True
unlabeled_dataset.prefetch_images(20)

data_loader = DataLoader(
    unlabeled_dataset, batch_size=10, num_workers=4, shuffle=False)

torch_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
torch_model.to('cuda')

my_runner = TorchInferenceRunner(
    model=torch_model, 
    embeddings_layers=['layer4'],
    device='cuda',
    metadata=first_run_metadata
)

my_runner.run(data_loader)

Last updated

Was this helpful?