Commit e9de470

Merge pull request #113 from telefonicasc/s3-library
Replace the MinIO SDK with the official S3 SDK
2 parents add632b + 0038dcc commit e9de470

8 files changed: +213 -210 lines changed
.github/workflows/unit-testing.yml

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ jobs:
         with:
           python-version: ${{ matrix.python-version }}
       - name: Install pytest tool
-        run: pip install pytest==8.3.4 pytest-minio-mock==0.4.19
+        run: pip install pytest==8.3.4
       - name: Install library dependencies
         run: pip install -e python-lib/tc_etl_lib
       - name: Test with pytest

python-lib/tc_etl_lib/README.md

Lines changed: 24 additions & 25 deletions
@@ -239,36 +239,35 @@ iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://<iota_endpoi
 iotam.send_batch_http(data=[{"<key_1>": "<value_1>", "<key_2>": "<value_2>"}, {"<key_3>": "<value_3>", "<key_4>": "<value_4>"}])
 ```
 
-Example of using the minioManager class
+Example of using the objectStorageManager class
 
 ```python
 # import library
 import tc_etl_lib as tc
 
-# declare minioManager and get initialized client
-minio_manager = tc.minioManager(endpoint='<minio_endpoint>:<port>',
-                                access_key='<user>',
-                                secret_key='<password>')
-
+# declare objectStorageManager
+object_storage_manager = tc.objectStorageManager(endpoint='<http/https>://<object_storage_endpoint>:<port>',
+                                                 access_key='<user>',
+                                                 secret_key='<password>')
 
 # Upload test-file.txt to python-test-bucket/output/example.txt
 # note test-file.txt must exist in the same directory where this example is run
-minio_manager.upload_file(bucket_name='python-test-bucket',
-                          destination_file='/output/example.txt',
-                          source_file="test-file.txt")
+object_storage_manager.upload_file(bucket_name='python-test-bucket',
+                                   destination_file='/output/example.txt',
+                                   source_file="test-file.txt")
 
 # You can define your own custom processing method and use it in the processing_method argument of the process_file method
 def process_chunk(file_chunk):
     print(file_chunk)
 
 # Retrieve example.txt and apply custom method to each 3 bytes chunk
-minio_manager.process_file(bucket_name='python-test-bucket',
-                           destination_file='/output/example.txt',
-                           chunk_size=3,
-                           processing_method=process_chunk)
+object_storage_manager.process_file(bucket_name='python-test-bucket',
+                                    file='/output/example.txt',
+                                    chunk_size=3,
+                                    processing_method=process_chunk)
 
 # Remove the bucket created in the upload file method
-minio_manager.remove_bucket(minio_client, "python-test-bucket")
+object_storage_manager.remove_bucket("python-test-bucket")
 ```
 
 ## Functions available in the library
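Migration note: the old example passed a bare `host:port` endpoint and relied on the `secure` flag, while the new example expects the scheme to be part of `endpoint` (boto3's `endpoint_url` takes a complete URL). A minimal sketch of how a caller might convert existing settings; `to_endpoint_url` is a hypothetical helper name and is not part of `tc_etl_lib`.

```python
def to_endpoint_url(endpoint: str, secure: bool = True) -> str:
    """Turn an old-style 'host:port' endpoint into a full URL.

    Hypothetical helper for callers migrating from minioManager;
    it is not provided by tc_etl_lib.
    """
    if "://" in endpoint:
        # The endpoint already carries a scheme; leave it untouched.
        return endpoint
    scheme = "https" if secure else "http"
    return f"{scheme}://{endpoint}"


# Old settings: endpoint='minio.local:9000', secure=False
print(to_endpoint_url("minio.local:9000", secure=False))
```

The result can then be passed as `endpoint` to `objectStorageManager`.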
@@ -410,26 +409,26 @@ The library is built from different classes depending on the functionality
         - :param required: `data`: Data to send. Can be a list of dictionaries or a DataFrame.
         - :raises SendBatchError: Raised when an exception occurs inside `send_http`. It catches the original exception, and the index where the error occurred is stored and printed.
 
-- Class `minioManager`: This class contains the functions related to the MinIO object storage solution.
+- Class `objectStorageManager`: This class contains the functions related to the object storage solution.
 
     - `__init__`: constructor for objects of the class.
-        - :param required `endpoint`: MinIO access endpoint
-        - :param required `access_key`: user required to log in to MinIO
-        - :param required `secret_key`: password required to log in to MinIO
-        - :param optional `secure`: flag indicating whether the connection to MinIO uses https (True) or http (False). Defaults to `True` if the parameter is omitted.
+        - :param required `endpoint`: access endpoint of our object storage service
+        - :param required `access_key`: user required to log in to our object storage service
+        - :param required `secret_key`: password required to log in to our object storage service
         - :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): Raised when any of the required arguments is missing.
     - `create_bucket`: creates the bucket if it does not exist; does nothing if it already exists.
         - :param required `bucket_name`: name of the bucket to create.
     - `remove_bucket`: deletes the bucket if it exists; does nothing if it does not exist.
         - :param required `bucket_name`: name of the bucket to delete.
-    - `upload_file`: uploads a file to MinIO (overwriting it if it already exists). If the target bucket does not exist, it is created first.
+        - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Raised when an exception is caught while deleting the bucket
+    - `upload_file`: uploads a file (overwriting it if it already exists). If the target bucket does not exist, it is created first.
         - :param required `bucket_name`: name of the bucket the file will be uploaded to.
-        - :param required `destination_file`: name of the file in MinIO (may include the path WITHOUT the bucket name at the start).
+        - :param required `destination_file`: name of the file in the bucket (may include the path WITHOUT the bucket name at the start).
         - :param required `source_file`: name of the local file to upload (may include the path).
-        - :return: object with the status of the file upload.
-    - `process_file`: processes a MinIO file in chunks and applies the provided function to each chunk.
+        - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Raised when an exception is caught while uploading the file
+    - `process_file`: processes an uploaded file in chunks and applies the provided function to each chunk.
         - :param required `bucket_name`: name of the bucket where the file will be looked up.
-        - :param required `file`: name of the file in MinIO (may include the path WITHOUT the bucket name at the start).
+        - :param required `file`: name of the file in the bucket (may include the path WITHOUT the bucket name at the start).
         - :param required `processing_method`: method to apply to each chunk of the file.
         - :param optional `chunk_size`: size in bytes of each file chunk to retrieve. Defaults to 500000 bytes if the argument is omitted
         - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Raised when an exception is caught while processing the file
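Usage note on `processing_method`: chunks are cut at fixed byte offsets, so a text line can be split across two calls. A minimal sketch of a callable that buffers partial lines between chunks; `LineAssembler` is a hypothetical name for illustration and is not part of `tc_etl_lib`.

```python
class LineAssembler:
    """Collects byte chunks and emits complete lines (hypothetical helper)."""

    def __init__(self):
        self.lines = []       # complete lines seen so far, as bytes
        self._pending = b""   # trailing partial line from the last chunk

    def __call__(self, chunk: bytes) -> None:
        # Prepend whatever was left over from the previous chunk.
        data = self._pending + chunk
        # Everything before the last newline is complete; the rest waits.
        *complete, self._pending = data.split(b"\n")
        self.lines.extend(complete)

    def flush(self) -> None:
        # Emit the trailing partial line, if any, once all chunks are in.
        if self._pending:
            self.lines.append(self._pending)
            self._pending = b""


assembler = LineAssembler()
for chunk in (b"ab\ncd", b"e\nf"):
    assembler(chunk)
assembler.flush()
print(assembler.lines)  # [b'ab', b'cde', b'f']
```

An instance like `assembler` could be passed as `processing_method` to `process_file`, with `flush()` called after it returns. Lines stay as bytes here so that multi-byte characters split across chunks are not decoded prematurely.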
@@ -567,7 +566,7 @@ TOTAL 403 221 45%
 ## Changelog
 
 
-- Add: new class `minioManager` to manage MinIO connection and file processing ([#109](https://github.com/telefonicasc/etl-framework/issues/109))
+- Add: new class `objectStorageManager` to manage bucket-based object storage compatible with the S3 API (such as AWS S3 or MinIO) ([#109](https://github.com/telefonicasc/etl-framework/issues/109))
 
 0.16.0 (September 29th, 2025)
 

python-lib/tc_etl_lib/setup.py

Lines changed: 3 additions & 2 deletions
@@ -47,15 +47,16 @@
     # The last numpy release before 2.0.0 is 1.26.4.
     # The last numpy release compatible with python 3.8 is 1.24.4
     'numpy==1.24.4',
-    'minio==7.2.7'
+    'boto3==1.37.38'
+
 ]
 INSTALL_REQUIRES_PYTHON_3_12 = [
     'requests>=2.28.2,<2.33.0',
     'urllib3==1.26.16',
     'psycopg2-binary>=2.9.5',
     'pandas==2.2.2',
     'numpy==2.2.0',
-    'minio==7.2.18'
+    'boto3==1.40.55'
 ]
 
 setup(

python-lib/tc_etl_lib/tc_etl_lib/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -25,4 +25,4 @@
 from .iota import iotaManager
 from .store import Store, orionStore, sqlFileStore
 from .normalizer import normalizer
-from .minio import minioManager
+from .object_storage import objectStorageManager

python-lib/tc_etl_lib/tc_etl_lib/minio.py renamed to python-lib/tc_etl_lib/tc_etl_lib/object_storage.py

Lines changed: 38 additions & 46 deletions
@@ -19,32 +19,28 @@
 # along with IoT orchestrator. If not, see http://www.gnu.org/licenses/.
 
 """
-Minio routines for Python:
-  - minioManager.
+Object storage routines for Python:
+  - objectStorageManager.
 """
-from minio import Minio
 from typing import Optional, cast
 import logging
+import boto3
 
 logger = logging.getLogger(__name__)
 
 
-class minioManager:
-    """Minio Manager
+class objectStorageManager:
+    """Object storage Manager
 
-    endpoint: define minio endpoint
-    access_key: user to log in to minio
-    secret_key: password to log in to minio
-    secure: flag to select if the connection to MinIO is https or http (True by default)
-    client: authenticated MinIO client
+    endpoint: define Object storage endpoint
+    access_key: user to log in to Object storage
+    secret_key: password to log in to Object storage
     """
     endpoint: str
     access_key: str
     secret_key: str
-    secure: bool
-    client: Minio
 
-    def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = None, secret_key: Optional[str] = None, secure=True):
+    def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = None, secret_key: Optional[str] = None):
 
         messageError = []
         if endpoint is None:
@@ -61,27 +57,27 @@ def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = N
             if len(messageError) != 1:
                 defineParams = " and ".join(
                     [", ".join(messageError[:-1]), messageError[-1]])
-            raise ValueError(f'You must define {defineParams} in minioManager')
+            raise ValueError(f'You must define {defineParams} in objectStorageManager')
 
         # At this point, all Optional[str] have been validated to be not None.
         # Cast them so the type checker knows.
         self.endpoint = cast(str, endpoint)
         self.access_key = cast(str, access_key)
         self.secret_key = cast(str, secret_key)
-        self.secure = secure
         self.client = self.__init_client()
 
     def __init_client(self):
         """
-        Create a MinIO client with the class endpoint, its access key and secret key.
+        Create an object storage client with the class endpoint, its access key and secret key.
 
-        :return authenticated MinIO client
+        :return authenticated object storage client
         """
-        return Minio(
-            self.endpoint,
-            self.access_key,
-            self.secret_key,
-            secure=self.secure
+        return boto3.client(
+            's3',
+            endpoint_url=self.endpoint,
+            aws_access_key_id=self.access_key,
+            aws_secret_access_key=self.secret_key,
+            aws_session_token=None
         )
 
     def create_bucket(self, bucket_name):
@@ -90,25 +86,25 @@ def create_bucket(self, bucket_name):
 
         :param bucket_name: name of the bucket where the file is located
         """
-        found = self.client.bucket_exists(bucket_name)
-        if not found:
-            self.client.make_bucket(bucket_name)
+        try:
+            self.client.create_bucket(Bucket=bucket_name)
             logger.debug(f'Created bucket ({bucket_name})')
-        else:
-            logger.debug(f'Bucket {bucket_name} already exists')
+        except Exception as e:
+            # BucketAlreadyExists or BucketAlreadyOwnedByYou
+            logger.debug(f'Error creating the bucket: {e}')
 
     def remove_bucket(self, bucket_name):
         """
         Remove the bucket if it exists.
 
         :param bucket_name: name of the bucket where the file is located
         """
-        found = self.client.bucket_exists(bucket_name)
-        if found:
-            self.client.remove_bucket(bucket_name)
+        try:
+            self.client.delete_bucket(Bucket=bucket_name)
             logger.debug(f'Removed bucket {bucket_name}')
-        else:
-            logger.debug(f'Bucket {bucket_name} does not exist')
+        except Exception as e:
+            logger.debug(f'An error occurred while deleting {bucket_name}: {e}')
+            raise Exception(f'An error occurred while deleting {bucket_name}: {e}')
 
     def upload_file(self, bucket_name, destination_file, source_file):
         """
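Review note: the new `create_bucket` catches every `Exception`, so failures other than the two codes named in its comment (for example a permissions error) are only logged at debug level. A minimal sketch of how the "already exists" case could be told apart, assuming the caught exception is a botocore `ClientError`, which exposes the error code under `response['Error']['Code']`. `is_bucket_already_present` and `FakeClientError` are hypothetical names; the stand-in exception only keeps the example runnable without boto3.

```python
# Error codes named in the comment of the new create_bucket.
ALREADY_PRESENT_CODES = {"BucketAlreadyExists", "BucketAlreadyOwnedByYou"}


def is_bucket_already_present(exc: Exception) -> bool:
    """True if exc looks like a ClientError reporting an existing bucket."""
    # botocore's ClientError carries the parsed error in `.response`;
    # other exception types simply have no such attribute.
    response = getattr(exc, "response", None) or {}
    code = response.get("Error", {}).get("Code")
    return code in ALREADY_PRESENT_CODES


class FakeClientError(Exception):
    """Stand-in with the same `.response` shape as a botocore ClientError."""

    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


print(is_bucket_already_present(FakeClientError("BucketAlreadyOwnedByYou")))
print(is_bucket_already_present(FakeClientError("AccessDenied")))
```

With a check like this, `create_bucket` could keep ignoring the "already exists" case and re-raise anything else, matching how `remove_bucket` and `upload_file` surface errors in this commit.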
@@ -117,18 +113,17 @@ def upload_file(self, bucket_name, destination_file, source_file):
         :param bucket_name: name of the bucket where the file is located
         :param destination_file: name of the file to retrieve (can include path without bucket_name)
         :param source_file: name of the file to upload (can include path)
-        :return object with the status of the upload
         """
         # Bucket must exist before uploading file
         self.create_bucket(bucket_name)
 
         logger.debug(
             f'Uploading {source_file} as object {destination_file} to bucket {bucket_name}')
-        return self.client.fput_object(
-            bucket_name,
-            object_name=destination_file,
-            file_path=source_file,
-        )
+        try:
+            self.client.upload_file(source_file, bucket_name, destination_file)
+        except Exception as e:
+            logger.debug(f'An error occurred while uploading the file: {e}')
+            raise Exception(f'An error occurred while uploading the file: {e}')
 
     def process_file(self, bucket_name, file, processing_method, chunk_size=500000):
         """Retrieves a file in chunks and applies a function to each chunk
@@ -138,22 +133,19 @@ def process_file(self, bucket_name, file, processing_method, chunk_size=500000):
         :param processing_method: method to apply to each chunk of the retrieved file
         :param chunk_size: size in bytes of the chunks to retrieve (500000 by default)
         """
-        file_size = self.client.stat_object(
-            bucket_name, object_name=file).size or 0
+        file_size = self.client.get_object_attributes(
+            Bucket=bucket_name, Key=file, ObjectAttributes=['ObjectSize'])['ObjectSize'] or 0
 
-        response = None
         for offset in range(0, file_size, chunk_size):
             # Get the file
             try:
+                byte_range = f'bytes={offset}-{offset+chunk_size-1}'
                 response = self.client.get_object(
-                    bucket_name, file, offset, chunk_size)
+                    Bucket=bucket_name, Key=file, Range=byte_range)
                 # response.data returns bytes
-                processing_method(response.data)
+                processing_method(response['Body'].read())
             except Exception as e:
                 raise Exception(
                     f'An error occurred while processing the file: {e}')
 
         logger.debug(f'Processing ended.')
-        if response:
-            response.close()
-            response.release_conn()
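Review note: the new `process_file` switches from an (offset, length) pair to an inclusive HTTP `Range` header, so the last request may name an end byte past the object size and rely on the server clamping it. A minimal sketch of the same loop run against an in-memory stand-in, to show which chunks result. `InMemoryS3Stub` and `process_in_chunks` are hypothetical names; the stub only mimics the two client calls and response keys that appear in the diff.

```python
import io
import re


class InMemoryS3Stub:
    """Tiny stand-in for the two client calls used by process_file."""

    def __init__(self, objects):
        self.objects = objects  # {(bucket, key): bytes}

    def get_object_attributes(self, Bucket, Key, ObjectAttributes):
        return {"ObjectSize": len(self.objects[(Bucket, Key)])}

    def get_object(self, Bucket, Key, Range):
        match = re.fullmatch(r"bytes=(\d+)-(\d+)", Range)
        start, end = int(match.group(1)), int(match.group(2))
        data = self.objects[(Bucket, Key)]
        # Byte ranges are inclusive; slicing clamps an end past the object.
        return {"Body": io.BytesIO(data[start:end + 1])}


def process_in_chunks(client, bucket_name, file, processing_method, chunk_size=500000):
    """Same loop shape as process_file in the diff, as a free function."""
    file_size = client.get_object_attributes(
        Bucket=bucket_name, Key=file, ObjectAttributes=["ObjectSize"])["ObjectSize"] or 0
    for offset in range(0, file_size, chunk_size):
        byte_range = f"bytes={offset}-{offset + chunk_size - 1}"
        response = client.get_object(Bucket=bucket_name, Key=file, Range=byte_range)
        processing_method(response["Body"].read())


chunks = []
stub = InMemoryS3Stub({("python-test-bucket", "output/example.txt"): b"abcdefgh"})
process_in_chunks(stub, "python-test-bucket", "output/example.txt",
                  chunks.append, chunk_size=3)
print(chunks)  # [b'abc', b'def', b'gh']
```

A stub of this kind is one way to exercise the chunking logic in unit tests now that `pytest-minio-mock` is no longer installed by the workflow.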
