This snippet of code includes a simple class able to connect to [[S3]] and fetch and save [[Pandas]] `DataFrame` objects stored as [[CSV]]. There are several aspects to take into consideration: - Authentication is assumed so it should be already set using any of the[[boto3#Configuration|boto3 authentication options]]. - Even though the `S3Controller.save_df()` method includes a mechanism to upload the CSV as a compressed GNU zip (`.gz`) file, **there is no support for reading such files**. This option was implemented to be used when performing [[Redshift]] `COPY` queries from [[S3]]. ```python import gzip import logging from io import BytesIO, StringIO import boto3 import pandas as pd class S3Controller: """A basic interface to the S3 storage. This class offers some simple methods to write and read data from the S3 buckets required for the execution. There is just a `settings.toml` file (execution settings) that includes a `s3` section as a hard requirement. This object fetches credentials from the environment variables defined in the running context, i.e. `$AWS_ACCESS_KEY_ID` and `$AWS_SECRET_ACCESS_KEY.` Attributes: bucket_name (str): the name of the bucket to fetch data from. session (boto3.Session): the associated AWS session. client (boto3.Client): the associated S3 client. log (logging.Logger): the class' logging utility. """ def __init__(self, bucket_name): self.bucket_name = bucket_name self.session = boto3.session.Session() self.client = self.session.client("s3") self.log = logging.getLogger(__name__) def read_df(self, df, keypath, compressed=False, read_csv_kwargs=None): if compressed: raise NotImplementedError self.log.info(f"Getting {keypath} from S3 as CSV") response = self.client.get_object(Bucket=self.bucket_name, Key=keypath) status = response.get("ResponseMetadata", {}).get("HTTPStatusCode") if status != 200: raise IOError(f"Unsuccessful S3 get_object. Status - {status}") else: df = pd.read_csv(response.get("Body"), **(read_csv_kwargs or dict())) return df def save_df(self, df, keypath, compress=False, to_csv_kwargs=None): if compress: filename = filename + ".gz" file_desc = StringIO() compressed_file_desc = BytesIO() df.to_csv(file_desc, **(to_csv_kwargs or dict())) if compress: file_desc.seek(0) gzipped = gzip.GzipFile(fileobj=compressed_file_desc, mode="w") gzipped.write(bytes(file_desc.read(), "utf-8")) gzipped.close() data_to_upload = compressed_file_desc else: data_to_upload = file_desc self.log.info(f"Uploading {filename} to S3") data_to_upload.seek(0) self.client.upload_fileobj(data_to_upload, self.bucket_name, keypath) url = f"{self.bucket_name}/{keypath}" return url ```