Implement new PLZ index

This commit is contained in:
Jannis Portmann 2024-02-10 12:18:29 +01:00
parent b8547cd877
commit 2f3de9b91e
6 changed files with 92 additions and 82 deletions

View file

@ -42,5 +42,5 @@ python manage.py getplzindex --force
To get started with development, see [DEVELOPMENT.md](DEVELOPMENT.md)
## Open Source Data
For calculating distances between zip codes, the `PLZ_Verzeichnis` is used.
Source: https://opendata.swiss/de/dataset/plz_verzeichnis
For calculating distances between zip codes, the `Official index of cities and towns including postal codes and perimeter` is used.
Source: https://opendata.swiss/en/dataset/amtliches-ortschaftenverzeichnis-mit-postleitzahl-und-perimeter

View file

@ -3,7 +3,6 @@ python manage.py migrate
python manage.py loco
python manage.py makemessages -l en -l de
python manage.py compilemessages -f
python manage.py getplzindex
nginx

View file

@ -1,78 +0,0 @@
import json
import os
from urllib import request
import pandas as pd
from django.core.management.base import BaseCommand
from pflaenzli_django.settings import BASE_DIR
from pflaenzli.models import Plz
class Command(BaseCommand):
    """Download the PLZ GeoJSON export and store its entries as ``Plz`` rows."""

    help = 'Get the zip code index from post and compile it to a dataframe pickle'

    def add_arguments(self, parser):
        """Register the optional ``--force`` flag on the command's parser."""
        parser.add_argument("--force", action="store_true", required=False)

    def handle(self, *args, **options):
        """Run the download step, then the import step, honouring ``--force``."""
        force = options["force"]
        file, exists = self.download_geojson(api='v2', data='v2', force=force)
        self.parse_data(file, exists, force=force)

    def download_geojson(self, api, data, force=False):
        """Fetch the GeoJSON export unless a local copy already exists.

        Args:
            api: API version segment inserted into the download URL.
            data: Dataset version suffix, used in the URL and the local file name.
            force: When true, download even if the local file is present.

        Returns:
            A ``(file, exists)`` tuple: the local file name and whether the
            download was skipped because the file was already there.
        """
        file = f'plz_verzeichnis_{data}.json'

        if os.path.exists(file) and not force:
            self.stdout.write('File already downloaded.')
            self.stdout.write(self.style.SUCCESS(
                'Skipping...\n'))
            return file, True

        self.stdout.write('Downloading geojson...')
        url = f'https://swisspost.opendatasoft.com/api/{api}/catalog/datasets/plz_verzeichnis_{data}/exports/geojson'
        request.urlretrieve(url, file)
        self.stdout.write(self.style.SUCCESS('Done!\n'))
        return file, False

    def parse_data(self, file, exists, force=False):
        """Replace all ``Plz`` rows with the entries found in ``file``.

        Args:
            file: Path of the GeoJSON file to read.
            exists: Whether the file was already present before this run.
            force: When true, re-import even if the file already existed.
        """
        if exists and not force:
            self.stdout.write(self.style.WARNING(
                'Nothing was done, if you want to redownload the PLZ index, use the --force option.\n'))
            return

        self.stdout.write('Opening file...')
        # Load the GeoJSON data for the zip codes
        with open(file, encoding='UTF-8') as f:
            full_data = json.load(f)
        self.stdout.write(self.style.SUCCESS('Done!\n'))

        self.stdout.write('Deleting existing data...')
        Plz.objects.all().delete()
        self.stdout.write(self.style.SUCCESS('Done!\n'))

        self.stdout.write('Parsing file and add new data...')
        for feature in full_data['features']:
            properties = feature['properties']

            # A feature lacking the zip code or coordinates is skipped. KeyError
            # must be caught as well: a missing dictionary key would otherwise
            # abort the import after the table has already been emptied.
            # int() lives inside the try so a non-numeric code is skipped too.
            try:
                plz = int(properties['postleitzahl'])
                geo_point = properties['geo_point_2d']
                lat = geo_point['lat']
                lon = geo_point['lon']
            except (KeyError, AttributeError, TypeError, ValueError):
                continue

            if lat is None or lon is None:
                continue

            # The place name is optional; fall back to None when absent.
            try:
                name = properties['ortbez27']
            except (KeyError, AttributeError, TypeError):
                name = None

            Plz.objects.get_or_create(plz=plz, lat=lat, lon=lon, name=name)

        self.stdout.write(self.style.SUCCESS('Wrote PLZ data to the databse successfully\n'))

        self.stdout.write(self.style.SUCCESS('Done!'))

View file

@ -0,0 +1,24 @@
# Generated by Django 4.2.10 on 2024-02-10 11:16
from django.db import migrations, models
from pflaenzli.utils.load_plz import load_plz
class Migration(migrations.Migration):
    """Widen ``Plz.name`` to 40 characters and run the PLZ loader.

    Squashed replacement for ``0007_alter_plz_name`` and ``0008_load_plz``.
    """

    replaces = [("pflaenzli", "0007_alter_plz_name"), ("pflaenzli", "0008_load_plz")]

    dependencies = [
        ("pflaenzli", "0006_plz"),
    ]

    operations = [
        migrations.AlterField(
            model_name="plz",
            name="name",
            field=models.CharField(max_length=40),
        ),
        migrations.RunPython(
            code=load_plz,
            # A RunPython without reverse_code makes the whole migration
            # irreversible. The loaded rows need no undo step, so reversing
            # is declared a no-op.
            reverse_code=migrations.RunPython.noop,
        ),
    ]

View file

@ -18,7 +18,7 @@ class Plz(models.Model):
plz = models.IntegerField(verbose_name='PLZ')
lat = models.DecimalField(max_digits=8, decimal_places=6)
lon = models.DecimalField(max_digits=8, decimal_places=6)
name = models.CharField(max_length=27)
name = models.CharField(max_length=40)
class Offer(models.Model):

View file

@ -0,0 +1,65 @@
import os
from urllib import request
import hashlib
import shutil
import pandas as pd
from pflaenzli.models import Plz
# Base name shared by the data directory and the CSV file inside it.
# NOTE(review): presumably the folder produced by unpacking the downloaded archive — confirm.
FILENAME = "AMTOVZ_CSV_WGS84"
# Relative path of the CSV file: <FILENAME>/<FILENAME>.csv
FILEPATH = os.path.join(FILENAME, f"{FILENAME}.csv")
def add_arguments(parser):
    """Register the optional boolean ``--force`` flag on *parser*."""
    flag_options = {"action": "store_true", "required": False}
    parser.add_argument("--force", **flag_options)
def load_plz(apps, schema_editor, force=True):
    """Download the index and (re)create the ``Plz`` rows when required.

    The signature matches what a ``RunPython`` migration operation passes
    (``apps``, ``schema_editor``); neither argument is used here.

    Args:
        apps: Unused.
        schema_editor: Unused.
        force: When true, rebuild the rows without consulting the stored hash.
    """
    get_index()

    # Short-circuit on purpose: the hash is only consulted when not forced.
    rebuild_needed = force or not hash_matches()
    if rebuild_needed:
        create_objects()
def hash_file(filename):
    """Return the hexadecimal SHA-1 digest of the file at *filename*."""
    digest = hashlib.sha1()

    with open(filename, 'rb') as stream:
        # Feed the hasher 1024 bytes at a time until read() returns b''.
        for block in iter(lambda: stream.read(1024), b''):
            digest.update(block)

    return digest.hexdigest()
def get_index(url="https://data.geo.admin.ch/ch.swisstopo-vd.ortschaftenverzeichnis_plz/ortschaftenverzeichnis_plz/ortschaftenverzeichnis_plz_4326.csv.zip"):
    """Download the zipped index from *url* and unpack it.

    The archive is saved as ``index.zip`` and extracted into the current
    working directory.
    """
    archive_name = 'index.zip'
    request.urlretrieve(url, archive_name)
    shutil.unpack_archive(archive_name)
def hash_matches(file=FILEPATH, hashfile="hash"):
    """Compare the hash of *file* with the one stored in *hashfile*.

    When the stored hash is missing or different, *hashfile* is rewritten
    with the current hash so the next call can detect an unchanged file.

    Args:
        file: Path of the file to hash.
        hashfile: Path of the text file that holds the previously seen hash.

    Returns:
        True if the stored hash equals the current one, False otherwise.
    """
    new_hash = hash_file(file)

    if os.path.exists(hashfile):
        with open(hashfile, 'r') as f:
            if f.read() == new_hash:
                return True

    # Reopen in 'w' mode rather than truncating an 'r+' handle: truncate()
    # leaves the stream position at the old end of file, so a following
    # write would store the hash behind a run of NUL bytes and the
    # comparison would never succeed again.
    with open(hashfile, 'w') as f:
        f.write(new_hash)
    return False
def create_objects(file=FILEPATH):
    """Replace every ``Plz`` row with the entries of the CSV at *file*.

    The CSV is read with ``;`` as separator; the columns ``PLZ``, ``N``,
    ``E`` and ``Ortschaftsname`` populate ``plz``, ``lat``, ``lon`` and
    ``name`` respectively.
    """
    table = pd.read_csv(file, sep=";")

    # Build all model instances first, so the table is only emptied once
    # the CSV has been read and converted.
    entries = []
    for _, record in table.iterrows():
        entries.append(
            Plz(
                plz=record['PLZ'],
                lat=record['N'],
                lon=record['E'],
                name=record['Ortschaftsname'],
            )
        )

    Plz.objects.all().delete()
    Plz.objects.bulk_create(entries)