Indexing Public Bucket

This page outlines the process of indexing the public dataset S3 bucket.

Scan the "noaa-wcsd-pds" bucket to find the superset of ships and cruises.

import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import boto3
import numpy as np
import pandas as pd

bucket = 'noaa-wcsd-pds'
session = boto3.Session(profile_name='rudy')
s3 = session.client('s3', region_name="us-east-1")

def task(s3_client, sub_prefix: str) -> list:
    # list every object under a cruise-level prefix, following pagination
    print(f"task: {sub_prefix}")
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket, Prefix=sub_prefix)
    objects = []
    for page in page_iterator:
        objects.extend(page.get('Contents', []))
    return objects

With the full list of cruises, use a thread pool to scan the bucket efficiently.

def get_all_objects() -> pd.DataFrame:
    # enumerate ship/cruise prefixes under data/raw/ and list every object, returned as a dataframe
    print("getting all objects")
    cruises = []
    for ship in s3.list_objects_v2(Bucket=bucket, Prefix='data/raw/', Delimiter='/').get('CommonPrefixes', []):
        for cruise in s3.list_objects_v2(Bucket=bucket, Prefix=ship.get('Prefix'), Delimiter='/').get('CommonPrefixes', []):
            cruises.append(cruise.get('Prefix'))
    all_objects = []
    with ThreadPoolExecutor(max_workers=64) as executor:
        futures = [executor.submit(task, s3, cruise) for cruise in cruises]
        for future in as_completed(futures):
            all_objects.extend(future.result())
    return pd.DataFrame(all_objects)

Filter the dataframe of objects down to ".raw" files with 'EK60' in the key prefix, using each filename to derive the file's date (note: some parsed values might be in the future).

def get_subset_ek60_prefix(df: pd.DataFrame) -> pd.DataFrame:
    # returns all .raw objects with EK60 in the prefix of the file path
    print("getting subset of ek60 data by prefix")
    objects2 = []
    for index, row in df.iterrows():
        row_split = row['Key'].split('/')  # s3 keys always use '/' as the separator
        if len(row_split) == 6:
            filename = os.path.basename(row['Key'])  # e.g. 'EX1608_EK60-D20161205-T040300.raw'
            if filename.endswith(".raw"):
                ship, cruise, sensor = row_split[2:5]  # e.g. 'Okeanos_Explorer', 'EX1608', 'EK60'
                # parse date if possible, e.g.: 'data/raw/Henry_B._Bigelow/HB1006/EK60/HBB-D20100723-T025105.raw'
                # or 'data/raw/Henry_B._Bigelow/HB1802/EK60/D20180513-T150250.raw'
                date = None
                if re.search(r"D(\d{8})", filename) is not None and re.search(r"T(\d{6})", filename) is not None:
                    date_substring = re.search(r"D(\d{8})", filename).group(1)
                    time_substring = re.search(r"T(\d{6})", filename).group(1)
                    date = datetime.strptime(f'{date_substring}{time_substring}', '%Y%m%d%H%M%S')
                if sensor == 'EK60':
                    objects2.append(
                        {
                            'key': row['Key'],
                            'filename': filename,
                            'ship': ship,
                            'cruise': cruise,
                            'sensor': sensor,
                            'size': row['Size'],
                            'date': date,
                            'datagram': np.nan
                        }
                    )
    return pd.DataFrame(objects2)

There are now ~544,168 ".raw" files with 'EK60' specified in their key path.

Not every file under an 'EK60' prefix is actually EK60 data. Scan the first file of each cruise and read its first datagram: EK/ES60 and ES70 files start with a CON0 datagram, while EK80/ES80 files start with an XML0 datagram. Keep only the CON0 cruises, then upload the resulting index to a bucket in parquet format.

def get_subset_ek60_con0(df: pd.DataFrame) -> pd.DataFrame:
    ### scan first file from each cruise to designate as CON0 or XML0 ###
    print("getting subset of ek60 data with CON0 datagram")
    cruises = sorted(set(df['cruise']))
    for cruise in cruises:
        select_df = df.loc[df['cruise'] == cruise]
        select_key = select_df.iloc[0]['key']
        # read the datagram type from the file header (bytes 3 to 7) to distinguish EK60 from EK80
        first_datagram = s3.get_object(
            Bucket=bucket, Key=select_key, Range='bytes=3-7'
        )['Body'].read().decode().strip('\x00')
        # {"CON0": "EK/ES60 and ES70 files have a CON0 datagram first", "XML0": "ES80/EK80 files have an XML0 datagram first"}
        df.loc[df['cruise'] == cruise, ['datagram']] = first_datagram
    return df.loc[df['datagram'] == 'CON0']
all_objects = get_all_objects()
subset_ek60_by_prefix = get_subset_ek60_prefix(all_objects)
subset_ek60_by_datagram = get_subset_ek60_con0(subset_ek60_by_prefix)
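
As a quick sanity check (hypothetical, not part of the original pipeline), the row counts at each stage can be printed; the by-prefix subset should contain roughly the ~544,168 ".raw" files mentioned above.

# hypothetical sanity check of the intermediate dataframes
print(f"all objects: {len(all_objects)}")
print(f"ek60 by prefix: {len(subset_ek60_by_prefix)}")      # expected: ~544,168 .raw files
print(f"ek60 by datagram: {len(subset_ek60_by_datagram)}")  # CON0 cruises only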

write_to_parquet(
    df=subset_ek60_by_datagram,
    bucket=f"{bucket}-index",
    filename="noaa-wcsd-pds-index.parquet"
)
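
The write_to_parquet helper is not defined on this page. A minimal sketch, assuming the index bucket already exists and that pyarrow (or fastparquet) is installed, could serialize the dataframe in memory and upload it with the same boto3 client:

import io

def write_to_parquet(df: pd.DataFrame, bucket: str, filename: str) -> None:
    # hypothetical helper: serialize the dataframe to parquet in memory, then upload to the index bucket
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)  # requires pyarrow or fastparquet
    s3.put_object(Bucket=bucket, Key=filename, Body=buffer.getvalue())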

The final output is a parquet file in the S3 index bucket with all EK60 files indexed.
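
To consume the index, the parquet file can be read back with pandas. This example assumes s3fs is installed (for s3:// support in pandas) and read access to the index bucket; the path mirrors the bucket and filename used above.

# hypothetical read-back of the index file
index_df = pd.read_parquet("s3://noaa-wcsd-pds-index/noaa-wcsd-pds-index.parquet")
print(index_df[['ship', 'cruise', 'sensor', 'date', 'size']].head())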
