85 changes: 9 additions & 76 deletions .github/workflows/ci.yml
@@ -7,104 +7,36 @@ on:

jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest

strategy:
fail-fast: false

matrix:
python-version: ["2.7", "3.6", "3.7", "3.8", "3.9", "3.10", "3.11"]
django-version: ["1.11", "2.0", "2.2", "2.1", "3.0", "3.1","3.2", "4.0", "4.1",]
python-version: ["3.8", "3.9", "3.10", "3.11"]
django-version: ["3.2", "4.1", "4.2"]
es-dsl-version: ["6.4", "7.4"]
es-version: ["7.13.4"]
es-version: ["8.10.2"]

exclude:
- python-version: "2.7"
django-version: "2.0"
- python-version: "2.7"
django-version: "2.1"
- python-version: "2.7"
django-version: "2.2"
- python-version: "2.7"
django-version: "3.0"
- python-version: "2.7"
django-version: "3.1"
- python-version: "2.7"
django-version: "3.2"
- python-version: "2.7"
django-version: "4.0"
- python-version: "2.7"
django-version: "4.1"

- python-version: "3.6"
django-version: "4.0"
- python-version: "3.6"
django-version: "4.1"

- python-version: "3.7"
django-version: "4.0"
- python-version: "3.7"
django-version: "4.1"

- python-version: "3.8"
django-version: "1.11"
- python-version: "3.8"
django-version: "2.0"
- python-version: "3.8"
django-version: "2.1"

- python-version: "3.9"
django-version: "1.11"
- python-version: "3.9"
django-version: "2.0"
- python-version: "3.9"
django-version: "2.1"

- python-version: "3.10"
django-version: "1.11"
- python-version: "3.10"
django-version: "2.0"
- python-version: "3.10"
django-version: "2.1"
- python-version: "3.10"
django-version: "2.2"
- python-version: "3.10"
django-version: "3.0"
- python-version: "3.10"
django-version: "3.1"

- python-version: "3.11"
django-version: "1.11"
- python-version: "3.11"
django-version: "2.0"
- python-version: "3.11"
django-version: "2.1"
- python-version: "3.11"
django-version: "2.2"
- python-version: "3.11"
django-version: "3.0"
- python-version: "3.11"
django-version: "3.1"
- python-version: "3.11"
django-version: "3.2"
- python-version: "3.11"
django-version: "4.0"

steps:
- name: Install and Run Elasticsearch
uses: elastic/elastic-github-actions/elasticsearch@master
with:
stack-version: ${{ matrix.es-version }}

- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Install Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Cache Pip Dependencies
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements_test.txt') }}
@@ -122,6 +54,7 @@ jobs:
run: |
TOX_ENV=$(echo "py${{ matrix.python-version }}-django-${{ matrix.django-version }}-es${{ matrix.es-dsl-version }}" | tr -d .)
python -m tox -e $TOX_ENV -- --elasticsearch
python -m tox -e $TOX_ENV -- --elasticsearch --signal-processor celery
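The ``TOX_ENV`` derivation above just concatenates the matrix versions and strips the dots with ``tr -d .``; a quick sketch of the same mapping (version strings are examples only):

```python
# Rebuild the tox environment name the same way the workflow's
# `tr -d .` pipeline does: format the versions, then drop all dots.
def tox_env(python_version, django_version, es_dsl_version):
    name = "py{}-django-{}-es{}".format(
        python_version, django_version, es_dsl_version
    )
    return name.replace(".", "")

print(tox_env("3.8", "3.2", "6.4"))  # py38-django-32-es64
```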

- name: Publish Coverage Report
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v3
14 changes: 7 additions & 7 deletions README.rst
@@ -30,28 +30,28 @@ Features
- Index fast using `parallel` indexing.
- Requirements

- Django >= 1.11
- Python 2.7, 3.5, 3.6, 3.7, 3.8
- Django >= 3.2
- Python 3.8, 3.9, 3.10, 3.11

**Elasticsearch Compatibility:**
The library is compatible with all Elasticsearch versions since 5.x
**but you have to use a matching major version:**

- For Elasticsearch 8.0 and later, use the major version 8 (8.x.y) of the library.

- For Elasticsearch 7.0 and later, use the major version 7 (7.x.y) of the library.

- For Elasticsearch 6.0 and later, use the major version 6 (6.x.y) of the library.

- For Elasticsearch 5.0 and later, use the major version 0.5 (0.5.x) of the library.

.. code-block:: python

# Elasticsearch 8.x
elasticsearch-dsl>=8.0.0,<9.0.0

# Elasticsearch 7.x
elasticsearch-dsl>=7.0.0,<8.0.0

# Elasticsearch 6.x
elasticsearch-dsl>=6.0.0,<7.0.0

# Elasticsearch 5.x
elasticsearch-dsl>=0.5.1,<6.0.0

.. _Search: http://elasticsearch-dsl.readthedocs.io/en/stable/search_dsl.html
2 changes: 0 additions & 2 deletions django_elasticsearch_dsl/__init__.py
@@ -13,5 +13,3 @@ def autodiscover():
autodiscover_modules('documents')


if django.VERSION < (3, 2):
default_app_config = 'django_elasticsearch_dsl.apps.DEDConfig'
7 changes: 7 additions & 0 deletions django_elasticsearch_dsl/documents.py
@@ -219,6 +219,13 @@ def _get_actions(self, object_list, action):
for object_instance in object_list:
if action == 'delete' or self.should_index_object(object_instance):
yield self._prepare_action(object_instance, action)

def get_actions(self, object_list, action):
"""
Generate the Elasticsearch bulk payload for the given objects and action.
"""
return self._get_actions(object_list, action)
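A rough sketch of the kind of bulk actions this public wrapper yields — the exact output of ``_prepare_action`` depends on the document class, so the index name and fields below are illustrative only:

```python
# Toy stand-in for Document._prepare_action: builds one bulk action
# dict per object. "car_index" and the fields are hypothetical.
def _prepare_action(obj, action):
    return {
        "_op_type": action,
        "_index": "car_index",
        "_id": obj["id"],
        "_source": None if action == "delete" else {"name": obj["name"]},
    }

def get_actions(object_list, action):
    """Generate the Elasticsearch payload, one action per object."""
    for obj in object_list:
        yield _prepare_action(obj, action)

cars = [{"id": 1, "name": "Saab"}, {"id": 2, "name": "Volvo"}]
payload = list(get_actions(cars, "index"))
# payload[0] == {"_op_type": "index", "_index": "car_index",
#                "_id": 1, "_source": {"name": "Saab"}}
```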


def _bulk(self, *args, **kwargs):
"""Helper for switching between normal and parallel bulk operation"""
6 changes: 3 additions & 3 deletions django_elasticsearch_dsl/management/commands/search_index.py
@@ -161,7 +161,7 @@ def _delete_alias_indices(self, alias):
alias_delete_actions = [
{"remove_index": {"index": index}} for index in alias_indices
]
self.es_conn.indices.update_aliases({"actions": alias_delete_actions})
self.es_conn.indices.update_aliases(actions=alias_delete_actions)
for index in alias_indices:
self.stdout.write("Deleted index '{}'".format(index))

@@ -231,7 +231,7 @@ def _update_alias(self, alias, new_index, alias_exists, options):
{"remove_index": {"index": index}} for index in old_indices
]

self.es_conn.indices.update_aliases({"actions": alias_actions})
self.es_conn.indices.update_aliases(actions=alias_actions)
if delete_existing_index:
self.stdout.write("Deleted index '{}'".format(alias))

@@ -247,7 +247,7 @@ def _update_alias(self, alias, new_index, alias_exists, options):

if alias_delete_actions and not options['use_alias_keep_index']:
self.es_conn.indices.update_aliases(
{"actions": alias_delete_actions}
actions=alias_delete_actions
)
for index in old_indices:
self.stdout.write("Deleted index '{}'".format(index))
6 changes: 6 additions & 0 deletions django_elasticsearch_dsl/registries.py
@@ -174,5 +174,11 @@ def get_indices(self, models=None):

return set(iterkeys(self._indices))

def __contains__(self, model):
"""
Check whether the model is in the registry.
"""
return model in self._models or model in self._related_models
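The new ``__contains__`` makes ``model in registry`` work directly; a minimal self-contained sketch of the behavior, with plain dicts standing in for the registry's real internal mappings:

```python
# Minimal sketch of DocumentRegistry.__contains__; dicts stand in
# for the real model-to-documents mappings.
class DocumentRegistry:
    def __init__(self):
        self._models = {}
        self._related_models = {}

    def __contains__(self, model):
        return model in self._models or model in self._related_models

class Car:  # hypothetical model class
    pass

registry = DocumentRegistry()
assert Car not in registry
registry._models[Car] = set()
assert Car in registry  # membership check now works via `in`
```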


registry = DocumentRegistry()
129 changes: 125 additions & 4 deletions django_elasticsearch_dsl/signals.py
@@ -7,10 +7,13 @@
from __future__ import absolute_import

from django.db import models
from django.apps import apps
from django.dispatch import Signal

from .registries import registry

from django.core.exceptions import ObjectDoesNotExist
from importlib import import_module
# Sent after document indexing is completed
post_index = Signal()

class BaseSignalProcessor(object):
"""Base signal processor.
@@ -96,6 +99,124 @@ def teardown(self):
models.signals.m2m_changed.disconnect(self.handle_m2m_changed)
models.signals.pre_delete.disconnect(self.handle_pre_delete)

try:
from celery import shared_task
except ImportError:
pass
else:
class CelerySignalProcessor(RealTimeSignalProcessor):
"""Celery signal processor.

Allows automatic updates on the index as delayed background tasks using
Celery.

NB: We cannot process deletes as background tasks.
By the time the Celery worker would pick up the delete job, the
model instance would already have been deleted. We can get around this by
setting Celery to use `pickle` and sending the object to the worker,
but using `pickle` opens the application up to security concerns.
"""

# Sent after document indexing is completed
post_index = Signal()
def handle_save(self, sender, instance, **kwargs):
"""Handle save with a Celery task.

Given an individual model instance, update the object in the index.
Also update the related objects.
"""
pk = instance.pk
app_label = instance._meta.app_label
model_name = instance.__class__.__name__

self.registry_update_task.delay(pk, app_label, model_name)
self.registry_update_related_task.delay(pk, app_label, model_name)

def handle_pre_delete(self, sender, instance, **kwargs):
"""Handle removing of instance object from related models instance.
We need to do this before the real delete otherwise the relation
doesn't exists anymore and we can't get the related models instance.
"""
self.prepare_registry_delete_related_task(instance)

def handle_delete(self, sender, instance, **kwargs):
"""Handle delete.

Given an individual model instance, delete the object from index.
"""
self.prepare_registry_delete_task(instance)

def prepare_registry_delete_related_task(self, instance):
"""
Select the related instances before this instance is deleted,
and pass them to Celery.
"""
action = 'index'
for doc in registry._get_related_doc(instance):
doc_instance = doc(related_instance_to_ignore=instance)
try:
related = doc_instance.get_instances_from_related(instance)
except ObjectDoesNotExist:
related = None
if related is not None:
doc_instance.update(related)
if isinstance(related, models.Model):
object_list = [related]
else:
object_list = related
bulk_data = list(doc_instance._get_actions(object_list, action))
self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data)

@shared_task()
def registry_delete_task(doc_label, data):
"""
Handle the bulk delete data on the registry as a Celery task.
Delete and update need different implementations: an update can
re-read the changed rows from the database to ensure eventual
consistency, but a delete must collect the associated data before
the database record disappears.
"""
doc_instance = import_module(doc_label)
parallel = True
doc_instance._bulk(data, parallel=parallel)

def prepare_registry_delete_task(self, instance):
"""
Prepare the bulk delete data before the database record is deleted.
"""
action = 'delete'
for doc in registry._get_related_doc(instance):
doc_instance = doc(related_instance_to_ignore=instance)
try:
related = doc_instance.get_instances_from_related(instance)
except ObjectDoesNotExist:
related = None
if related is not None:
doc_instance.update(related)
if isinstance(related, models.Model):
object_list = [related]
else:
object_list = related
bulk_data = list(doc_instance.get_actions(object_list, action))
self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data)

@shared_task()
def registry_update_task(pk, app_label, model_name):
"""Handle the update on the registry as a Celery task."""
try:
model = apps.get_model(app_label, model_name)
except LookupError:
pass
else:
registry.update(
model.objects.get(pk=pk)
)

@shared_task()
def registry_update_related_task(pk, app_label, model_name):
"""Handle the related update on the registry as a Celery task."""
try:
model = apps.get_model(app_label, model_name)
except LookupError:
pass
else:
registry.update_related(
model.objects.get(pk=pk)
)
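The tasks above deliberately send only ``(pk, app_label, model_name)`` over the broker and re-resolve the model on the worker, which is what lets them avoid ``pickle``. A stripped-down sketch of that round-trip, with a plain dict standing in for ``django.apps.get_model``:

```python
# Stand-in model resolver; in the real task this is apps.get_model().
# The ("shop", "Car") entry is a hypothetical app/model pair.
_MODELS = {("shop", "Car"): "Car model class"}

def registry_update_task(pk, app_label, model_name):
    """Resolve the model from primitives, then act on the given pk."""
    try:
        model = _MODELS[(app_label, model_name)]
    except KeyError:
        return None                # mirrors the LookupError guard above
    return (model, pk)             # real task calls registry.update()

assert registry_update_task(1, "shop", "Car") == ("Car model class", 1)
assert registry_update_task(1, "shop", "Gone") is None
```

Because only primitives cross the broker, any serializer (JSON included) works, and a stale pk simply fails the later ``model.objects.get`` rather than deserializing arbitrary objects.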
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -299,7 +299,7 @@

# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
'': ('https://docs.python.org/', None),
'python': ('https://docs.python.org/', None),
'es-py': ('https://elasticsearch-py.readthedocs.io/en/master/', None),
'es-dsl': ('https://elasticsearch-dsl.readthedocs.io/en/latest/', None),
}
5 changes: 3 additions & 2 deletions docs/source/quickstart.rst
@@ -20,8 +20,9 @@ For example:

ELASTICSEARCH_DSL={
'default': {
'hosts': 'localhost:9200'
},
'hosts': 'localhost:9200',
'http_auth': ('username', 'password')
}
}

``ELASTICSEARCH_DSL`` is then passed to ``elasticsearch-dsl-py.connections.configure`` (see here_).
11 changes: 9 additions & 2 deletions docs/source/settings.rst
@@ -37,8 +37,15 @@ An example:

Defaults to ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor``.

You could, for instance, make a ``CelerySignalProcessor`` which would add
update jobs to the queue for delayed processing.
Options: ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor`` or ``django_elasticsearch_dsl.signals.CelerySignalProcessor``

With the ``CelerySignalProcessor`` implementation,
create and update operations record the changed object's primary key and defer resolving related objects until the task runs, to ensure eventual consistency.
Delete operations collect the associated data before the database records are deleted.
Celery needs to be configured in the Django project beforehand; see `Using Celery with Django <https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html>`_.
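Enabling it is a one-line setting (assuming a Celery app is already configured for the project):

```python
# settings.py — switch from the default real-time processor to Celery
ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = (
    "django_elasticsearch_dsl.signals.CelerySignalProcessor"
)
```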

You could, for instance, make a ``CustomSignalProcessor`` which applies
update jobs however you wish.

ELASTICSEARCH_DSL_PARALLEL
==========================