Indexing Public Bucket
This page outlines the process of indexing the public dataset S3 bucket.
Scan the "noaa-wcsd-pds" bucket to find the superset of ships and cruises.
import boto3
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

bucket = 'noaa-wcsd-pds'
session = boto3.Session(profile_name='rudy')
s3 = session.client('s3', region_name='us-east-1')

def task(s3_client, sub_prefix: str) -> list:
    # List every object under a cruise prefix, paginating past the 1,000-key page limit.
    print(f"task: {sub_prefix}")
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket, Prefix=sub_prefix)
    objects = []
    for page in page_iterator:
        objects.extend(page.get('Contents', []))  # a page may contain no objects
    return objects

With all cruises enumerated, use a thread pool to scan the bucket efficiently.
def get_all_objects() -> pd.DataFrame:
    # Enumerate every ship, then every cruise, under data/raw/; returns a DataFrame of all objects.
    print("getting all objects")
    cruises = []
    for ship in s3.list_objects_v2(Bucket=bucket, Prefix='data/raw/', Delimiter='/').get('CommonPrefixes', []):
        for cruise in s3.list_objects_v2(Bucket=bucket, Prefix=ship.get('Prefix'), Delimiter='/').get('CommonPrefixes', []):
            cruises.append(cruise.get('Prefix'))
    all_objects = []
    with ThreadPoolExecutor(max_workers=64) as executor:
        futures = [executor.submit(task, s3, cruise) for cruise in cruises]
        for future in as_completed(futures):
            all_objects.extend(future.result())
    return pd.DataFrame(all_objects)

Convert the list of objects to a pandas DataFrame. Use the filename to derive each file's date (note: some parsed values might lie in the future).
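A minimal sketch of that date derivation, assuming filenames embed a Simrad-style "DYYYYMMDD-THHMMSS" stamp; the DATE_RE pattern and date_from_key helper are illustrative and not part of the original code:

import re

# Hypothetical helper: extract a 'DYYYYMMDD-THHMMSS' stamp from an object key.
DATE_RE = re.compile(r"D(\d{8})-T(\d{6})")

def date_from_key(key: str):
    match = DATE_RE.search(key)
    if match is None:
        return pd.NaT
    # errors='coerce' turns malformed stamps into NaT instead of raising.
    return pd.to_datetime(match.group(1) + match.group(2), format="%Y%m%d%H%M%S", errors="coerce")

df = get_all_objects()
df['date'] = df['Key'].apply(date_from_key)
# Dates after "now" indicate bad echosounder clocks or malformed filenames.
future_dates = df[df['date'] > pd.Timestamp.now()]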
The scan yields ~544,168 ".raw" files with 'EK60' in their key path.
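One way to select that subset from the DataFrame (a sketch; the 'Key' column name comes from the boto3 list_objects_v2 response):

# Keep only .raw files whose key path mentions EK60.
ek60 = df[df['Key'].str.contains('EK60') & df['Key'].str.endswith('.raw')]
print(len(ek60))  # ~544,168 at the time of writing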
Now upload the index to the bucket in Parquet format.
The final output is a Parquet file in the S3 bucket with all EK60 files indexed.
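A minimal sketch of the upload step, assuming pyarrow (or fastparquet) is installed; the destination bucket and key below are hypothetical placeholders:

# Write the index locally, then upload it with the boto3 client defined above.
ek60.to_parquet('ek60_index.parquet', index=False)
s3.upload_file('ek60_index.parquet', 'my-output-bucket', 'index/ek60_index.parquet')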