From f2e11c37565994ee6e3b8e5db04006b095127d52 Mon Sep 17 00:00:00 2001
From: malteos
Date: Wed, 19 Nov 2025 12:33:56 +0100
Subject: [PATCH 1/5] feat: Adding settings, utils, and writer close

---
 cdx_toolkit/cli.py        | 53 +++++++--------------------
 cdx_toolkit/myrequests.py | 27 +++++++++++++++-----
 cdx_toolkit/settings.py   | 15 +++++++++++
 cdx_toolkit/utils.py      | 49 ++++++++++++++++++++++++++++++++++++
 examples/iter-and-warc.py |  2 ++
 5 files changed, 96 insertions(+), 50 deletions(-)
 create mode 100644 cdx_toolkit/settings.py
 create mode 100644 cdx_toolkit/utils.py

diff --git a/cdx_toolkit/cli.py b/cdx_toolkit/cli.py
index bfe7e23..494edbd 100644
--- a/cdx_toolkit/cli.py
+++ b/cdx_toolkit/cli.py
@@ -6,7 +6,9 @@
 import os
 
 import cdx_toolkit
-from cdx_toolkit.commoncrawl import normalize_crawl
+
+from cdx_toolkit.utils import get_version, setup
+
 
 
 LOGGER = logging.getLogger(__name__)
@@ -151,49 +153,6 @@ def set_loglevel(cmd):
     LOGGER.info('set loglevel to %s', str(loglevel))
 
 
-def get_version():
-    return cdx_toolkit.__version__
-
-
-def setup(cmd):
-    kwargs = {}
-    kwargs['source'] = 'cc' if cmd.crawl else cmd.cc or cmd.ia or cmd.source or None
-    if kwargs['source'] is None:
-        raise ValueError('must specify --cc, --ia, or a --source')
-    if cmd.wb:
-        kwargs['wb'] = cmd.wb
-    if cmd.cc_mirror:
-        kwargs['cc_mirror'] = cmd.cc_mirror
-    if cmd.crawl:
-        kwargs['crawl'] = normalize_crawl([cmd.crawl])  # currently a string, not a list
-    if getattr(cmd, 'warc_download_prefix', None) is not None:
-        kwargs['warc_download_prefix'] = cmd.warc_download_prefix
-
-    cdx = cdx_toolkit.CDXFetcher(**kwargs)
-
-    kwargs = {}
-    if cmd.limit:
-        kwargs['limit'] = cmd.limit
-    if 'from' in vars(cmd) and vars(cmd)['from']:  # python, uh, from is a reserved word
-        kwargs['from_ts'] = vars(cmd)['from']
-    if cmd.to:
-        kwargs['to'] = cmd.to
-    if cmd.closest:
-        if not cmd.get:  # pragma: no cover
-            LOGGER.info('note: --closest works best with --get')
-        kwargs['closest'] = cmd.closest
-    if cmd.filter:
-        kwargs['filter'] = cmd.filter
-
-    if cmd.cmd == 'warc' and cmd.size:
-        kwargs['size'] = cmd.size
-
-    if cmd.cmd == 'size' and cmd.details:
-        kwargs['details'] = cmd.details
-
-    return cdx, kwargs
-
-
 def winnow_fields(cmd, fields, obj):
     if cmd.all_fields:
         printme = obj
@@ -275,9 +234,15 @@ def warcer(cmd, cmdline):
             LOGGER.warning('revisit record being resolved for url %s %s',
                            url, timestamp)
         writer.write_record(record)
 
+    writer.close()
+
 
 def sizer(cmd, cmdline):
     cdx, kwargs = setup(cmd)
     size = cdx.get_size_estimate(cmd.url, **kwargs)
     print(size)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/cdx_toolkit/myrequests.py b/cdx_toolkit/myrequests.py
index 408bebf..6e8d684 100644
--- a/cdx_toolkit/myrequests.py
+++ b/cdx_toolkit/myrequests.py
@@ -1,9 +1,18 @@
+from typing import Optional
 import requests
 import logging
 import time
 
 from urllib.parse import urlparse
 
 from . import __version__
+from .settings import (
+    DEFAULT_MIN_RETRY_INTERVAL,
+    CC_DATA_MIN_RETRY_INTERVAL,
+    CC_INDEX_MIN_RETRY_INTERVAL,
+    IA_MIN_RETRY_INTERVAL,
+    MAX_ERRORS,
+    WARNING_AFTER_N_ERRORS,
+)
 
 LOGGER = logging.getLogger(__name__)
@@ -23,19 +32,19 @@ def dns_fatal(hostname):
 
 retry_info = {
     'default': {
         'next_fetch': 0,
-        'minimum_interval': 3.0,
+        'minimum_interval': DEFAULT_MIN_RETRY_INTERVAL,
     },
     'index.commoncrawl.org': {
         'next_fetch': 0,
-        'minimum_interval': 1.0,
+        'minimum_interval': CC_INDEX_MIN_RETRY_INTERVAL,
     },
     'data.commoncrawl.org': {
         'next_fetch': 0,
-        'minimum_interval': 0.55,
+        'minimum_interval': CC_DATA_MIN_RETRY_INTERVAL,
     },
     'web.archive.org': {
         'next_fetch': 0,
-        'minimum_interval': 6.0,
+        'minimum_interval': IA_MIN_RETRY_INTERVAL,
     },
 }
@@ -60,12 +69,18 @@ def myrequests_get(
     headers=None,
     cdx=False,
     allow404=False,
-    raise_error_after_n_errors: int = 100,
-    raise_warning_after_n_errors: int = 10,
+    raise_error_after_n_errors: Optional[int] = None,
+    raise_warning_after_n_errors: Optional[int] = None,
     retry_max_sec: int = 60,
 ):
     t = time.time()
 
+    if raise_error_after_n_errors is None:
+        raise_error_after_n_errors = MAX_ERRORS
+
+    if raise_warning_after_n_errors is None:
+        raise_warning_after_n_errors = WARNING_AFTER_N_ERRORS
+
     hostname = urlparse(url).hostname
 
     next_fetch, minimum_interval = get_retries(hostname)
diff --git a/cdx_toolkit/settings.py b/cdx_toolkit/settings.py
new file mode 100644
index 0000000..f223f65
--- /dev/null
+++ b/cdx_toolkit/settings.py
@@ -0,0 +1,15 @@
+import os
+
+MAX_ERRORS = int(os.environ.get('CDXT_MAX_ERRORS', 100))
+WARNING_AFTER_N_ERRORS = int(os.environ.get('CDXT_WARNING_AFTER_N_ERRORS', 10))
+
+DEFAULT_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_DEFAULT_MIN_RETRY_INTERVAL', 3.0))
+CC_INDEX_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_CC_INDEX_MIN_RETRY_INTERVAL', 1.0))
+CC_DATA_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_CC_DATA_MIN_RETRY_INTERVAL', 0.55))
+IA_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_IA_MIN_RETRY_INTERVAL', 6.0))
+
+
+def get_mock_time():
+    """Get the mock time from environment variable, evaluated dynamically"""
+    mock_time = os.environ.get('CDXT_MOCK_TIME')
+    return float(mock_time) if mock_time else None
diff --git a/cdx_toolkit/utils.py b/cdx_toolkit/utils.py
new file mode 100644
index 0000000..55f0d20
--- /dev/null
+++ b/cdx_toolkit/utils.py
@@ -0,0 +1,49 @@
+import cdx_toolkit
+from cdx_toolkit.commoncrawl import normalize_crawl
+
+import logging
+
+LOGGER = logging.getLogger(__name__)
+
+
+def get_version():
+    return cdx_toolkit.__version__
+
+
+def setup(cmd):
+    kwargs = {}
+    kwargs['source'] = 'cc' if cmd.crawl else cmd.cc or cmd.ia or cmd.source or None
+    if kwargs['source'] is None:
+        raise ValueError('must specify --cc, --ia, or a --source')
+    if cmd.wb:
+        kwargs['wb'] = cmd.wb
+    if cmd.cc_mirror:
+        kwargs['cc_mirror'] = cmd.cc_mirror
+    if cmd.crawl:
+        kwargs['crawl'] = normalize_crawl([cmd.crawl])  # currently a string, not a list
+    if getattr(cmd, 'warc_download_prefix', None) is not None:
+        kwargs['warc_download_prefix'] = cmd.warc_download_prefix
+
+    cdx = cdx_toolkit.CDXFetcher(**kwargs)
+
+    kwargs = {}
+    if cmd.limit:
+        kwargs['limit'] = cmd.limit
+    if 'from' in vars(cmd) and vars(cmd)['from']:  # python, uh, from is a reserved word
+        kwargs['from_ts'] = vars(cmd)['from']
+    if cmd.to:
+        kwargs['to'] = cmd.to
+    if cmd.closest:
+        if not cmd.get:  # pragma: no cover
+            LOGGER.info('note: --closest works best with --get')
+        kwargs['closest'] = cmd.closest
+    if cmd.filter:
+        kwargs['filter'] = cmd.filter
+
+    if cmd.cmd == 'warc' and cmd.size:
+        kwargs['size'] = cmd.size
+
+    if cmd.cmd == 'size' and cmd.details:
+        kwargs['details'] = cmd.details
+
+    return cdx, kwargs
diff --git a/examples/iter-and-warc.py b/examples/iter-and-warc.py
index 73ea3dd..61de9c0 100755
--- a/examples/iter-and-warc.py
+++ b/examples/iter-and-warc.py
@@ -32,3 +32,5 @@
     writer.write_record(record)
 
     print(' wrote', url)
+
+writer.close()

From 94cc51bfc4d9391091b2486cf4c8ffbe3c49e240 Mon Sep 17 00:00:00 2001
From: malteos
Date: Wed, 19 Nov 2025 12:58:02 +0100
Subject: [PATCH 2/5] Adding S3 writer integration

---
 cdx_toolkit/warc.py | 82 +++++++++++++++++++++++++++++----------------
 requirements.txt    | 18 +++++-----
 scripts/cdx_iter    |  2 ++
 setup.py            | 12 +++----
 4 files changed, 69 insertions(+), 45 deletions(-)

diff --git a/cdx_toolkit/warc.py b/cdx_toolkit/warc.py
index e7e2bd5..657b2c0 100644
--- a/cdx_toolkit/warc.py
+++ b/cdx_toolkit/warc.py
@@ -1,10 +1,10 @@
 from urllib.parse import quote
 from io import BytesIO
-import os.path
 import datetime
 import logging
 import sys
 
+import fsspec
 from warcio import WARCWriter
 from warcio.recordloader import ArcWarcRecordLoader
 from warcio.bufferedreaders import DecompressingBufferedReader
@@ -32,9 +32,9 @@ def wb_redir_to_original(location):
 
 
 def fake_wb_warc(url, wb_url, resp, capture):
-    '''
+    """
     Given a playback from a wayback, fake up a warc response record
-    '''
+    """
 
     status_code = resp.status_code
     status_reason = resp.reason
@@ -42,19 +42,18 @@ def fake_wb_warc(url, wb_url, resp, capture):
     url = capture['url']
     timestamp = capture['timestamp']
     if status_code == 200 and capture['status'] == '-':
-        LOGGER.warning('revisit record vivified by wayback for %s %s',
-                       url, timestamp)
+        LOGGER.warning('revisit record vivified by wayback for %s %s', url, timestamp)
     elif status_code == 200 and capture['status'].startswith('3'):
-        LOGGER.warning('redirect capture came back 200, same-surt same-timestamp capture? %s %s',
-                       url, timestamp)
+        LOGGER.warning('redirect capture came back 200, same-surt same-timestamp capture? %s %s', url, timestamp)
     elif status_code == 302 and capture['status'].startswith('3'):
         # this is OK, wayback always sends a temporary redir
         status_code = int(capture['status'])
         if status_code != resp.status_code and status_code in http_status_text:
             status_reason = http_status_text[status_code]
     else:  # pragma: no cover
-        LOGGER.warning('surprised that status code is now=%d orig=%s %s %s',
-                       status_code, capture['status'], url, timestamp)
+        LOGGER.warning(
+            'surprised that status code is now=%d orig=%s %s %s', status_code, capture['status'], url, timestamp
+        )
 
     http_headers = []
     http_date = None
@@ -89,16 +88,15 @@ def fake_wb_warc(url, wb_url, resp, capture):
         content_bytes = resp.content
 
     writer = WARCWriter(None)  # needs warc_version here?
-    return writer.create_warc_record(url, 'response',
-                                     payload=BytesIO(content_bytes),
-                                     http_headers=http_headers,
-                                     warc_headers_dict=warc_headers_dict)
+    return writer.create_warc_record(
+        url, 'response', payload=BytesIO(content_bytes), http_headers=http_headers, warc_headers_dict=warc_headers_dict
+    )
 
 
 def fetch_wb_warc(capture, wb, modifier='id_'):
     for field in ('url', 'timestamp', 'status'):
         if field not in capture:  # pragma: no cover
-            raise ValueError('capture must contain '+field)
+            raise ValueError('capture must contain ' + field)
 
     if wb is None:  # pragma: no cover
         raise ValueError('No wayback configured')
@@ -123,7 +121,7 @@ def fetch_wb_warc(capture, wb, modifier='id_'):
 def fetch_warc_record(capture, warc_download_prefix):
     for field in ('url', 'filename', 'offset', 'length'):
         if field not in capture:  # pragma: no cover
-            raise ValueError('capture must contain '+field)
+            raise ValueError('capture must contain ' + field)
 
     url = capture['url']
     filename = capture['filename']
@@ -131,10 +129,19 @@ def fetch_warc_record(capture, warc_download_prefix):
     length = int(capture['length'])
 
     warc_url = warc_download_prefix + '/' + filename
 
-    headers = {'Range': 'bytes={}-{}'.format(offset, offset+length-1)}
-    resp = myrequests_get(warc_url, headers=headers)
-    record_bytes = resp.content
+    if warc_url.startswith('s3:'):
+        # fetch from S3
+        with fsspec.open(warc_url, 'rb') as f:
+            f.seek(offset)
+            record_bytes = f.read(length)
+    else:
+        # fetch over HTTP
+        headers = {'Range': 'bytes={}-{}'.format(offset, offset + length - 1)}
+
+        resp = myrequests_get(warc_url, headers=headers)
+        record_bytes = resp.content
+
 
     stream = DecompressingBufferedReader(BytesIO(record_bytes))
     record = ArcWarcRecordLoader().parse_record_stream(stream)
@@ -145,18 +152,20 @@ def fetch_warc_record(capture, warc_download_prefix):
     warc_target_uri = record.rec_headers.get_header('WARC-Target-URI')
     if url != warc_target_uri:  # pragma: no cover
         print(
-            "Surprised that WARC-Target-URI {} is not the capture url {}".format(
-                warc_target_uri, url
-            ),
+            'Surprised that WARC-Target-URI {} is not the capture url {}'.format(warc_target_uri, url),
             file=sys.stderr,
         )
 
     record.rec_headers.replace_header('WARC-Source-URI', warc_url)
-    record.rec_headers.replace_header('WARC-Source-Range', 'bytes={}-{}'.format(offset, offset+length-1))
+    record.rec_headers.replace_header('WARC-Source-Range', 'bytes={}-{}'.format(offset, offset + length - 1))
     return record
 
 
 class CDXToolkitWARCWriter:
+    """Writer for WARC files.
+
+    The fsspec package is used for writing to a local or remote file system, e.g., S3."""
+
     def __init__(self, prefix, subprefix, info, size=1000000000, gzip=True, warc_version=None):
         self.prefix = prefix
         self.subprefix = subprefix
@@ -166,6 +175,9 @@ def __init__(self, prefix, subprefix, info, size=1000000000, gzip=True, warc_ver
         self.warc_version = warc_version
         self.segment = 0
         self.writer = None
+        self.file_handler = None
+        self.file_system, self.file_system_prefix = fsspec.url_to_fs(self.prefix)
+        self._file_context = None
 
     def write_record(self, *args, **kwargs):
         if self.writer is None:
@@ -180,21 +192,21 @@ def write_record(self, *args, **kwargs):
 
         self.writer.write_record(*args, **kwargs)
 
-        fsize = os.fstat(self.fd.fileno()).st_size
-        if fsize > self.size:
-            self.fd.close()
+        # Compare file size of current segment with max. file size
+        if self.file_handler and self.file_handler.tell() > self.size:
+            self._close_current_file()
             self.writer = None
             self.segment += 1
 
     def _unique_warc_filename(self):
         while True:
-            name = self.prefix + '-'
+            name = self.file_system_prefix + '-'
             if self.subprefix is not None:
                 name += self.subprefix + '-'
             name += '{:06d}'.format(self.segment) + '.extracted.warc'
             if self.gzip:
                 name += '.gz'
-            if os.path.exists(name):
+            if self.file_system.exists(name):
                 self.segment += 1
             else:
                 break
@@ -202,12 +214,24 @@ def _unique_warc_filename(self):
 
     def _start_new_warc(self):
         self.filename = self._unique_warc_filename()
-        self.fd = open(self.filename, 'wb')
+        self._file_context = self.file_system.open(self.filename, 'wb')
+        self.file_handler = self._file_context.__enter__()
         LOGGER.info('opening new warc file %s', self.filename)
-        self.writer = WARCWriter(self.fd, gzip=self.gzip, warc_version=self.warc_version)
+        self.writer = WARCWriter(self.file_handler, gzip=self.gzip, warc_version=self.warc_version)
         warcinfo = self.writer.create_warcinfo_record(self.filename, self.info)
         self.writer.write_record(warcinfo)
 
+    def _close_current_file(self):
+        # Close the handler of the current file (needed for fsspec abstraction)
+        if self._file_context is not None:
+            self._file_context.__exit__(None, None, None)
+            self._file_context = None
+            self.file_handler = None
+
+    def close(self):
+        # Close the WARC writer (this must be called at the end)
+        self._close_current_file()
+
 
 def get_writer(prefix, subprefix, info, **kwargs):
     return CDXToolkitWARCWriter(prefix, subprefix, info, **kwargs)
diff --git a/requirements.txt b/requirements.txt
index f74bc4c..97c1943 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,19 +1,19 @@
 # Install with "python -m pip install -r requirements.txt".
 # must be kept in sync with setup.py
 
-requests==2.25.1
+requests>=2.25.1
 warcio==1.7.4
+fsspec[s3]
 
 # used by Makefile
-pytest==6.2.4
-pytest-cov==2.12.1
-pytest-sugar==0.9.4
-coveralls==3.1.0
+pytest>=6.2.4
+pytest-cov>=2.12.1
+pytest-sugar>=0.9.4
+coveralls>=3.1.0
 flake8>=7.3.0
-responses==0.25.8
 pre-commit>=4.3.0
 
 # packaging
-twine==3.4.1
-setuptools==57.0.0
-setuptools-scm==6.0.1
+twine>=3.4.1
+setuptools>=57.0.0
+setuptools-scm>=6.0.1
diff --git a/scripts/cdx_iter b/scripts/cdx_iter
index 8b0c5a3..99445c0 100644
--- a/scripts/cdx_iter
+++ b/scripts/cdx_iter
@@ -143,6 +143,8 @@ elif args.warc:
         if obj.is_revisit():
             LOGGER.warning('revisit record being resolved for url %s %s', url, timestamp)
         writer.write_record(record)
+
+    writer.close()
 else:
     for obj in cdx.iter(args.url, **kwargs):
         printme = winnow_fields(obj)
diff --git a/setup.py b/setup.py
index 9a03208..b7f630a 100755
--- a/setup.py
+++ b/setup.py
@@ -2,15 +2,13 @@
 
 from os import path
 
-from setuptools import setup
+from setuptools import setup, find_packages
 
-packages = [
-    'cdx_toolkit',
-]
+packages = find_packages(include=['cdx_toolkit*'])
 
 
 # remember: keep requires synchronized with requirements.txt
-requires = ['requests', 'warcio']
+requires = ['requests', 'warcio', 'fsspec[s3]']
 
 test_requirements = ['pytest', 'pytest-cov', 'flake8', 'responses']
 
@@ -61,8 +59,8 @@
         'Natural Language :: English',
         'License :: OSI Approved :: Apache Software License',
         'Programming Language :: Python',
-        #'Programming Language :: Python :: 3.5',  # setuptools-scm problem
-        #'Programming Language :: Python :: 3.6',  # not offered in github actions
+        # 'Programming Language :: Python :: 3.5',  # setuptools-scm problem
+        # 'Programming Language :: Python :: 3.6',  # not offered in github actions
         'Programming Language :: Python :: 3.9',
         'Programming Language :: Python :: 3.10',
         'Programming Language :: Python :: 3.11',

From 5793f1b2a3cb49ee6e5082c57e87c0cc8506aca9 Mon Sep 17 00:00:00 2001
From: malteos
Date: Wed, 19 Nov 2025 13:00:47 +0100
Subject: [PATCH 3/5] fix bad merge

---
 requirements.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements.txt b/requirements.txt
index 97c1943..7ecf9ca 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,7 @@ pytest-cov>=2.12.1
 pytest-sugar>=0.9.4
 coveralls>=3.1.0
 flake8>=7.3.0
+responses==0.25.8
 pre-commit>=4.3.0
 
 # packaging

From ed3061f62bb2556875ecb29f8c66fe2cc9b1388b Mon Sep 17 00:00:00 2001
From: malteos
Date: Wed, 19 Nov 2025 15:09:06 +0100
Subject: [PATCH 4/5] Adding unit test for s3 reads

---
 .github/workflows/ci.yaml  | 32 ++++++++++++++++++++++
 cdx_toolkit/commoncrawl.py | 12 ++++++--
 tests/conftest.py          | 56 ++++++++++++++++++++++++++++++++++++++
 tests/unit/test_warc.py    | 19 +++++++++++++
 4 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 13e12f0..58a7ebe 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -8,6 +8,12 @@ on:
     branches:
       - main
 
+# These permissions are needed to interact with AWS S3 via GitHub's OIDC Token endpoint
+permissions:
+  id-token: write
+  contents: read
+  pull-requests: read
+
 jobs:
   unit-tests:
     runs-on: ${{ matrix.os }}
@@ -58,6 +64,32 @@ jobs:
       - name: Install cdx_toolkit
         run: pip install .[test]
 
+      - name: Configure AWS credentials from OIDC (disabled for forks)
+        if: github.event.pull_request.head.repo.full_name == github.repository || github.event_name == 'push'
+        uses: aws-actions/configure-aws-credentials@v4
+        with:
+          role-to-assume: arn:aws:iam::837454214164:role/GitHubActions-Role
+          aws-region: us-east-1
+
+      - name: Disable S3 unit tests for Python 3.8 (boto3 requires Python 3.9+)
+        if: ${{ startsWith(matrix.python-version, '3.8') }}
+        uses: actions/github-script@v7
+        with:
+          script: |
+            core.exportVariable('CDXT_DISABLE_S3_TESTS', '1')
+      - name: Set environment variables for faster unit tests (requests are mocked)
+        uses: actions/github-script@v7
+        with:
+          script: |
+            core.exportVariable('CDXT_MAX_ERRORS', '2')
+            core.exportVariable('CDXT_WARNING_AFTER_N_ERRORS', '2')
+            core.exportVariable('CDXT_DEFAULT_MIN_RETRY_INTERVAL', '0.01')
+            core.exportVariable('CDXT_CC_INDEX_MIN_RETRY_INTERVAL', '0.01')
+            core.exportVariable('CDXT_CC_DATA_MIN_RETRY_INTERVAL', '0.01')
+            core.exportVariable('CDXT_IA_MIN_RETRY_INTERVAL', '0.01')
+            core.exportVariable('DISABLE_ATHENA_TESTS', '1')
+            core.exportVariable('LOGLEVEL', 'DEBUG')
+
       - name: Lint code
         run: |
           make lint
diff --git a/cdx_toolkit/commoncrawl.py b/cdx_toolkit/commoncrawl.py
index a604c41..7c38c24 100644
--- a/cdx_toolkit/commoncrawl.py
+++ b/cdx_toolkit/commoncrawl.py
@@ -9,6 +9,8 @@
 import json
 import logging
 
+from cdx_toolkit.settings import get_mock_time
+
 from .myrequests import myrequests_get
 from .timeutils import (
     time_to_timestamp,
@@ -128,9 +130,13 @@ def apply_cc_defaults(params, crawl_present=False, now=None):
             LOGGER.info('to but no from_ts, setting from_ts=%s', params['from_ts'])
         else:
             if not now:
-                # now is passed in by tests. if not set, use actual now.
-                # XXX could be changed to mock
-                now = time.time()
+                # Check for test/override time first
+                mock_time = get_mock_time()
+                if mock_time:
+                    now = mock_time
+                else:
+                    # now is passed in by tests. if not set, use actual now.
+                    now = time.time()
             params['from_ts'] = time_to_timestamp(now - year)
             LOGGER.info('no from or to, setting default 1 year ago from_ts=%s', params['from_ts'])
     else:
diff --git a/tests/conftest.py b/tests/conftest.py
index 1555c24..b85d9aa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,16 +1,72 @@
 import json
 import os
 from pathlib import Path
+import pytest
+import boto3
+from botocore.config import Config
+from botocore.exceptions import NoCredentialsError, ClientError, EndpointConnectionError
+
 import functools
 from typing import Dict, Optional
 
 import requests
 import responses
 import base64
+import shutil
 from unittest.mock import patch
 
 TEST_DATA_BASE_PATH = Path(__file__).parent / 'data'
+TEST_S3_BUCKET = os.environ.get('CDXT_TEST_S3_BUCKET', 'commoncrawl-ci-temp')
+DISABLE_S3_TESTS = bool(os.environ.get('CDXT_DISABLE_S3_TESTS', False))
+
+# Cache for AWS access check to avoid repeated network calls
+_aws_s3_access_cache = None
+
+
+@pytest.fixture(scope='session', autouse=True)
+def cleanup_cache():
+    """Delete cache directory once per test session to ensure a clean state"""
+    cache_dir = os.path.expanduser('~/.cache/cdx_toolkit/')
+    if os.path.exists(cache_dir):
+        shutil.rmtree(cache_dir)
+
+
+@pytest.fixture(scope='session', autouse=True)
+def set_mock_time():
+    """Set CDXT_MOCK_TIME environment variable for consistent test results"""
+    # August 15, 2025 - ensures tests use CC-MAIN-2025-33 which exists in mock data
+    if 'CDXT_MOCK_TIME' not in os.environ:
+        os.environ['CDXT_MOCK_TIME'] = '1755259200'
+
+
+def check_aws_s3_access():
+    """Check if AWS S3 access is available (cached result)."""
+    global _aws_s3_access_cache
+
+    if _aws_s3_access_cache is not None:
+        return _aws_s3_access_cache
+
+    try:
+        config = Config(retries={'max_attempts': 1, 'mode': 'standard'})
+        s3_client = boto3.client('s3', config=config)
+
+        # Try list objects on test bucket
+        s3_client.list_objects_v2(Bucket=TEST_S3_BUCKET, MaxKeys=1)
+        _aws_s3_access_cache = True
+    except (NoCredentialsError, ClientError, ConnectionError, EndpointConnectionError):
+        _aws_s3_access_cache = False
+
+    return _aws_s3_access_cache
+
+
+def requires_aws_s3(func):
+    """Pytest decorator that skips test if AWS S3 access is not available or disabled."""
+    return pytest.mark.skipif(DISABLE_S3_TESTS, reason='AWS S3 access is disabled via environment variable.')(
+        pytest.mark.skipif(
+            not check_aws_s3_access(), reason='AWS S3 access not available (no credentials or permissions)'
+        )(func)
+    )
 
 
 def flexible_param_matcher(expected_params):
diff --git a/tests/unit/test_warc.py b/tests/unit/test_warc.py
index e5df43f..661aea5 100644
--- a/tests/unit/test_warc.py
+++ b/tests/unit/test_warc.py
@@ -1,7 +1,26 @@
 import cdx_toolkit.warc
+from tests.conftest import requires_aws_s3
 
 
 def test_wb_redir_to_original():
     location = 'https://web.archive.org/web/20110209062054id_/http://commoncrawl.org/'
     ret = 'http://commoncrawl.org/'
     assert cdx_toolkit.warc.wb_redir_to_original(location) == ret
+
+
+@requires_aws_s3
+def test_fetch_warc_record_from_s3():
+    record = cdx_toolkit.warc.fetch_warc_record(
+        capture={
+            'url': 'https://bibliotheque.missiondefrance.fr/index.php?lvl=bulletin_display&id=319',
+            'filename': 'crawl-data/CC-MAIN-2024-30/segments/1720763514759.37/warc/CC-MAIN-20240716142214-20240716172214-00337.warc.gz',  # noqa: E501
+            'offset': 111440525,
+            'length': 9754,
+        },
+        warc_download_prefix='s3://commoncrawl',
+    )
+    record_content = record.content_stream().read().decode(errors='ignore')
+
+    assert record.rec_type == 'response'
+    assert record.length == 75825
+    assert 'Catalogue en ligne Mission de France' in record_content

From 4e660a3f85c944a4348230a5c06455ad4e1242d6 Mon Sep 17 00:00:00 2001
From: malteos
Date: Wed, 19 Nov 2025 15:13:50 +0100
Subject: [PATCH 5/5] Adding boto3 dependency

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index b7f630a..5d5858e 100755
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ packages = find_packages(include=['cdx_toolkit*'])
 
 # remember: keep requires synchronized with requirements.txt
-requires = ['requests', 'warcio', 'fsspec[s3]']
+requires = ['requests', 'warcio', 'fsspec[s3]', 'boto3']
 
 test_requirements = ['pytest', 'pytest-cov', 'flake8', 'responses']
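
Taken together, the series routes both the read path (fetch_warc_record with an s3:// warc_download_prefix) and the write path (CDXToolkitWARCWriter plus the new writer.close()) through fsspec/boto3, and moves the retry tuning into environment-driven settings. Below is a minimal usage sketch of how the pieces fit together once the series is applied; it is not part of the patches, it assumes only the names visible in the diffs above (CDXFetcher, cdx_toolkit.warc.get_writer, the CDXT_* variables from cdx_toolkit/settings.py), and reading from s3://commoncrawl additionally assumes AWS credentials with access to that bucket.

```python
import os

# settings.py reads the CDXT_* variables once at import time,
# so export any overrides before importing cdx_toolkit
os.environ.setdefault('CDXT_CC_DATA_MIN_RETRY_INTERVAL', '0.55')

import cdx_toolkit
import cdx_toolkit.warc

# with the fsspec branch in fetch_warc_record(), an s3:// prefix makes record
# fetches go through fsspec/boto3 instead of HTTP range requests
cdx = cdx_toolkit.CDXFetcher(source='cc', warc_download_prefix='s3://commoncrawl')

# the writer prefix is resolved via fsspec.url_to_fs(), so a plain local prefix
# like 'EXAMPLE' (hypothetical) or a remote s3:// prefix should both work
info = {'software': 'cdx_toolkit usage sketch'}  # becomes the warcinfo record
writer = cdx_toolkit.warc.get_writer('EXAMPLE', None, info, size=1000000000)

for obj in cdx.iter('commoncrawl.org/*', limit=10):
    writer.write_record(obj.fetch_warc_record())

# new in this series: close() releases the underlying fsspec file handle,
# which matters for remote targets that only finalize the object on close
writer.close()
```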