Commit add632b

Merge pull request #110 from telefonicasc/minio-library-functionalities
Minio library functionalities
2 parents 58acb5e + 14819e3 commit add632b

7 files changed (+394, -3 lines changed)


.github/workflows/unit-testing.yml

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ jobs:
         with:
           python-version: ${{ matrix.python-version }}
       - name: Install pytest tool
-        run: pip install pytest==8.3.4
+        run: pip install pytest==8.3.4 pytest-minio-mock==0.4.19
       - name: Install library dependencies
         run: pip install -e python-lib/tc_etl_lib
       - name: Test with pytest

python-lib/tc_etl_lib/README.md

Lines changed: 58 additions & 0 deletions
@@ -239,6 +239,38 @@ iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://<iota_endpoi
 iotam.send_batch_http(data=[{"<key_1>": "<value_1>", "<key_2>": "<value_2>"}, {"<key_3>": "<value_3>", "<key_4>": "<value_4>"}])
 ```

+Example usage of the minioManager class
+
+```python
+# import library
+import tc_etl_lib as tc
+
+# declare minioManager and get initialized client
+minio_manager = tc.minioManager(endpoint='<minio_endpoint>:<port>',
+                                access_key='<user>',
+                                secret_key='<password>')
+
+# Upload test-file.txt to python-test-bucket/output/example.txt
+# note: test-file.txt must exist in the directory where this example is run
+minio_manager.upload_file(bucket_name='python-test-bucket',
+                          destination_file='/output/example.txt',
+                          source_file="test-file.txt")
+
+# You can define your own processing function and pass it as the processing_method argument of process_file
+def process_chunk(file_chunk):
+    print(file_chunk)
+
+# Retrieve example.txt and apply the custom function to each 3-byte chunk
+minio_manager.process_file(bucket_name='python-test-bucket',
+                           file='/output/example.txt',
+                           chunk_size=3,
+                           processing_method=process_chunk)
+
+# Remove example.txt and then the bucket created by upload_file (MinIO only deletes empty buckets)
+minio_manager.client.remove_object('python-test-bucket', '/output/example.txt')
+minio_manager.remove_bucket("python-test-bucket")
+```
+
 ## Functions available in the library

 The library is built with different classes depending on the desired functionality.
@@ -378,6 +410,29 @@ The library is built with different classes depending on the desired functionality
     - :param required: `data`: Data to send. Can be a list of dictionaries or a DataFrame.
     - :raises SendBatchError: Raised when an exception occurs inside `send_http`. The original exception is caught and stored, and the index where the error occurred is printed.

+- `minioManager` class: contains the functions related to the MinIO object storage solution.
+
+  - `__init__`: class constructor.
+    - :param required `endpoint`: MinIO access endpoint.
+    - :param required `access_key`: user needed to log in to MinIO.
+    - :param required `secret_key`: password needed to log in to MinIO.
+    - :param optional `secure`: flag indicating whether the connection to MinIO uses HTTPS (True) or HTTP (False). Defaults to `True` if omitted.
+    - :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): Raised when any of the required arguments is missing.
+  - `create_bucket`: creates the bucket if it doesn't exist; if it already exists, does nothing.
+    - :param required `bucket_name`: name of the bucket to create.
+  - `remove_bucket`: removes the bucket if it exists (the bucket must be empty); if it doesn't exist, does nothing.
+    - :param required `bucket_name`: name of the bucket to remove.
+  - `upload_file`: uploads a file to MinIO, overwriting it if it already exists. If the target bucket doesn't exist, it is created first.
+    - :param required `bucket_name`: name of the bucket the file is uploaded to.
+    - :param required `destination_file`: name of the file in MinIO (may include a path, WITHOUT the bucket name at the start).
+    - :param required `source_file`: name of the local file to upload (may include a path).
+    - :return: object with the upload status.
+  - `process_file`: processes a MinIO file in chunks, applying the provided function to each chunk.
+    - :param required `bucket_name`: name of the bucket where the file is looked up.
+    - :param required `file`: name of the file in MinIO (may include a path, WITHOUT the bucket name at the start).
+    - :param required `processing_method`: function applied to each chunk of the file.
+    - :param optional `chunk_size`: size in bytes of each chunk to retrieve. Defaults to 500000 bytes if omitted.
+    - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Raised when an exception is caught while processing the file.

 Some usage examples of `normalizer`:

@@ -511,6 +566,9 @@ TOTAL 403 221 45%

 ## Changelog

+
+- Add: new class `minioManager` to manage MinIO connection and file processing ([#109](https://github.com/telefonicasc/etl-framework/issues/109))
+
 0.16.0 (September 29th, 2025)

 - Add: new optional parameter `idPattern` in cbManager's methods get_entities and get_entities_page used to filter entities by Identity pattern
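The README states that `__init__` raises `ValueError` when a required argument is missing. In `minioManager.__init__` (added by this commit), one message names every missing argument. The following is a minimal sketch that reproduces that message-building logic as a standalone function, so it runs without the `minio` package or a MinIO server. `missing_params_message` is a hypothetical helper, not part of tc_etl_lib, and it returns the message instead of raising.

```python
from typing import Optional


def missing_params_message(endpoint: Optional[str] = None,
                           access_key: Optional[str] = None,
                           secret_key: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper mirroring minioManager.__init__'s validation.

    Returns the ValueError message minioManager would raise, or None
    when all required arguments are present.
    """
    missing = [f'<<{name}>>' for name, value in (('endpoint', endpoint),
                                                  ('access_key', access_key),
                                                  ('secret_key', secret_key))
               if value is None]
    if not missing:
        return None
    params = missing[0]
    if len(missing) > 1:
        # "a, b and c": comma-join all but the last, then " and " before the last
        params = " and ".join([", ".join(missing[:-1]), missing[-1]])
    return f'You must define {params} in minioManager'


print(missing_params_message())
# You must define <<endpoint>>, <<access_key>> and <<secret_key>> in minioManager
print(missing_params_message(endpoint='localhost:9000'))
# You must define <<access_key>> and <<secret_key>> in minioManager
```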

python-lib/tc_etl_lib/setup.py

Lines changed: 4 additions & 2 deletions
@@ -46,14 +46,16 @@
     # ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject
     # The last numpy release before 2.0.0 is 1.26.4.
     # The last numpy release compatible with Python 3.8 is 1.24.4
-    'numpy==1.24.4'
+    'numpy==1.24.4',
+    'minio==7.2.7'
 ]
 INSTALL_REQUIRES_PYTHON_3_12 = [
     'requests>=2.28.2,<2.33.0',
     'urllib3==1.26.16',
     'psycopg2-binary>=2.9.5',
     'pandas==2.2.2',
-    'numpy==2.2.0'
+    'numpy==2.2.0',
+    'minio==7.2.18'
 ]

 setup(

python-lib/tc_etl_lib/tc_etl_lib/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -25,3 +25,4 @@
 from .iota import iotaManager
 from .store import Store, orionStore, sqlFileStore
 from .normalizer import normalizer
+from .minio import minioManager

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2022 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U.
+#
+# This file is part of tc_etl_lib
+#
+# tc_etl_lib is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# tc_etl_lib is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/.
+
+"""
+Minio routines for Python:
+  - minioManager.
+"""
+from minio import Minio
+from typing import Optional, cast
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class minioManager:
+    """Minio Manager
+
+    endpoint: define minio endpoint
+    access_key: user to log in to minio
+    secret_key: password to log in to minio
+    secure: flag to select if the connection to MinIO is https or http (True by default)
+    client: authenticated MinIO client
+    """
+    endpoint: str
+    access_key: str
+    secret_key: str
+    secure: bool
+    client: Minio
+
+    def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = None, secret_key: Optional[str] = None, secure=True):
+
+        messageError = []
+        if endpoint is None:
+            messageError.append('<<endpoint>>')
+
+        if access_key is None:
+            messageError.append('<<access_key>>')
+
+        if secret_key is None:
+            messageError.append('<<secret_key>>')
+
+        if len(messageError) != 0:
+            defineParams = messageError[0]
+            if len(messageError) != 1:
+                defineParams = " and ".join(
+                    [", ".join(messageError[:-1]), messageError[-1]])
+            raise ValueError(f'You must define {defineParams} in minioManager')
+
+        # At this point, all Optional[str] have been validated to be not None.
+        # Cast them to let the type checker know.
+        self.endpoint = cast(str, endpoint)
+        self.access_key = cast(str, access_key)
+        self.secret_key = cast(str, secret_key)
+        self.secure = secure
+        self.client = self.__init_client()
+
+    def __init_client(self):
+        """
+        Create a MinIO client with the class endpoint, its access key and secret key.
+
+        :return authenticated MinIO client
+        """
+        return Minio(
+            self.endpoint,
+            self.access_key,
+            self.secret_key,
+            secure=self.secure
+        )
+
+    def create_bucket(self, bucket_name):
+        """
+        Create the bucket if it doesn't exist.
+
+        :param bucket_name: name of the bucket to create
+        """
+        found = self.client.bucket_exists(bucket_name)
+        if not found:
+            self.client.make_bucket(bucket_name)
+            logger.debug(f'Created bucket ({bucket_name})')
+        else:
+            logger.debug(f'Bucket {bucket_name} already exists')
+
+    def remove_bucket(self, bucket_name):
+        """
+        Remove the bucket if it exists.
+
+        :param bucket_name: name of the bucket to remove
+        """
+        found = self.client.bucket_exists(bucket_name)
+        if found:
+            self.client.remove_bucket(bucket_name)
+            logger.debug(f'Removed bucket {bucket_name}')
+        else:
+            logger.debug(f"Bucket {bucket_name} doesn't exist")
+
+    def upload_file(self, bucket_name, destination_file, source_file):
+        """
+        Upload the file, renaming it in the process
+
+        :param bucket_name: name of the bucket the file is uploaded to
+        :param destination_file: name of the object in MinIO (can include path without bucket_name)
+        :param source_file: name of the file to upload (can include path)
+        :return object with the status of the upload
+        """
+        # Bucket must exist before uploading file
+        self.create_bucket(bucket_name)
+
+        logger.debug(
+            f'Uploading {source_file} as object {destination_file} to bucket {bucket_name}')
+        return self.client.fput_object(
+            bucket_name,
+            object_name=destination_file,
+            file_path=source_file,
+        )
+
+    def process_file(self, bucket_name, file, processing_method, chunk_size=500000):
+        """Retrieves a file in chunks and applies a function to each chunk
+
+        :param bucket_name: name of the bucket where the file is located
+        :param file: name of the file to retrieve (can include path without bucket_name)
+        :param processing_method: method to apply to each chunk of the retrieved file
+        :param chunk_size: size in bytes of the chunks to retrieve (500000 by default)
+        """
+        file_size = self.client.stat_object(
+            bucket_name, object_name=file).size or 0
+
+        response = None
+        for offset in range(0, file_size, chunk_size):
+            # Get the chunk [offset, offset + chunk_size) of the file
+            try:
+                response = self.client.get_object(
+                    bucket_name, file, offset, chunk_size)
+                # response.data returns bytes
+                processing_method(response.data)
+            except Exception as e:
+                raise Exception(
+                    f'An error occurred while processing the file: {e}') from e
+
+        logger.debug('Processing ended.')
+        if response:
+            response.close()
+            response.release_conn()

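`process_file` issues one ranged `get_object(bucket_name, file, offset, chunk_size)` call per chunk, with offsets taken from `range(0, file_size, chunk_size)`, so the last chunk may be shorter than `chunk_size`. The MinIO Python SDK documentation shows calling `close()` and `release_conn()` on each response returned by `get_object`. The sketch below is a hedged variant that releases every chunk's response in a `finally` block instead of only the last one after the loop. To keep it runnable without a server, `FakeMinioClient`, `FakeStat` and `FakeResponse` are hypothetical in-memory stand-ins that imitate only the client methods `process_file` uses; they are not part of the `minio` package.

```python
from typing import Callable, Dict, List, Tuple


class FakeResponse:
    """Hypothetical stand-in for the HTTP response returned by Minio.get_object."""

    def __init__(self, data: bytes):
        self.data = data
        self.closed = False
        self.released = False

    def close(self) -> None:
        self.closed = True

    def release_conn(self) -> None:
        self.released = True


class FakeStat:
    """Hypothetical stand-in for the object returned by Minio.stat_object."""

    def __init__(self, size: int):
        self.size = size


class FakeMinioClient:
    """Hypothetical in-memory client exposing only stat_object and get_object."""

    def __init__(self, objects: Dict[Tuple[str, str], bytes]):
        self.objects = objects
        self.responses: List[FakeResponse] = []

    def stat_object(self, bucket_name: str, object_name: str) -> FakeStat:
        return FakeStat(len(self.objects[(bucket_name, object_name)]))

    def get_object(self, bucket_name: str, object_name: str,
                   offset: int = 0, length: int = 0) -> FakeResponse:
        data = self.objects[(bucket_name, object_name)]
        # Ranged read: bytes [offset, offset + length), truncated at the end of the object
        response = FakeResponse(data[offset:offset + length])
        self.responses.append(response)
        return response


def process_file(client, bucket_name: str, file: str,
                 processing_method: Callable[[bytes], None],
                 chunk_size: int = 500000) -> None:
    """Sketch of chunked processing that releases every chunk's connection."""
    file_size = client.stat_object(bucket_name, object_name=file).size or 0
    for offset in range(0, file_size, chunk_size):
        response = None
        try:
            response = client.get_object(bucket_name, file, offset, chunk_size)
            processing_method(response.data)
        except Exception as e:
            raise Exception(f'An error occurred while processing the file: {e}') from e
        finally:
            # Release this chunk's response even if processing_method failed
            if response is not None:
                response.close()
                response.release_conn()


client = FakeMinioClient({('test-bucket', 'example.txt'): b'Test text'})
chunks: List[bytes] = []
process_file(client, 'test-bucket', 'example.txt', chunks.append, chunk_size=3)
print(chunks)  # [b'Tes', b't t', b'ext']
```

Releasing in `finally` means a failing `processing_method` still hands the connection back to the pool. Whether the original single `close()` after the loop actually leaks connections depends on how urllib3 behaves once `response.data` has been fully read, so treat this as a defensive pattern rather than a confirmed fix.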
Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U.
+#
+# This file is part of tc_etl_lib
+#
+# tc_etl_lib is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# tc_etl_lib is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/.
+
+'''
+MinIO Manager tests.
+'''
+
+from pytest_minio_mock.plugin import minio_mock
+from unittest import mock
+from tc_etl_lib.minio import minioManager
+import os
+
+
+def init_minio_manager():
+    return minioManager(
+        endpoint='localhost:9000',
+        access_key='admin',
+        secret_key='admin123')
+
+
+def test_create_bucket(minio_mock):
+    minio_manager = init_minio_manager()
+
+    minio_manager.create_bucket("test_bucket")
+    buckets = minio_manager.client.list_buckets()
+    assert len(buckets) == 1
+
+
+def test_remove_bucket(minio_mock):
+    minio_manager = init_minio_manager()
+
+    minio_manager.create_bucket("test_bucket")
+    minio_manager.remove_bucket("test_bucket")
+    buckets = minio_manager.client.list_buckets()
+    assert len(buckets) == 0
+
+
+def test_upload_file(minio_mock):
+    minio_manager = init_minio_manager()
+    bucket_name = 'test-bucket'
+    file = 'test_minioManager_file.txt'
+
+    # Create the test file (overwritten if it already exists)
+    fichero_test = open(file, "w")
+    fichero_test.write("Test text")
+    fichero_test.close()
+
+    minio_manager.create_bucket(bucket_name)
+    result = minio_manager.upload_file(bucket_name,
+                                       destination_file=file,
+                                       source_file=file)
+
+    # Remove the test file
+    os.remove(file)
+    # pytest_minio_mock returns a string while real minio returns an object
+    assert result == "Upload successful"
+
+
+def test_process_file(minio_mock):
+    minio_manager = init_minio_manager()
+    bucket_name = 'test-bucket'
+    file = "test-minioManager-file.txt"
+    out_file_name = "out.txt"
+
+    # Create the test file (overwritten if it already exists)
+    fichero_test = open(file, "w")
+    fichero_test.write("Test text")
+    fichero_test.close()
+
+    minio_manager.create_bucket(bucket_name)
+
+    minio_manager.upload_file(bucket_name,
+                              destination_file=file,
+                              source_file=file)
+
+    # Custom processing method that saves the MinIO file locally
+    def test_processingMethod(file_chunk):
+        fichero_procesado = open(out_file_name, "ab")
+        fichero_procesado.write(file_chunk)
+        fichero_procesado.close()
+
+    class ObjectStat:
+        size = 9
+
+    mocked_return = ObjectStat()
+    with mock.patch('pytest_minio_mock.plugin.MockMinioObject.stat_object', return_value=mocked_return):
+        minio_manager.process_file(bucket_name,
+                                   file=file,
+                                   chunk_size=9,
+                                   processing_method=test_processingMethod)
+
+    # Read the output file
+    out_file = open(out_file_name, "r")
+    result = out_file.read()
+    out_file.close()
+
+    # Remove the created files
+    os.remove(file)
+    os.remove(out_file_name)
+    # Check the downloaded file content is equal to the uploaded one
+    assert result == "Test text"
