This snippet of code includes a simple class that connects to [[S3]] to fetch and save [[Pandas]] `DataFrame` objects stored as [[CSV]]. There are several aspects to take into consideration:
- Authentication is assumed, so it should already be set up using any of the [[boto3#Configuration|boto3 authentication options]] (a minimal sketch follows this list).
- Even though the `S3Controller.save_df()` method includes a mechanism to upload the CSV as a compressed GNU zip (`.gz`) file, **there is no support for reading such files**. This option was implemented to be used when performing [[Redshift]] `COPY` queries from [[S3]] (a sketch of such a query is included after the class).
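A minimal sketch of the quickest authentication option, exporting the credentials as environment variables before any session is created; the key values and region below are placeholders:

```python
import os

import boto3

# Placeholders only: in a real setup these come from the running context
# (shell profile, container environment, CI secrets, etc.).
os.environ["AWS_ACCESS_KEY_ID"] = "AKIA..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_DEFAULT_REGION"] = "eu-west-1"

# boto3 picks the credentials up from the environment automatically.
session = boto3.session.Session()
print(session.get_credentials() is not None)  # True if credentials resolved
```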
```python
import gzip
import logging
from io import BytesIO, StringIO

import boto3
import pandas as pd


class S3Controller:
    """A basic interface to the S3 storage.

    This class offers some simple methods to write and read data from the S3
    buckets required for the execution. There is just a `settings.toml` file
    (execution settings) that includes an `s3` section as a hard requirement.
    This object fetches credentials from the environment variables defined in
    the running context, i.e. `$AWS_ACCESS_KEY_ID` and
    `$AWS_SECRET_ACCESS_KEY`.

    Attributes:
        bucket_name (str): the name of the bucket to fetch data from.
        session (boto3.Session): the associated AWS session.
        client (boto3.Client): the associated S3 client.
        log (logging.Logger): the class' logging utility.
    """

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.session = boto3.session.Session()
        self.client = self.session.client("s3")
        self.log = logging.getLogger(__name__)

    def read_df(self, keypath, compressed=False, read_csv_kwargs=None):
        """Fetch a CSV object from the bucket and load it as a `DataFrame`."""
        if compressed:
            raise NotImplementedError("Reading `.gz` files is not supported")
        self.log.info(f"Getting {keypath} from S3 as CSV")
        response = self.client.get_object(Bucket=self.bucket_name, Key=keypath)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status != 200:
            raise IOError(f"Unsuccessful S3 get_object. Status - {status}")
        return pd.read_csv(response.get("Body"), **(read_csv_kwargs or dict()))

    def save_df(self, df, keypath, compress=False, to_csv_kwargs=None):
        """Serialize a `DataFrame` as CSV and upload it to the bucket."""
        if compress:
            keypath = keypath + ".gz"
        file_desc = StringIO()
        df.to_csv(file_desc, **(to_csv_kwargs or dict()))
        file_desc.seek(0)
        if compress:
            # GNU zip the serialized CSV into an in-memory binary buffer.
            data_to_upload = BytesIO()
            with gzip.GzipFile(fileobj=data_to_upload, mode="wb") as gzipped:
                gzipped.write(file_desc.read().encode("utf-8"))
        else:
            # `upload_fileobj` requires a binary file-like object, so the
            # CSV text needs to be encoded first.
            data_to_upload = BytesIO(file_desc.read().encode("utf-8"))
        self.log.info(f"Uploading {keypath} to S3")
        data_to_upload.seek(0)
        self.client.upload_fileobj(data_to_upload, self.bucket_name, keypath)
        return f"{self.bucket_name}/{keypath}"
```
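A minimal usage sketch of the class; the bucket and key names are hypothetical:

```python
import pandas as pd

# Hypothetical bucket and key names, for illustration only.
s3 = S3Controller("my-data-bucket")

df = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

# Plain CSV round trip.
s3.save_df(df, "exports/values.csv", to_csv_kwargs={"index": False})
restored = s3.read_df("exports/values.csv")

# Compressed upload (the `.gz` suffix is appended automatically); remember
# that `read_df()` cannot read it back.
s3.save_df(df, "exports/values.csv", compress=True)
```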
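For reference, a sketch of the [[Redshift]] `COPY` query the compressed upload is meant for, assuming a `psycopg2` connection; the table, bucket, key, DSN, and IAM role names are all hypothetical:

```python
import psycopg2

# All names below (table, bucket, key, IAM role, DSN) are hypothetical.
COPY_QUERY = """
    COPY analytics.values
    FROM 's3://my-data-bucket/exports/values.csv.gz'
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'
    FORMAT AS CSV
    IGNOREHEADER 1
    GZIP;
"""

with psycopg2.connect("dbname=analytics host=... user=... password=...") as conn:
    with conn.cursor() as cursor:
        cursor.execute(COPY_QUERY)
```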