Indexing Public Bucket
This page outlines the process of indexing the public dataset S3 bucket.
Scan the "noaa-wcsd-pds" bucket to find the superset of ships and cruises.
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import boto3
import numpy as np
import pandas as pd

bucket = 'noaa-wcsd-pds'
session = boto3.Session(profile_name='rudy')
s3 = session.client('s3', region_name="us-east-1")

def task(s3_client, sub_prefix: str) -> list:
    # List every object under one cruise prefix, paginating past the 1000-key limit
    print(f"task: {sub_prefix}")
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket, Prefix=sub_prefix)
    objects = []
    for page in page_iterator:
        objects.extend(page.get('Contents', []))
    return objects
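For example, task can be called directly on a single cruise prefix; the prefix below is only illustrative, built from the ship and cruise names that appear later on this page.

# Illustrative single-prefix scan (example prefix, not part of the indexing pipeline)
example_objects = task(s3, 'data/raw/Okeanos_Explorer/EX1608/')
print(len(example_objects), example_objects[0]['Key'] if example_objects else None)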
With the full list of cruises, use a thread pool to scan the bucket efficiently.
def get_all_objects() -> pd.DataFrame:
    # get all objects in the data/raw/ s3 folder, returns a dataframe
    print("getting all objects")
    cruises = []
    for ship in s3.list_objects(Bucket=bucket, Prefix='data/raw/', Delimiter='/').get('CommonPrefixes'):
        for cruise in s3.list_objects(Bucket=bucket, Prefix=ship.get('Prefix'), Delimiter='/').get('CommonPrefixes'):
            cruises.append(cruise.get('Prefix'))
    all_objects = []
    with ThreadPoolExecutor(max_workers=64) as executor:
        futures = [executor.submit(task, s3, cruise) for cruise in cruises]
        for future in as_completed(futures):
            all_objects.extend(future.result())
    return pd.DataFrame(all_objects)
Filter the objects down to EK60 ".raw" files and use each filename to derive the date (note that some parsed values might lie in the future).
def get_subset_ek60_prefix(df: pd.DataFrame) -> pd.DataFrame:
    # returns all objects with EK60 in the prefix of the file path
    print("getting subset of ek60 data by prefix")
    objects2 = []
    for index, row in df.iterrows():
        row_split = row['Key'].split('/')  # S3 keys always use '/' as the separator
        if len(row_split) == 6:
            filename = os.path.basename(row['Key'])  # 'EX1608_EK60-D20161205-T040300.raw'
            if filename.endswith(".raw"):
                ship, cruise, sensor = row_split[2:5]  # 'Okeanos_Explorer', 'EX1608', 'EK60'
                # parse date if possible, e.g.: 'data/raw/Henry_B._Bigelow/HB1006/EK60/HBB-D20100723-T025105.raw'
                # and 'data/raw/Henry_B._Bigelow/HB1802/EK60/D20180513-T150250.raw'
                if re.search(r"D(\d{8})", filename) and re.search(r"T(\d{6})", filename):
                    date_substring = re.search(r"D(\d{8})", filename).group(1)
                    time_substring = re.search(r"T(\d{6})", filename).group(1)
                    date = datetime.strptime(f'{date_substring}{time_substring}', '%Y%m%d%H%M%S')
                    if sensor == 'EK60':
                        objects2.append(
                            {
                                'key': row['Key'],
                                'filename': filename,
                                'ship': ship,
                                'cruise': cruise,
                                'sensor': sensor,
                                'size': row['Size'],
                                'date': date,
                                'datagram': np.nan
                            }
                        )
    return pd.DataFrame(objects2)
There are now ~544,168 ".raw" files with 'EK60' specified in their key path.
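As a quick sanity check, the date parsing above can be exercised on the example filename from the code comment; this snippet is for demonstration only and is not part of the indexing pipeline.

# Demonstration only: parse the date embedded in an example filename
example = 'EX1608_EK60-D20161205-T040300.raw'
d = re.search(r"D(\d{8})", example).group(1)
t = re.search(r"T(\d{6})", example).group(1)
print(datetime.strptime(f'{d}{t}', '%Y%m%d%H%M%S'))  # 2016-12-05 04:03:00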
Next, scan the first file of each cruise to check its opening datagram, then upload the filtered index to a bucket in Parquet format (a sketch of the write_to_parquet helper follows the function below).
def get_subset_ek60_con0(df: pd.DataFrame) -> pd.DataFrame:
    # scan the first file from each cruise to designate it as CON0 or XML0
    print("getting subset of ek60 data with CON0 datagram")
    cruises = list(set(df['cruise']))
    cruises.sort()
    for cruise in cruises:
        select_df = df.loc[df['cruise'] == cruise]
        select_key = select_df.iloc[0]['key']
        # Test for EK80 using the known file signature in bytes 3 to 7
        first_datagram = s3.get_object(Bucket=bucket, Key=select_key, Range='bytes=3-7')['Body'].read().decode().strip('\x00')
        # { "CON0": "EK/ES60 and ES70 files have a CON0 datagram first", "XML0": "ES80/EK80 files have an XML0 datagram first" }
        df.loc[df['cruise'] == cruise, ['datagram']] = first_datagram
    return df.loc[df['datagram'] == 'CON0']
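The write_to_parquet helper called below is not defined on this page. A minimal sketch, assuming the index bucket already exists and pyarrow (or fastparquet) is installed, could look like this; the in-memory buffer approach is one choice among several.

def write_to_parquet(df: pd.DataFrame, bucket: str, filename: str) -> None:
    # Hypothetical helper (not shown in the original): serialize the index
    # DataFrame to Parquet in memory and upload it to the index bucket.
    import io
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)  # requires pyarrow or fastparquet
    s3.put_object(Bucket=bucket, Key=filename, Body=buffer.getvalue())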
all_objects = get_all_objects()
subset_ek60_by_prefix = get_subset_ek60_prefix(all_objects)
subset_ek60_by_datagram = get_subset_ek60_con0(subset_ek60_by_prefix)
write_to_parquet(
    df=subset_ek60_by_datagram,
    bucket=f"{bucket}-index",
    filename="noaa-wcsd-pds-index.parquet"
)
The final output is a Parquet file in the S3 bucket with all EK60 files indexed.
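To verify, the index can be read back; this assumes s3fs is installed so pandas can read directly from S3 with appropriately configured credentials.

# Read the uploaded index back for a quick check (requires s3fs)
index_df = pd.read_parquet(f"s3://{bucket}-index/noaa-wcsd-pds-index.parquet")
print(index_df.shape)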