
Finding a Needle in the Haystack: Querying Physical AI Data with Daft

This article explores how to use the Daft framework to query Apple's EgoDex dataset efficiently. By combining frame-level semantic embeddings (like SigLIP) with geometric features, researchers can use natural language to search video clips, such as 'find every clip where a writing-gripped hand lifts chopsticks.' Daft enables discoverability in large-scale, unstructured robotic datasets.

Source: Hacker News AI | Author: sammysidhu


TL;DR

We ran Daft on Apple's EgoDex dataset to show what it makes possible: searching video with natural-language queries that combine frame-level semantic embeddings and geometric features, e.g., “find every clip in my dataset where a writing-gripped hand lifts chopsticks.” With Daft, you can now CTRL+F over your robotics dataset.

The Menu Problem

Historically, robots have performed tasks from a pre-defined menu. An Amazon Kiva only needed to know how to move shelving pods from storage to picker stations. Your Roomba only needs to know how to clean your floor. This made dataset curation manageable by hand. Using carefully designed experiments, a researcher or engineer could easily generate data for a specific task, and the contents of the dataset were known by construction.

[Images: Amazon Kiva, Roomba]

In uncontrolled environments, that assumption falls apart. When training a clothes-folding robot, you can’t enumerate every garment configuration in advance. So instead of designing experiments, you outfit humans with head-mounted cameras and let them fold, reach, and grasp, hoping that the world provides the variation needed to capture the full distribution of data. That data doesn’t come labeled. When you later need to audit your training mix or retrain on failure cases (like a tangled sleeve, a silk blouse, or an inside-out sock), you have no idea where those moments are.

Scale that up even further: a fleet of 500 autonomous vehicles uploads petabytes of video and sensor data to the cloud every day. How do you, as a researcher, find the near crashes and traffic violations among hundreds of thousands of hours of video for retraining?

Your data is no longer served off a menu. The data just arrives, continuously growing faster than anyone could understand it, and what’s in it isn’t well-known: it needs to be discovered. How do you retrieve a specific subset of data based on unlabeled multimodal features for downstream task fine-tuning? How can you find difficult edge cases and failures within your data?

This is the data understanding problem most frontier robotics labs are facing.

Finding a Needle in the Haystack: EgoDex

EgoDex is Apple’s egocentric dataset consisting of paired hand pose annotations and head-view video across varying tabletop tasks. It’s full of unique scenarios and rich sensor and video data, making it a perfect candidate for multimodal understanding.

But it runs into the menu problem. For example, the task description “fold a small t-shirt on a wooden table while sitting” obscures some important geometric and visual primitives.

How can I tell if my training mix is short on twisting actions?

How can I select a subset of episodes where the person holds something with a tight hammer grip?

Let's see how Daft can provide the solution.

Ingesting the EgoDex Dataset

In the HDF5 file format, a single file holds many named n-dimensional arrays plus metadata, like a tiny filesystem per file. Since EgoDex, like many other robotics datasets, comes packaged in HDF5, we added a new Hdf5File type with native support in Daft, alongside existing types like VideoFile and AudioFile.

Download the raw EgoDex dataset from Apple here:

mkdir -p .data
curl "https://ml-site.cdn-apple.com/datasets/egodex/test.zip" -o .data/test.zip
unzip .data/test.zip -d .data

Every EgoDex episode is one {i}.hdf5 (the hand-pose transforms) sitting next to a corresponding {i}.mp4 (the head-cam video). We wrote a function, read_egodex, that turns the whole dataset into a single per-frame DataFrame in Daft:

from egodex_lib.egodex import read_egodex

# Raw EgoDex HDF5 → one row per frame, straight into Daft.
df = read_egodex(".data/**/*.hdf5", with_video=True)

HDF5 to Daft Implementation Details

daft.from_files yields one row per HDF5 file. A single @daft.func UDF opens each file through the native Hdf5File API, batch-reads every transform it needs in one call, runs NumPy feature math (build_state, build_skeleton, build_extrinsics), and returns the episode as a list of per-frame structs. An explode then fans that list out into one row per frame, with the task broadcast to every row.

@daft.func(return_dtype=EPISODE_DTYPE)
def process_egodex_episode(file_: daft.File) -> dict:
    h = file_.as_hdf5()
    transforms = h.read(list(dict.fromkeys(STATE_TRANSFORMS + SKELETON_TRANSFORMS + [CAMERA])))

    state = build_state(transforms)
    skeleton = build_skeleton(transforms)
    extrinsics = build_extrinsics(transforms)
    action = next_frame_action(state)
    task = resolve_task(h.attrs())  # native attrs() returns a dict

    frames = [
        {
            "frame_index": i,
            "observation.state": state[i],
            "observation.skeleton": skeleton[i],
            "observation.extrinsics": extrinsics[i],
            "action": action[i],
        }
        for i in range(len(state))
    ]
    return {"task": task, "frames": frames}


def read_egodex(hdf5_glob, with_video: bool = False):
    per_file = Window().order_by(col("file").file_path())
    episodes = (
        daft.from_files(hdf5_glob)
        .sort(col("file").file_path())
        .with_column("episode_index", row_number().over(per_file) - 1)
        # carry the HDF5 path so the video decoder can find each episode's sibling .mp4
        .with_column("_src", col("file").file_path())
        .into_batches(8)
        .with_column("_ep", process_egodex_episode(col("file")))
        .with_column("task", col("_ep")["task"])
        .with_column("frames", col("_ep")["frames"])
    )

    frames = (
        episodes.explode("frames")
        .select("episode_index", "task", "_src", col("frames").unnest())
        .with_column("timestamp", (col("frame_index") / FPS).cast(DataType.float32()))
    )

    if with_video:
        frames = frames.with_column("observation.image", _decode_sibling_mp4(col("_src"), col("timestamp")))
    return frames.exclude("_src")
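The explode step in read_egodex can be pictured with plain Python, independent of Daft. This is only an illustration of the reshaping: the helper name explode_frames and the toy episode records are hypothetical and not part of egodex_lib.

```python
# Plain-Python illustration of "explode": one row per episode -> one row per frame.
# `explode_frames` and the toy episode records are hypothetical, for illustration only.

def explode_frames(episodes):
    """Fan each episode's list of frames into per-frame rows, copying the task onto every row."""
    rows = []
    for episode_index, episode in enumerate(episodes):
        for frame in episode["frames"]:
            row = {"episode_index": episode_index, "task": episode["task"]}
            row.update(frame)  # frame_index, observation.*, action, ...
            rows.append(row)
    return rows


episodes = [
    {"task": "Unstack four cups.", "frames": [{"frame_index": 0}, {"frame_index": 1}]},
    {"task": "Pick up food with chopsticks.", "frames": [{"frame_index": 0}]},
]
rows = explode_frames(episodes)
print(len(rows))
```

Each output row carries its episode's task, which is what makes per-frame filtering by task possible later without a join back to the episode table.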

Here's what your dataframe output should look like after reading the first 3 rows:

Here are some example videos (abridged schemas):

Columns: episode_index, frame_index, and task (these vary per clip below), observation.state ([48 floats]), observation.skeleton ([204 floats])

Fold a small tshirt on a wooden table while sitting.

Pick up food with chopsticks.

Assemble a chair while sitting at a table.

Sweep the marbles into a pile.

Remove lids from three cups on a wooden table.

Unstack four cups.

What's in Each Frame: SigLIP Embeddings

Now, using Daft, we'll run Google’s SigLIP-2 image encoder over a subsampled set of frames (1 fps), across episodes, and store the result as a vector_column in the Daft DataFrame.

import os

import daft
import torch
from daft import DataType, Series
from transformers import AutoModel, AutoProcessor

_HAS_CUDA = torch.cuda.is_available()
GPUS = 1 if _HAS_CUDA else 0


def _auto_device() -> str:
    if _HAS_CUDA:
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"


DEVICE = os.environ.get("CLIP_DEVICE", _auto_device())
DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

SUBSAMPLE = 30  # keep 1 of every 30 frames (~1 fps); semantic content barely changes between adjacent frames
MODEL_ID = "google/siglip2-base-patch16-224"
EMB_DIM = 768  # SigLIP2-base shared image/text embedding dim (must match the model)
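As a rough illustration of what the SUBSAMPLE constant implies, the sketch below keeps one frame in every 30 and converts the surviving frame indices to timestamps. It assumes a 30 fps source video, which is consistent with the "~1 fps" comment but not stated explicitly here, and subsample_frame_indices is a hypothetical helper, not a function from egodex_lib.

```python
SUBSAMPLE = 30  # same value as in the config: keep 1 of every 30 frames
FPS = 30        # assumption: source frame rate, so kept frames are about 1 second apart


def subsample_frame_indices(num_frames, step=SUBSAMPLE):
    """Return the frame indices that survive subsampling (hypothetical helper)."""
    return list(range(0, num_frames, step))


kept = subsample_frame_indices(95)
timestamps = [i / FPS for i in kept]
print(kept, timestamps)
```

Under these assumptions a 95-frame episode yields four frames to embed, so the encoder does roughly 1/30th of the work of embedding every frame.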

The encoder is wrapped in a @daft.cls, which is a stateful UDF. Unlike plain UDFs, a class UDF is instantiated once per worker and stays in GPU memory to be reused across all batches:

def _normalized_embedding(model_output) -> torch.Tensor:
    """Pull the embedding tensor out of a transformers output and L2-normalize it.

    transformers 5.x returns a model-output object from get_image_features /
    get_text_features; older versions returned a bare tensor. Handle both.
    """
    if torch.is_tensor(model_output):
        feats = model_output
    else:
        feats = model_output.pooler_output
    feats = feats.float()
    return feats / feats.norm(dim=-1, keepdim=True)  # unit-norm so cosine == dot product


@daft.cls(gpus=GPUS, max_concurrency=1, use_process=False)
class SiglipEmbedder:
    def __init__(self) -> None:
        self.model = AutoModel.from_pretrained(MODEL_ID, torch_dtype=DTYPE).to(DEVICE).eval()
        self.processor = AutoProcessor.from_pretrained(MODEL_ID)

    @daft.method.batch(return_dtype=DataType.embedding(DataType.float32(), EMB_DIM), batch_size=16)
    def embed_image(self, images: Series):
        # images.to_pylist() yields uint8 H×W×C numpy arrays; the SigLIP processor takes them
        # directly (verified identical to the PIL path), so no per-frame Image.fromarray needed.
        inputs = self.processor(images=images.to_pylist(), return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            model_output = self.model.get_image_features(**inputs)
        embeddings = _normalized_embedding(model_output)
        return list(embeddings.cpu().numpy())

from egodex_lib import egodex
from daft import col

emb = (
    egodex.embed_frames(
        df.where(col("episode_index").is_in(EPISODES))
    )
    .select("episode_index", "frame_index", "clip_emb")
    .collect()
)
emb.show(3)

Then, Daft streams the frames through the encoder in batches and writes each 768D vector back as a column in the DataFrame.
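The load-once, reuse-per-batch pattern can be sketched without Daft or a GPU. Everything below is a hypothetical stand-in: the "model" is a trivial function, and the batching loop only mimics how rows might be fed to a stateful UDF in batches of 16. It is not how Daft schedules work internally.

```python
# Hypothetical stand-in for a stateful UDF: expensive setup happens once in __init__,
# then the same instance serves every batch.

class ToyEmbedder:
    load_count = 0  # counts how many times the "model" was loaded

    def __init__(self):
        ToyEmbedder.load_count += 1
        self.model = lambda x: [float(x), float(x) * 2.0]  # placeholder for a real encoder

    def embed_batch(self, batch):
        return [self.model(item) for item in batch]


def run_in_batches(items, embedder, batch_size=16):
    """Feed items to the embedder batch by batch and concatenate the results."""
    out = []
    for start in range(0, len(items), batch_size):
        out.extend(embedder.embed_batch(items[start:start + batch_size]))
    return out


embedder = ToyEmbedder()  # one instantiation, reused for all batches
vectors = run_in_batches(list(range(40)), embedder)
print(len(vectors), ToyEmbedder.load_count)
```

Forty items pass through three batches, yet the setup cost is paid once. With a real encoder that setup is loading model weights, which is why keeping the instance alive matters.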

We embed once and reuse at query time. When a text query (“chopsticks”, “folded shirt”) arrives, it is encoded by the same SigLIP model, and its cosine similarity against clip_emb becomes a similarity column in the DataFrame.
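Because the embeddings are unit-normalized, cosine similarity reduces to a dot product. The sketch below shows that scoring step in plain Python with made-up 3-D vectors standing in for the 768-D SigLIP embeddings; the frame names and numbers are illustrative assumptions only.

```python
import math


def l2_normalize(vec):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def dot(a, b):
    """Dot product; equals cosine similarity when both inputs are unit-norm."""
    return sum(x * y for x, y in zip(a, b))


# Toy 3-D vectors standing in for 768-D image embeddings (made-up numbers).
frame_embeddings = {
    "frame_a": l2_normalize([0.9, 0.1, 0.0]),
    "frame_b": l2_normalize([0.0, 1.0, 0.2]),
}
text_embedding = l2_normalize([1.0, 0.0, 0.0])  # stand-in for an encoded text query

similarity = {name: dot(emb, text_embedding) for name, emb in frame_embeddings.items()}
best = max(similarity, key=similarity.get)
print(best)
```

In the DataFrame setting, the same dot product is evaluated per row, so the image embeddings never need to be recomputed when the query text changes.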

What the Hands Are Doing: Geometric Features

SigLIP can tell you a frame contains chopsticks. It can’t tell you the hand holding them is in a writing grip. That’s a geometric fact that can be computed directly from the 48D wrist pose and the 204D joint skeleton in the sensor data.

We propose an abstraction for geometric scenarios in EgoDex as "states and actions".

States = a property of the hands/wrist/arms that can be computed over just one frame.

Drawing on the hand-surgery literature, we found that grips split into two families: precision grips, where the fingertips and thumb delicately pinch an object, and power grips, where the whole hand wraps around it. We also classify hand openness as a state, using the hand flexion model.
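The flexion model itself is not shown here, so the following is only a minimal sketch of one common way to turn joint positions into a single-frame openness score: measure the bend at each joint as the angle between consecutive bone directions and average along the finger. The function names and toy coordinates are assumptions for illustration.

```python
import math


def flexion_angle(a, b, c):
    """Angle (rad) between bone a->b and bone b->c: 0 for a straight joint, larger when bent."""
    u = [b[i] - a[i] for i in range(3)]
    v = [c[i] - b[i] for i in range(3)]
    cos_theta = sum(x * y for x, y in zip(u, v)) / (math.hypot(*u) * math.hypot(*v))
    return math.acos(max(-1.0, min(1.0, cos_theta)))  # clamp to guard against rounding


def mean_flexion(joint_chain):
    """Mean flexion over consecutive joint triples along one finger (hypothetical helper)."""
    angles = [flexion_angle(*joint_chain[i:i + 3]) for i in range(len(joint_chain) - 2)]
    return sum(angles) / len(angles)


straight = [(0, 0, 0), (1, 0, 0), (2, 0, 0), (3, 0, 0)]  # fully extended finger
curled = [(0, 0, 0), (1, 0, 0), (1, 1, 0), (0, 1, 0)]    # two 90-degree bends
print(mean_flexion(straight), mean_flexion(curled))
```

A low mean flexion corresponds to an open hand and a high one to a closed hand, and because it needs only one frame of joint positions it fits the definition of a state.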

Actions = a property of the hands/wrist/arms that must be computed over several frames.

Lifting, for example, is characterized by the wrist's Y-position increasing faster than usual over time. This generalizes to most actions: we compute a metric such as a rate of change to detect when an action occurs, as well as when it starts and stops.
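A minimal sketch of that idea for lifting, assuming a fixed velocity threshold rather than a "faster than usual" baseline: take a finite difference of the wrist's Y-position and report the frame ranges where it exceeds the threshold. The function name, units (metres, 30 fps), and threshold value are illustrative assumptions, not values from the EgoDex pipeline.

```python
def detect_segments(wrist_y, fps=30.0, min_velocity=0.2):
    """Return (start, stop) frame-index pairs where upward wrist velocity exceeds a threshold.

    wrist_y: per-frame vertical wrist position in metres (assumed units).
    min_velocity: threshold in m/s (an arbitrary illustrative value).
    """
    # Forward finite difference: velocity[i] describes motion from frame i to i + 1.
    velocity = [(wrist_y[i + 1] - wrist_y[i]) * fps for i in range(len(wrist_y) - 1)]
    segments, start = [], None
    for i, v in enumerate(velocity):
        if v > min_velocity and start is None:
            start = i                    # action starts
        elif v <= min_velocity and start is not None:
            segments.append((start, i))  # action stops
            start = None
    if start is not None:                # still lifting at the end of the episode
        segments.append((start, len(velocity)))
    return segments


# Hand rests, rises 2 cm per frame for three frames, then rests again.
wrist_y = [0.10, 0.10, 0.12, 0.14, 0.16, 0.16, 0.16]
print(detect_segments(wrist_y))
```

The same start/stop logic applies to other rate-based actions by swapping the signal, for example arm extension for reaching or wrist roll for twisting.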

This is a summary of which poses we detect and how we do it:

| Scenario | Type | What we compute |
| --- | --- | --- |
| Hand openness | State | Mean finger flexion (MCP+PIP+DIP joint angles) |
| Writing grip (tripod) | State | Thumb meets index/middle fingertips; ring & little more curled |
| Hammer grip (power) | State | All four fingers wrapped; thumb folded over the knuckles |
| Twisting | Action | Wrist rotation about the forearm axis (pronation/supination) |
| Reaching | Action | Arm extension (hand-to-shoulder distance) increasing |
| Lifting | Action | Wrist vertical velocity |
| Grasping | Action | Fingers closing (curl decreasing over time) |
| In-hand manipulation | Action | Wrist still while fingers actively move |

Each frame's sensor data encodes geometric quantities the video alone doesn't: observation.state (48 numbers, wrist + 5 fingertips per hand) and observation.skeleton (204 numbers, 68 joints). To support detecting static poses as well as in-progress actions, we use these geometric quantities to compute states per frame as well as action rates over time and write them as columns alongside the SigLIP embeddings.
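Once semantic and geometric signals sit side by side as columns, a query like "a writing-gripped hand lifts chopsticks" becomes a conjunction of per-row conditions. The plain-Python sketch below shows that logic on toy rows. The column names (sim_chopsticks, writing_grip, lifting), the values, and the similarity cutoff are all made up for illustration.

```python
# Toy per-frame rows; all column names and values here are made up for illustration.
rows = [
    {"episode_index": 0, "frame_index": 30, "sim_chopsticks": 0.31, "writing_grip": True, "lifting": True},
    {"episode_index": 0, "frame_index": 60, "sim_chopsticks": 0.29, "writing_grip": True, "lifting": False},
    {"episode_index": 1, "frame_index": 30, "sim_chopsticks": 0.05, "writing_grip": True, "lifting": True},
    {"episode_index": 2, "frame_index": 90, "sim_chopsticks": 0.33, "writing_grip": False, "lifting": True},
]

SIM_THRESHOLD = 0.2  # arbitrary cutoff; a real threshold would need tuning


def matches(row):
    """Semantic filter (what is in the frame) AND geometric filters (what the hand is doing)."""
    return row["sim_chopsticks"] > SIM_THRESHOLD and row["writing_grip"] and row["lifting"]


hits = [(r["episode_index"], r["frame_index"]) for r in rows if matches(r)]
print(hits)
```

Only the frame that satisfies all three conditions survives, which is the sense in which the embedding answers "what is in the frame" and the geometry answers "what the hand is doing".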

Here is how we preprocess the geometric quantities with NumPy and Daft.

Computing forearm roll (Daft UDF)

@daft.func(return_dtype=DataType.float64())
def forearm_roll(rot6d, rot6d_next, forearm_axis):
    """Wrist roll (rad) about the forearm axis from one frame to the next (0 at an episode's last frame)."""
    if rot6d is None or rot6d_next is None:
        return 0.0
    delta = _rotation_matrix(rot6d_next) @ _rotation_matrix(rot6d).T
    angle = np.arccos(np.clip((np.trace(delta) - 1) / 2, -1, 1))
    axis = np.array([delta[2, 1] - delta[1, 2], delta[0, 2] - delta[2, 0], delta[1, 0] - delta[0, 1]])
    magnitude = np.linalg.norm(axis)
    if magnitude (N, 3, 3) rotation matrices (columns = hand x, y axes + palm norma

[truncated for AI cost control]