Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow draft content to be returned #182

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 40 additions & 38 deletions wagtailautocomplete/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db.models import Model, QuerySet
from django.http import (HttpResponseBadRequest, HttpResponseForbidden,
HttpResponseNotFound, JsonResponse)
from django.http import (
HttpResponseBadRequest,
HttpResponseForbidden,
HttpResponseNotFound,
JsonResponse,
)
from django.views.decorators.http import require_GET, require_POST


def render_page(page):
if getattr(page, 'specific', None):
if getattr(page, "specific", None):
# For support of non-Page models like Snippets.
page = page.specific
if callable(getattr(page, 'autocomplete_label', None)):
if callable(getattr(page, "autocomplete_label", None)):
title = page.autocomplete_label()
else:
title = page.title
Expand All @@ -23,63 +27,64 @@ def render_page(page):

@require_GET
def objects(request):
pks_param = request.GET.get('pks')
pks_param = request.GET.get("pks")
if not pks_param:
return HttpResponseBadRequest()
target_model = request.GET.get('type', 'wagtailcore.Page')
target_model = request.GET.get("type", "wagtailcore.Page")
try:
model = apps.get_model(target_model)
except Exception:
return HttpResponseBadRequest()

try:
pks = [
unquote(pk)
for pk in pks_param.split(',')
]
queryset = model.objects.filter(pk__in=pks)
pks = [unquote(pk) for pk in pks_param.split(",")]
if callable(getattr(model, "autocomplete_custom_queryset_objects", None)):
queryset = model.autocomplete_custom_queryset_objects()
validate_queryset(queryset, model)
else:
queryset = model.objects.filter(pk__in=pks)
if getattr(queryset, "live", None):
# Non-Page models like Snippets won't have a live/published status
# and thus should not be filtered with a call to `live`.
queryset = queryset.live()

except Exception:
return HttpResponseBadRequest()

if getattr(queryset, 'live', None):
# Non-Page models like Snippets won't have a live/published status
# and thus should not be filtered with a call to `live`.
queryset = queryset.live()

if queryset.count() != len(pks):
return HttpResponseNotFound('Some objects are either missing or deleted')
return HttpResponseNotFound("Some objects are either missing or deleted")
results = map(render_page, queryset)
return JsonResponse(dict(items=list(results)))


@require_POST
def search(request):
search_query = request.POST.get('query', '')
target_model = request.POST.get('type', 'wagtailcore.Page')
search_query = request.POST.get("query", "")
target_model = request.POST.get("type", "wagtailcore.Page")
try:
model = apps.get_model(target_model)
except Exception:
return HttpResponseBadRequest()

try:
limit = int(request.POST.get('limit', 100))
limit = int(request.POST.get("limit", 100))
except ValueError:
return HttpResponseBadRequest()

if callable(getattr(model, 'autocomplete_custom_queryset_filter', None)):
if callable(getattr(model, "autocomplete_custom_queryset_filter", None)):
queryset = model.autocomplete_custom_queryset_filter(search_query)
validate_queryset(queryset, model)
else:
queryset = filter_queryset(search_query, model)

if getattr(queryset, 'live', None):
# Non-Page models like Snippets won't have a live/published status
# and thus should not be filtered with a call to `live`.
queryset = queryset.live()
if getattr(queryset, "live", None):
# Non-Page models like Snippets won't have a live/published status
# and thus should not be filtered with a call to `live`.
queryset = queryset.live()

exclude = request.POST.get('exclude', '')
exclude = request.POST.get("exclude", "")
if exclude:
exclusions = [unquote(item) for item in exclude.split(',') if item]
exclusions = [unquote(item) for item in exclude.split(",") if item]
queryset = queryset.exclude(pk__in=exclusions)

results = map(render_page, queryset[:limit])
Expand All @@ -99,9 +104,9 @@ def filter_queryset(search_query: str, model: Model) -> QuerySet:
Returns:
QuerySet: QuerySet containing the search results.
"""
field_name = getattr(model, 'autocomplete_search_field', 'title')
field_name = getattr(model, "autocomplete_search_field", "title")
filter_kwargs = dict()
filter_kwargs[field_name + '__icontains'] = search_query
filter_kwargs[field_name + "__icontains"] = search_query
return model.objects.filter(**filter_kwargs)


Expand All @@ -122,37 +127,34 @@ def validate_queryset(queryset: QuerySet, model: Model):
if not isinstance(queryset, QuerySet):
raise TypeError(
f'Function "autocomplete_custom_queryset_filter" of model {model}'
'does not return a QuerySet.'
"does not return a QuerySet."
)

if queryset.model is not model:
raise TypeError(
f'Function "autocomplete_custom_queryset_filter" of model {model}'
'does not return queryset of {model}.'
"does not return queryset of {model}."
)


@require_POST
def create(request, *args, **kwargs):
value = request.POST.get('value', None)
value = request.POST.get("value", None)
if not value:
return HttpResponseBadRequest()

target_model = request.POST.get('type', 'wagtailcore.Page')
target_model = request.POST.get("type", "wagtailcore.Page")
try:
model = apps.get_model(target_model)
except Exception:
return HttpResponseBadRequest()

content_type = ContentType.objects.get_for_model(model)
permission_label = '{}.add_{}'.format(
content_type.app_label,
content_type.model
)
permission_label = "{}.add_{}".format(content_type.app_label, content_type.model)
if not request.user.has_perm(permission_label):
return HttpResponseForbidden()

method = getattr(model, 'autocomplete_create', None)
method = getattr(model, "autocomplete_create", None)
if not callable(method):
return HttpResponseBadRequest()

Expand Down