Pandas le permite pasar una ruta de AWS S3 directamente a .to_csv()
y .to_parquet()
. Hay un storage_options
argumento para pasar argumentos específicos de S3.
Me gustaría llamar .to_csv('s3://bucket/key.csv', storage_options=something)
y especificar etiquetas de objeto S3 para aplicar al objeto cargado, como something
. He leído los documentos y no puedo entender cómo,
Los documentos de pandas no enumeran los valores posibles para storage_options
, solo apuntan a fsspec
. Parece que pandas llama a fsspec, que llama a s3fs, que llama a aiobotocore, que llama a botocore, y que probablemente llama a s3transfer. ¿Cómo puedo pasar los argumentos de la etiqueta S3 hasta el fondo de este agujero de conejo?
MWE
import pandas as pd
import boto3
bucket = 'mybucket' # change for your bucket
key = 'test/pandas/tags.csv'
tags = {'mytag': 'x'}
df = pd.DataFrame([{'a': 1}])
df.to_csv(f"s3://{bucket}/{key}") # try without any tags first
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
resp = boto3.client('s3').get_object_tagging(Bucket=bucket, Key=key)
actual_tags = {t['Key']: t['Value'] for t in resp.get('TagSet', [])}
assert actual_tags == tags
comportamiento esperado
Pasa la afirmación. El objeto S3 tiene una etiqueta mytag
:x
comportamiento real
La segunda .to_csv()
línea falla. es decir, funciona sin etiquetas. Las etiquetas son lo que está causando el fracaso.
Traceback (most recent call last):
File "upld.py", line 9, in <module>
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/core/generic.py", line 3463, in to_csv
return DataFrameRenderer(formatter).to_csv(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/format.py", line 1105, in to_csv
csv_formatter.save()
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/csvs.py", line 237, in save
with get_handle(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 608, in get_handle
ioargs = _get_filepath_or_buffer(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 357, in _get_filepath_or_buffer
file_obj = fsspec.open(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 456, in open
return open_files(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in open_files
[fs.makedirs(parent, exist_ok=True) for parent in parents]
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in <listcomp>
[fs.makedirs(parent, exist_ok=True) for parent in parents]
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 71, in sync
raise return_result
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
result[0] = await coro
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 746, in _makedirs
await self._mkdir(path, create_parents=True)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 731, in _mkdir
await self._call_s3("create_bucket", **params)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 252, in _call_s3
await self.set_session()
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 395, in set_session
self.session = aiobotocore.session.AioSession(**self.kwargs)
TypeError: __init__() got an unexpected keyword argument 'tags'
Parece que estos argumentos se pasan a la creación de instancias de sesión de aiobotocore, no a la llamada API put_object real de S3 de aiobotocore. Eso me hace pensar que no es posible hacer esto.
Alternativas
Debería intentar:
storage_options={
'tags': {
'k': 'v'
}
}
o
storage_options={
'tags': [
{'Key': 'k', 'Value': 'v'}
]
}
Por supuesto, podría cargar sin etiquetas y luego agregar etiquetas como una llamada de boto separada. Esto no es atómico y cuesta el doble (para archivos pequeños). Si hubiera una manera de recuperar la identificación de la versión de la carga, eso eliminaría algunos problemas de concurrencia (escrituras simultáneas).