Statuszeile und filtern von aktiven Adressen #9

Merged
sroth merged 11 commits from statuszeile_und_filter into main 2025-04-30 20:04:44 +02:00
Showing only changes of commit dc27197129 - Show all commits

77
src/connector.py Normal file
View File

@@ -0,0 +1,77 @@
import json
import os
from abc import ABC, abstractmethod
import config
class Connector(ABC):
def __init__(self):
pass
@abstractmethod
def get_all(self) -> list:
pass
@abstractmethod
def get_by_id(self, record_id: int) -> dict:
pass
@abstractmethod
def delete_by_id(self, record_id: int):
pass
@abstractmethod
def update_record(self, updated_record: dict):
pass
@abstractmethod
def create_new(self, values: dict) -> int:
pass
class JSONConnector(Connector):
def __init__(self):
super().__init__()
self.config = config.Config()
self.json_path = self.config.get("json", "path")
self.json_file = os.path.join(self.json_path, "brovski-adress-etiketten-verwaltung.json")
def get_all(self) -> list:
with open(self.json_file, "r") as f:
return json.load(f)
def get_by_id(self, record_id: int) -> dict:
data = self.get_all()
for record in data:
if record["record_id"] == record_id:
return record
def delete_by_id(self, record_id: int):
data = self.get_all()
for idx, record in enumerate(data):
if record["record_id"] == record_id:
del data[idx]
self._write_to_file(data)
def update_record(self, new_record: dict):
data = self.get_all()
for idx, record in enumerate(data):
if record["record_id"] == new_record["record_id"]:
data[idx] = new_record
self._write_to_file(data)
def create_new(self, record: dict) -> int:
data = self.get_all()
if len(data) == 0:
next_id = 0
else:
next_id = max(data, key=lambda x: x["record_id"])["record_id"] + 1
record["record_id"] = next_id
data.append(record)
self._write_to_file(data)
return next_id
def _write_to_file(self, data):
with open(self.json_file, "w") as f:
json.dump(data, f, indent=4)