Metaflow

Metaflow is a tool originally developed by Netflix. It allows data scientists to scale their pipelines from a simple script on their local machine to a production-ready end-to-end workflow. The compute can be handled by AWS Batch, AWS Step Functions, or a custom Kubernetes cluster.
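If you have not used Metaflow before, its basic building block is a FlowSpec class whose steps are chained with self.next. The toy flow below is only an illustration of that structure (the file and class names are placeholders, not part of this guide); the LightlyOne Worker flow later on follows the same pattern:

from metaflow import FlowSpec, step


class HelloFlow(FlowSpec):
    """A minimal flow with two chained steps."""

    @step
    def start(self):
        # Values assigned to self become artifacts that are passed between steps
        self.message = "Hello from Metaflow"
        self.next(self.end)

    @step
    def end(self):
        print(self.message)


if __name__ == "__main__":
    HelloFlow()

Saved as hello_flow.py, it can be executed locally with python hello_flow.py run.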

This guide demonstrates how the LightlyOne Worker can be used with Metaflow on AWS Batch.

Prerequisites

In order to use the LightlyOne Worker in any automated workflow, the following prerequisites are necessary:

Wrap the LightlyOne Worker

Metaflow expects the LightlyOne Worker Docker image to have no explicit entrypoint. Since the LightlyOne Worker image comes with an entrypoint, you'll have to wrap it in your own image and host it in your AWS container registry. You can use the following Dockerfile (saved as Dockerfile.metaflow, to match the build command below) to get the desired image:

FROM lightly/worker:latest

# Install additional dependencies here if required

# Remove the entrypoint from the LightlyOne Worker
ENTRYPOINT []

Next, build the Docker image, tag it, and push it to your AWS repository. Don't forget to replace {YOUR_IDENTIFIER} in the commands below. It is the identifier of your Amazon Elastic Container Registry.

docker build --file Dockerfile.metaflow --tag my-lightly/worker ./
docker tag my-lightly/worker:latest {YOUR_IDENTIFIER}.amazonaws.com/my-lightly/worker:latest
docker push {YOUR_IDENTIFIER}.amazonaws.com/my-lightly/worker:latest

Automation with Metaflow

For this guide you need to have Metaflow deployed to AWS or Kubernetes. If you haven't done that already, follow the Metaflow deployment instructions. Additionally, you need to install the Metaflow Python client:

pip install metaflow

Now, you can use the following flow to run the LightlyOne Worker directly on AWS Batch. Don't forget to change the settings according to your setup!

The flow will execute the following steps:

  • Create a Dataset
  • Configure a Datasource
  • Schedule a Run
  • Start the LightlyOne Worker and process the scheduled run

You can execute the flow locally with python lightly_worker_flow.py run.

📘 Example for S3

Note that the code below shows how to automate the LightlyOne Worker if your data is stored on S3. Of course, you can also use other cloud storage providers. Simply replace the lines configuring the datasource with your preferred setup.
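For instance, if your data were stored on Google Cloud Storage instead of S3, the two set_s3_config calls in the flow below could be swapped for something along these lines. This is a sketch, assuming your lightly client version provides set_gcs_config; the token, dataset ID, bucket path, project ID, and credentials file name are placeholders:

import json

from lightly.api import ApiWorkflowClient
from lightly.openapi_generated.swagger_client import DatasourcePurpose

client = ApiWorkflowClient(token="YOUR_LIGHTLY_TOKEN", dataset_id="YOUR_DATASET_ID")

# Read the service account credentials for the bucket (placeholder file name)
with open("credentials_read.json") as f:
    gcs_credentials = json.dumps(json.load(f))

client.set_gcs_config(
    resource_path="gs://YOUR_BUCKET/PATH/",
    project_id="YOUR_GCP_PROJECT_ID",
    credentials=gcs_credentials,
    purpose=DatasourcePurpose.INPUT,
)
client.set_gcs_config(
    resource_path="gs://YOUR_BUCKET/PATH/",
    project_id="YOUR_GCP_PROJECT_ID",
    credentials=gcs_credentials,
    purpose=DatasourcePurpose.LIGHTLY,
)

Inside the flow, the client and DatasourcePurpose import already exist, so only the configuration calls themselves need to be replaced.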

import os
import time
from pathlib import Path
from omegaconf import OmegaConf
from multiprocessing import Process
from datetime import datetime
from lightly.cli.config import get_config

now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")

# Metaflow
from metaflow import FlowSpec, step, batch, environment

# Settings
LIGHTLY_TOKEN = "YOUR_LIGHTLY_TOKEN"
IMAGE_NAME: str = "my-lightly/worker:latest"
DATASET_NAME: str = f"My_Dataset_{now}"

# LightlyOne Worker config
SELECTION_CONFIG = {
    "proportion_samples": 0.5,
    "strategies": [
        {"input": {"type": "EMBEDDINGS"}, "strategy": {"type": "DIVERSITY"}}
    ],
}
WORKER_CONFIG = {}

# S3
S3_REGION = "YOUR_S3_REGION"
S3_ACCESS_KEY_ID = "YOUR_S3_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY = "YOUR_S3_SECRET_ACCESS_KEY"
S3_BUCKET_PATH = "YOUR_S3_BUCKET_PATH"

# Worker
WORKER_ID = "YOUR_WORKER_ID"
WORKER_LABEL = "YOUR_WORKER_LABEL"

# Constants (do not change)
HOME_DIR = Path("/home/lightly_worker")  # prior to worker version 2.11 this must be "/home/boris"
HYDRA_DIR = Path(".hydra")
METAFLOW_DIR = Path(".metaflow")
RESOURCE_PATH = Path("onprem-docker/lightly_worker/src/lightly_worker/resources")
DOCKER_CONFIG = Path("docker/docker.yaml")


class LightlyWorkerFlow(FlowSpec):
    """Flow which processes videos in an S3 bucket using the LightlyOne Worker."""

    @batch(
        image=IMAGE_NAME,
        cpu=8,
        gpu=1,
        memory=120000,
    )
    @environment(
        vars={
            "LIGHTLY_TOKEN": LIGHTLY_TOKEN,
        }
    )
    @step
    def start(self):
        """Schedules a job and processes it with the LightlyOne Worker."""

        # Set up the environment
        os.chdir(HOME_DIR)
        HYDRA_DIR.mkdir(exist_ok=True)
        METAFLOW_DIR.mkdir(exist_ok=True)

        try:
            # Wrap this in a try-catch to make the linter happy
            from lightly_worker import main
            from lightly.api import ApiWorkflowClient
            from lightly.openapi_generated.swagger_client import DatasetType
            from lightly.openapi_generated.swagger_client import DatasourcePurpose
        except ImportError:
            pass

        # Create a dataset
        client = ApiWorkflowClient()
        client.create_dataset(DATASET_NAME, DatasetType.VIDEOS)
        self.dataset_id = client.dataset_id

        # Configure S3
        client.set_s3_config(
            resource_path=S3_BUCKET_PATH,
            region=S3_REGION,
            access_key=S3_ACCESS_KEY_ID,
            secret_access_key=S3_SECRET_ACCESS_KEY,
            purpose=DatasourcePurpose.INPUT,
        )
        client.set_s3_config(
            resource_path=S3_BUCKET_PATH,
            region=S3_REGION,
            access_key=S3_ACCESS_KEY_ID,
            secret_access_key=S3_SECRET_ACCESS_KEY,
            purpose=DatasourcePurpose.LIGHTLY,
        )

        # Schedule a run
        self.scheduled_run_id = client.schedule_compute_worker_run(
            worker_config=WORKER_CONFIG,
            selection_config=SELECTION_CONFIG,
            runs_on=[WORKER_LABEL],
        )

        # Run the LightlyOne Worker in a separate process
        cfg = OmegaConf.load(RESOURCE_PATH / DOCKER_CONFIG)
        cfg.lightly = get_config.get_lightly_config()
        cfg.worker.worker_id = WORKER_ID
        process = Process(target=main.main, args=(cfg,))
        process.start()

        # Monitor the scheduled run
        last_run_info = None
        no_update_count = 0
        while True:
            run_info = client.get_compute_worker_run_info(
                scheduled_run_id=self.scheduled_run_id
            )
            if run_info.in_end_state():
                assert run_info.ended_successfully(), "Run did not end successfully"
                break
            if run_info != last_run_info:
                no_update_count = 0
            else:
                no_update_count += 1
                if no_update_count >= 10:
                    raise RuntimeError(
                        f"Timeout: no run_info update for at least 5 minutes\n"
                        f"last_run_info: {str(last_run_info)}, run_info: {str(run_info)}"
                    )
            last_run_info = run_info
            time.sleep(30)

        # Finish up
        process.terminate()
        self.next(self.end)

    @step
    def end(self):
        """You can access selected images using the Lightly dataset_id."""

        # Continue working with the selected images using the dataset_id
        print(f"My Lightly dataset_id is {self.dataset_id}")


if __name__ == "__main__":
    LightlyWorkerFlow()
