Files
Brovski-Adress-Etiketten-Ve…/src/connector.py
2025-09-27 12:46:57 +02:00

89 lines
2.5 KiB
Python

import json
import os
from abc import ABC, abstractmethod
from config import Config
class Connector(ABC):
    """Abstract interface every record-store backend has to implement.

    The base class only keeps the configuration object it receives; all
    data operations are abstract. The per-method descriptions below state
    the contract as it is implemented by ``JSONConnector`` in this file.
    """

    def __init__(self, config: Config):
        """Keep *config* on the instance as ``self.config``."""
        self.config = config

    @abstractmethod
    def get_all(self) -> list:
        """Return all stored records as a list."""

    @abstractmethod
    def get_by_id(self, record_id: int) -> dict:
        """Return the record identified by *record_id*."""

    @abstractmethod
    def delete_by_id(self, record_id: int):
        """Remove the record identified by *record_id*."""

    @abstractmethod
    def update_record(self, updated_record: dict):
        """Replace an already stored record with *updated_record*."""

    @abstractmethod
    def create_new(self, values: dict) -> int:
        """Store *values* as a new record and return an ``int``.

        ``JSONConnector`` returns the ID assigned to the new record.
        """

    @abstractmethod
    def get_all_sorted_by(self, field: str, reverse: bool = False) -> list:
        """Return all records ordered by *field*.

        Pass ``reverse=True`` to invert the sort order.
        """
class JSONConnector(Connector):
    """Connector that keeps all records in a single UTF-8 JSON file.

    The file holds one JSON array of record objects; a record is identified
    by its ``"record_id"`` entry. Every operation re-reads the file, and
    every change rewrites it completely.
    """

    def __init__(self, config: Config):
        """Resolve the location of the data file from *config*.

        Args:
            config: Configuration object; the value of
                ``config.get("json", "path")`` is used as the directory
                that contains the JSON file.
        """
        super().__init__(config)
        self.json_path = self.config.get("json", "path")
        self.json_file = os.path.join(
            self.json_path, "brovski-adress-etiketten-verwaltung-v7.json"
        )

    def get_all(self) -> list:
        """Return every stored record.

        Returns:
            The parsed content of the JSON file, or an empty list when the
            file does not exist.
        """
        try:
            # Read with the same encoding _write_to_file uses. The platform
            # default can differ and would garble or reject non-ASCII text,
            # which is written unescaped (ensure_ascii=False).
            with open(self.json_file, "r", encoding="UTF-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    def get_all_sorted_by(self, field: str, reverse: bool = False) -> list:
        """Return all records sorted by the value stored under *field*.

        Args:
            field: Key whose value is used as the sort key.
            reverse: Sort in descending order when true.

        Returns:
            A new sorted list; empty when the data file does not exist.

        Raises:
            KeyError: If a record has no entry for *field*.
        """
        # Go through get_all() so a missing file is handled in one place.
        return sorted(
            self.get_all(), key=lambda record: record[field], reverse=reverse
        )

    def get_by_id(self, record_id: int) -> dict:
        """Return the first record whose ``"record_id"`` equals *record_id*.

        Returns:
            The matching record, or ``None`` when no record matches.
        """
        matches = (
            record for record in self.get_all()
            if record["record_id"] == record_id
        )
        return next(matches, None)

    def delete_by_id(self, record_id: int):
        """Remove every record whose ``"record_id"`` equals *record_id*.

        The file is rewritten only if at least one record was removed.
        """
        data = self.get_all()
        # Build a filtered copy instead of deleting while iterating, which
        # would skip the element following each removed one.
        remaining = [
            record for record in data if record["record_id"] != record_id
        ]
        if len(remaining) != len(data):
            self._write_to_file(remaining)

    def update_record(self, new_record: dict):
        """Replace stored records that share *new_record*'s ``"record_id"``.

        Nothing happens when no stored record has that ID; the file is
        rewritten only if a record was replaced.

        Raises:
            KeyError: If *new_record* has no ``"record_id"`` entry.
        """
        target_id = new_record["record_id"]
        data = self.get_all()
        replaced = False
        for idx, record in enumerate(data):
            if record["record_id"] == target_id:
                data[idx] = new_record
                replaced = True
        if replaced:
            self._write_to_file(data)

    def create_new(self, record: dict) -> int:
        """Append *record* to the store under a newly assigned ID.

        The ID is one more than the largest existing ``"record_id"``, or
        ``0`` for an empty store. It is written into *record* itself, so
        the caller's dict is modified in place.

        Returns:
            The ID assigned to the new record.
        """
        data = self.get_all()
        next_id = max((existing["record_id"] for existing in data), default=-1) + 1
        record["record_id"] = next_id
        data.append(record)
        self._write_to_file(data)
        return next_id

    def _write_to_file(self, data):
        """Overwrite the JSON file with *data* (UTF-8, 4-space indent)."""
        # Serialize before opening: mode "w" truncates the file immediately,
        # so a serialization error afterwards would wipe the existing data.
        payload = json.dumps(data, indent=4, ensure_ascii=False)
        with open(self.json_file, "w", encoding="UTF-8") as f:
            f.write(payload)