Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: OneGov/onegov-cloud
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 5dee4742badc8ed324b0a0b449434b049dbc4839
Choose a base ref
..
head repository: OneGov/onegov-cloud
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 254c37b0fd46f920eaf4e15a3f38489e5ea94701
Choose a head ref
Showing with 80 additions and 33 deletions.
  1. +52 −33 src/onegov/org/management.py
  2. +28 −0 tests/onegov/org/test_views_settings.py
85 changes: 52 additions & 33 deletions src/onegov/org/management.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

import transaction
from aiohttp import ClientTimeout
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import object_session
from sqlalchemy.ext.declarative import declared_attr
from urlextract import URLExtract
@@ -16,7 +17,7 @@
from onegov.people import AgencyCollection


from typing import Literal, NamedTuple, TYPE_CHECKING, Any, Callable
from typing import Literal, NamedTuple, TYPE_CHECKING, Iterator
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from onegov.form import Form
@@ -76,7 +77,7 @@ def migrate_url(
group_by = group_by or item.__class__.__name__

def repl(matchobj: re.Match[str]) -> str:
# replaces it with a new URI.
# replaces it with a new URI.
if self.use_domain:
return f'{matchobj.group(1)}{new_uri}'
return new_uri
@@ -87,41 +88,60 @@ def repl(matchobj: re.Match[str]) -> str:
else:
pattern = re.compile(re.escape(old_uri))

def content_mixin_declared_attrs(
members: list[tuple['str', 'Any']]
) -> 'Iterable[tuple[str, Callable]]':
# don't be overly specific, we might add additional fields in
# the future
def predicate(member: tuple[str, Callable]) -> bool:
name, attr = member
return (
isinstance(attr, declared_attr)
and name in ContentMixin.__dict__
)
def predicate(attr) -> bool:
nonlocal item
return isinstance(attr, MutableDict)
# todo: can it be an iterable which contains MUtableDict?

return filter(predicate, members)
# todo: refactor away from getmembers()

# Migrate `meta` and `content`:
if isinstance(item, ContentMixin):
for name, attribute in content_mixin_declared_attrs(
getmembers(item, isfunction)
):
meta_or_content = getattr(item, name, None)
if meta_or_content is None:
try:
kv = getmembers(item, predicate)
except NotImplementedError:
print('notimplementeed')
cal = "calendar_date_range"
# go with pdb into predicate and check for
try:
kv = getmembers(item, predicate)
breakpoint()
except Exception:
breakpoint()

# if len(kv) > 2:
# breakpoint()
kv: list[tuple[str, MutableDict]]
for el in kv:
if not el:
continue
attribute_name = el[0]
if attribute_name not in ContentMixin.__dict__:
continue
# breakpoitn
for key, v in meta_or_content: ## key might be 'lead' 'text'
new_val = pattern.sub(repl, v)
if v != new_val:
occurrences = len(pattern.findall(v))
count += occurrences
id_count = count_by_id.setdefault(group_by, defaultdict(int))
id_count[meta_or_content] += occurrences

try:
item.name[key] = new_val
except AttributeError:
pass
if attribute_name == 'calendar_date_range':
breakpoint()

if len(el) != 2:
breakpoint()

if el[0] == 'content' or el[0] == 'meta' and el[1]:
content = el[1]
for key, v in content.items(): # key might be 'lead'
# 'text', 'people' etc.
if not isinstance(v, str) or not v:
continue
new_val = pattern.sub(repl, v)
if v != new_val:
breakpoint()
# get number of replacements so the count is
# correct
occurrences = len(pattern.findall(v))
count += occurrences

try:
item.name[key] = new_val
except AttributeError:
pass

for field in fields_with_urls:
value = getattr(item, field, None)
@@ -156,7 +176,6 @@ def migrate_site_collection(
for name, entries in self.site_collection.get().items():
for _ in entries:
simple_count += 1
breakpoint()

for name, entries in self.site_collection.get().items():
for entry in entries:
28 changes: 28 additions & 0 deletions tests/onegov/org/test_views_settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from onegov.api.models import ApiKey
from onegov.org.theme.org_theme import HELVETICA
from xml.etree.ElementTree import tostring
from onegov.org.models import SiteCollection


def test_settings(client):
@@ -143,3 +144,30 @@ def test_switch_languages(client):
assert 'Tedesco' in page
assert 'Allemand' not in page
assert 'Deutsch' not in page


def test_migrate_links(client):
session = client.app.session()

sitecollection = SiteCollection(session)
objects = sitecollection.get()

# pages = client.app.session().query(Topic).defer(Topic.)
# topics = session.query(Topic)
#
# topics = topics.options(defer(Topic.meta))
# topics = topics.options(defer(Topic.content))
# topics = topics.options(defer(Topic.order))

client.login_admin()
page = client.get('/migrate-links')
assert (
'Migriert Links von der angegebenen Domain zur aktuellen Domain '
'"localhost"' in page
)

# migrate from 'localhost' to 127.0.0.1
# (pointless but enough for test)
page.form['old_domain'] = '127.0.0.1'
page.form.submit()
page.showbrowser()