
parallel_load_dataframes_s3

To get started:

Dynamically pull and run

from hamilton import dataflows, driver
from hamilton.execution import executors

# downloads into ~/.hamilton/dataflows and loads the module -- WARNING: ensure you know what code you're importing!
parallel_load_dataframes_s3 = dataflows.import_module("parallel_load_dataframes_s3", "elijahbenizzy")
# Switch this out for ray, dask, etc. See docs for more info.
remote_executor = executors.MultiThreadingExecutor(max_tasks=20)

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(remote_executor)
    .with_config({})  # replace with configuration as appropriate
    .with_modules(parallel_load_dataframes_s3)
    .build()
)
# If you have sf-hamilton[visualization] installed, you can see the dataflow graph
# In a notebook this will show an image, else pass in arguments to save to a file
# dr.display_all_functions()
# Execute the dataflow, specifying what you want back. Will return a dictionary.
result = dr.execute(
    [parallel_load_dataframes_s3.CHANGE_ME, ...],  # this specifies what you want back
    inputs={...},  # pass in inputs as appropriate
)
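
If you are unsure what to request in place of CHANGE_ME, you can ask the driver which nodes the dataflow exposes. A minimal sketch (for this dataflow, the final node of interest is processed_dataframe):

# List the nodes this dataflow exposes so you know what to request from dr.execute.
for var in dr.list_available_variables():
    print(var.name)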

Use published library version

pip install sf-hamilton-contrib --upgrade  # make sure you have the latest

from hamilton import dataflows, driver
from hamilton.execution import executors

# Make sure you've done - `pip install sf-hamilton-contrib --upgrade`
from hamilton.contrib.user.elijahbenizzy import parallel_load_dataframes_s3
# Switch this out for ray, dask, etc. See docs for more info.
remote_executor = executors.MultiThreadingExecutor(max_tasks=20)

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(remote_executor)
    .with_config({})  # replace with configuration as appropriate
    .with_modules(parallel_load_dataframes_s3)
    .build()
)
# If you have sf-hamilton[visualization] installed, you can see the dataflow graph
# In a notebook this will show an image, else pass in arguments to save to a file
# dr.display_all_functions()
# Execute the dataflow, specifying what you want back. Will return a dictionary.
result = dr.execute(
    [parallel_load_dataframes_s3.CHANGE_ME, ...],  # this specifies what you want back
    inputs={...},  # pass in inputs as appropriate
)

Modify for your needs

If you want to modify the dataflow, you can copy it to a new folder (renaming it is possible) and modify it there.

dataflows.copy(parallel_load_dataframes_s3, "path/to/save/to")
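
A minimal sketch of using the copy, assuming it lands as an __init__.py inside the destination folder (the path is the same placeholder as above):

# Hypothetical: load the copied (and possibly edited) module from disk, then build a driver with it.
import importlib.util

spec = importlib.util.spec_from_file_location(
    "my_parallel_load_dataframes_s3", "path/to/save/to/__init__.py"  # placeholder path
)
my_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(my_module)

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(remote_executor)
    .with_modules(my_module)
    .build()
)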

Purpose of this module

This module loads dataframes from a set of JSON or JSONL files stored in S3.

You have to pass it:

  1. The bucket to download from (bucket)
  2. The path within the bucket to crawl (path_in_bucket)
  3. The caching directory for saving files locally (save_dir)

Optionally, you can pass it:

  1. The AWS profile (aws_profile), which defaults to "default"

This will look for all files ending in .json or .jsonl under s3://<bucket>/<path_in_bucket>, download them to save_dir, load them, and concatenate them into a single dataframe. Note that if a file already exists locally it is skipped -- thus the download step is idempotent.
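
A minimal sketch of a full run, assuming the driver dr built in "To get started" above; the bucket name, prefix, and cache directory are placeholders:

# Request the final dataframe by node name and supply the required inputs.
result = dr.execute(
    ["processed_dataframe"],
    inputs={
        "bucket": "my-bucket",          # placeholder: the S3 bucket to download from
        "path_in_bucket": "data/raw/",  # placeholder: the prefix to crawl
        "save_dir": "./s3_cache",       # placeholder: local caching directory
        "aws_profile": "default",       # optional; defaults to "default"
    },
)
df = result["processed_dataframe"]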

Configuration Options

N/A

Limitations

This only downloads JSON/JSONL files and currently crawls the entire prefix (path_in_bucket) within the bucket. It also requires that all files fit in memory and share a uniform schema.
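
If you already have the files locally (or want to restrict processing to a known subset), one possible workaround is to bypass the S3 crawl by overriding the all_downloaded_data node with an explicit list of paths. A sketch using the driver's overrides argument, with placeholder file paths:

# Skip listing/downloading from S3 and process a fixed set of local jsonl files instead.
result = dr.execute(
    ["processed_dataframe"],
    overrides={"all_downloaded_data": ["./s3_cache/part-0.jsonl", "./s3_cache/part-1.jsonl"]},
)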

Source code

__init__.py
import dataclasses
import logging
import os
from pathlib import Path
from typing import List

logger = logging.getLogger(__name__)

from hamilton import log_setup

log_setup.setup_logging(logging.INFO)

from hamilton import contrib

with contrib.catch_import_errors(__name__, __file__, logger):
    # non-hamilton imports go here
    import boto3
    import pandas as pd
    from boto3 import Session

pass

# hamilton imports go here; check for required version if need be.
from hamilton.htypes import Collect, Parallelizable

# from hamilton.log_setup import setup_logging

logger = logging.getLogger(__name__)


def s3(aws_profile: str = "default") -> boto3.resource:
    """Returns a boto3 resource for the 'aws_profile' profile"""
    session = Session(profile_name=aws_profile)
    return session.resource("s3")


@dataclasses.dataclass
class ToDownload:
    """Simple dataclass representing a file to download"""

    key: str
    bucket: str


def ensured_save_dir(save_dir: str) -> str:
    """Ensures that the save directory exists for later use"""
    if not os.path.exists(save_dir):
        Path(save_dir).mkdir()
    return save_dir


def downloadable(
    s3: boto3.resource, bucket: str, path_in_bucket: str
) -> Parallelizable[ToDownload]:
    """Lists downloadables from the s3 bucket"""

    bucket_obj = s3.Bucket(bucket)
    objs = list(bucket_obj.objects.filter(Prefix=path_in_bucket).all())
    objs = [obj for obj in objs if obj.key.endswith(".jsonl") or obj.key.endswith(".json")]
    logger.info(f"Found {len(objs)} objects in {bucket}/{path_in_bucket}")
    for obj in objs:
        yield ToDownload(key=obj.key, bucket=bucket)


def _already_downloaded(path: str) -> bool:
    """Checks if the data is already downloaded"""
    if os.path.exists(path):
        return True
    return False


def downloaded_data(
    downloadable: ToDownload,
    ensured_save_dir: str,
    s3: boto3.resource,
) -> str:
    """Downloads data, short-circuiting if the data already exists locally"""
    download_location = os.path.join(ensured_save_dir, downloadable.key)
    if _already_downloaded(download_location):
        logger.info(f"Already downloaded {download_location}")
        return download_location
    parent_path = os.path.dirname(download_location)
    if not os.path.exists(parent_path):
        os.makedirs(parent_path, exist_ok=True)
    # This works with threading, but might not work in parallel with multiprocessing
    # TODO -- use a connection pool
    bucket = s3.Bucket(downloadable.bucket)
    bucket.download_file(downloadable.key, download_location)
    logger.info(f"Downloaded {download_location}")
    return download_location


def all_downloaded_data(downloaded_data: Collect[str]) -> List[str]:
    """Returns a list of all downloaded locations"""
    out = []
    for path in downloaded_data:
        out.append(path)
    return out


def _jsonl_parse(path: str) -> pd.DataFrame:
    """Loads a jsonl file into a dataframe"""
    return pd.read_json(path, lines=True)


def processed_dataframe(all_downloaded_data: List[str]) -> pd.DataFrame:
    """Processes everything into a dataframe"""
    out = []
    for floc in all_downloaded_data:
        out.append(_jsonl_parse(floc))
    return pd.concat(out)


if __name__ == "__main__":
    # Code to create an image showing the DAG workflow.
    # Run as a script to test Hamilton's execution.
    import __init__ as parallel_load_dataframes_s3

    from hamilton import driver

    dr = (
        driver.Builder()
        .with_modules(parallel_load_dataframes_s3)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .build()
    )

    # saves to the current working directory, creating dag.png.
    dr.display_all_functions("dag", {"format": "png", "view": False})

Requirements

# put non-hamilton requirements here
boto3
pandas