# Files: DiaShower/server.py
# 2026-03-07 00:01:43 +01:00
#
# 269 lines, 8.2 KiB, Python

import http.server
import socketserver
import os
import json
import requests
from PIL import Image
from PIL.ExifTags import TAGS
import urllib.parse
from datetime import datetime
# --
# Configuration
# --
# Load the server configuration once at import time. The rest of the module
# reads keys such as IMAGE_DIR, PORT, WeatherProvider, HA_* / METNO_* and
# PhotonAPI* from this dict.
# JSON is UTF-8 by specification, so decode it explicitly instead of relying
# on the platform's locale encoding.
with open("config.json", encoding="utf-8") as f:
    cfg = json.load(f)
#--
# Web Router
#--
class SlideshowHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler for the slideshow page and its JSON helper endpoints.

    Routes (matched on the URL path, ignoring any query string):

    * ``/``            -- the slideshow page (``show.html``)
    * ``/images.json`` -- JSON list of image URLs found in ``cfg['IMAGE_DIR']``
    * ``/weather``     -- current temperature and weather icon URL
    * ``/exif...``     -- capture date and GPS data for ``?file=<image>``

    Every other path is served as a static file from the working directory by
    ``SimpleHTTPRequestHandler``, except files listed in ``_PRIVATE_FILES``.
    """

    # Lower-case file extensions that count as slideshow images.
    _IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".webp")

    # Both weather providers answer with an icon URL from this icon set.
    _ICON_BASE_URL = 'https://raw.githubusercontent.com/metno/weathericons/refs/heads/main/weather/svg/'
    _FALLBACK_SYMBOL = "cloudy"

    # Files in the working directory that must never be served: config.json
    # holds the Home Assistant and geocoder API tokens.
    _PRIVATE_FILES = frozenset({"config.json"})

    # Home Assistant weather state -> icon name in the icon set above.
    _HASS_ICON_MAP = {
        "sunny": "clearsky_day",
        "clear-night": "clearsky_night",
        "cloudy": "cloudy",
        "partlycloudy": "fair_day",
        "rainy": "rain",
        "pouring": "heavyrain",
        "lightning": "lightrainandthunder",
        "lightning-rainy": "rainandthunder",
        "snowy": "snow",
        "snowy-rainy": "sleet",
        "fog": "fog",
    }

    # EXIF tag ids (PIL.ExifTags.TAGS names: DateTime, DateTimeOriginal, GPSInfo).
    _EXIF_DATETIME = 0x0132
    _EXIF_DATETIME_ORIGINAL = 0x9003
    _EXIF_GPS_INFO = 0x8825

    #--
    # Web Router
    #--
    def do_GET(self):
        """Dispatch a GET request to an endpoint or to static file serving."""
        # Match on the path only, so cache-busting queries such as
        # "/weather?t=123" still reach their endpoint.
        route = self.path.split("?", 1)[0].split("#", 1)[0]
        if route == "/":
            self.send_slideshow_page()
        elif route == "/images.json":
            self.send_image_list()
        elif route == "/weather":
            if cfg['WeatherProvider'] == "HomeAssistant":
                self.get_HASSweather()
            else:
                self.get_METNOweather()
        elif route.startswith("/exif"):
            self.send_exif_data()
        elif self._is_private_file():
            self.send_error(404, "File not found")
        else:
            super().do_GET()

    def _is_private_file(self):
        """Return True if the static handler would serve a protected file."""
        # translate_path() is the mapping the static handler itself uses, so
        # quoting or "//" tricks in the URL cannot bypass this check.
        served = self.translate_path(self.path).rstrip("/\\")
        return os.path.basename(served).lower() in self._PRIVATE_FILES

    #--
    # Main Page
    #--
    def send_slideshow_page(self):
        """Send ``show.html`` from the working directory, or 404 if unreadable."""
        try:
            with open("show.html", "rb") as page:
                content = page.read()
        except OSError:
            # Always answer; otherwise the client waits on an empty reply.
            self.send_error(404, "show.html not found")
            return
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.send_header("Content-length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    #--
    # Image list
    #--
    def send_image_list(self):
        """Reply with a JSON array of ``/<IMAGE_DIR>/<name>`` image URLs.

        Only files whose name ends in one of ``_IMAGE_EXTENSIONS`` are listed,
        in sorted order. An unreadable image directory yields a 500 JSON error.
        """
        image_dir = cfg['IMAGE_DIR']
        try:
            names = sorted(os.listdir(image_dir))
        except OSError as e:
            self.log_error("cannot list image directory: %s", e)
            self._send_json({"error": str(e)}, status=500)
            return
        images = [
            f"/{image_dir}/{name}"
            for name in names
            if name.lower().endswith(self._IMAGE_EXTENSIONS)
        ]
        self._send_json(images)

    #--
    # Weather - From HASS
    #--
    def get_HASSweather(self):
        """Reply with the weather read from the configured Home Assistant entity.

        The reply is ``{"temperature": ..., "symbol": <icon url>}``. Any
        failure is reported as ``{"error": ...}`` with status 500.
        """
        try:
            ha_response = requests.get(
                f"{cfg['HA_URL']}/api/states/{cfg['HA_WEATHER_ENTITY']}",
                headers={
                    "Authorization": f"Bearer {cfg['HA_TOKEN']}",
                    "Content-Type": "application/json",
                },
                timeout=5,
            )
            ha_response.raise_for_status()
            result = self._parse_hass_state(ha_response.json())
        except Exception as e:
            self.log_error("Home Assistant weather lookup failed: %s", e)
            self._send_json({"error": str(e)}, status=500)
            return
        # Sent outside the try block so a failed write cannot trigger a
        # second (error) response on the same connection.
        self._send_json(result)

    @classmethod
    def _parse_hass_state(cls, data):
        """Convert a Home Assistant state object into the weather reply dict.

        Unknown states fall back to the ``cloudy`` icon. Raises ``KeyError``
        if *data* has no ``attributes`` entry.
        """
        symbol = cls._HASS_ICON_MAP.get(data.get("state"), cls._FALLBACK_SYMBOL)
        return {
            "temperature": data["attributes"].get("temperature"),
            "symbol": cls._icon_url(symbol),
        }

    #--
    # Weather - From Met.no
    #--
    def get_METNOweather(self):
        """Reply with the weather from the api.met.no locationforecast API.

        The reply is ``{"temperature": ..., "symbol": <icon url>}``. Any
        failure is reported as ``{"error": ...}`` with status 500.
        """
        try:
            metno_response = requests.get(
                "https://api.met.no/weatherapi/locationforecast/2.0/compact",
                params={"lat": cfg['METNO_LAT'], "lon": cfg['METNO_LON']},
                headers={"User-Agent": "AmbientDisplay/1.0"},
                timeout=5,
            )
            metno_response.raise_for_status()
            result = self._parse_metno_forecast(metno_response.json())
        except Exception as e:
            self.log_error("met.no weather lookup failed: %s", e)
            self._send_json({"error": str(e)}, status=500)
            return
        self._send_json(result)

    @classmethod
    def _parse_metno_forecast(cls, data):
        """Convert a locationforecast document into the weather reply dict.

        Uses the first entry of ``properties.timeseries``. A missing
        ``symbol_code`` falls back to the ``cloudy`` icon instead of
        discarding the temperature. Raises ``ValueError`` if the document has
        no timeseries entries.
        """
        series = data.get("properties", {}).get("timeseries")
        if not series:
            raise ValueError("met.no response contains no timeseries")
        current = series[0].get("data", {})
        temperature = current.get("instant", {}).get("details", {}).get("air_temperature")
        symbol = current.get("next_1_hours", {}).get("summary", {}).get("symbol_code")
        return {
            "temperature": temperature,
            "symbol": cls._icon_url(symbol or cls._FALLBACK_SYMBOL),
        }

    @classmethod
    def _icon_url(cls, symbol):
        """Return the SVG icon URL for the icon name *symbol*."""
        return cls._ICON_BASE_URL + symbol + '.svg'

    #--
    # JSON answer
    #--
    def _send_json(self, obj, status=200):
        """Serialize *obj* as JSON and send it as a complete response.

        Args:
            obj: Any ``json.dumps``-serializable value.
            status: HTTP status code of the response.
        """
        payload = json.dumps(obj).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    #--
    # Image info
    #--
    def send_exif_data(self):
        """Reply with ``{"capture_date": ..., "geo_data": ...}`` for ``?file=``.

        Responds 400 when the parameter is missing and 404 when the named
        file does not exist inside ``cfg['IMAGE_DIR']``.
        """
        query = urllib.parse.urlparse(self.path).query
        filename = urllib.parse.parse_qs(query).get("file", [None])[0]
        if not filename:
            self.send_error(400, "Missing ?file= parameter")
            return
        filepath = self._resolve_image_path(filename, cfg['IMAGE_DIR'])
        if filepath is None:
            self.send_error(404, "File not found")
            return
        self._send_json({
            "capture_date": self.get_capture_date(filepath),
            "geo_data": self.get_GeoData(filepath),
        })

    @staticmethod
    def _resolve_image_path(filename, image_dir):
        """Map a client-supplied file name to a real file inside *image_dir*.

        Leading slashes are stripped and the rest is resolved relative to the
        working directory. Returns the resolved path, or ``None`` if the
        target is not an existing regular file or lies outside *image_dir*
        (for example via ``..`` segments or symlinks).
        """
        try:
            root = os.path.realpath(image_dir)
            candidate = os.path.realpath(filename.lstrip("/"))
            inside = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Embedded NUL byte, or paths on different drives (Windows).
            return None
        if inside and os.path.isfile(candidate):
            return candidate
        return None

    @staticmethod
    def _read_exif(path):
        """Return the EXIF tag dict of the image at *path*, or ``None``.

        ``None`` covers unreadable files, formats without EXIF support and
        images that carry no EXIF data.
        """
        try:
            # The context manager closes the file handle that Image.open()
            # keeps open.
            with Image.open(path) as img:
                # NOTE(review): _getexif() is a private Pillow helper; it is
                # kept because its flattened dict is what the tag lookups in
                # this class expect.
                return img._getexif() or None
        except Exception:
            return None

    #--
    # Image info -- Date
    #--
    def get_capture_date(self, path):
        """Return the capture time of the image at *path* as an ISO 8601 string.

        Returns ``None`` if the image has no parseable EXIF date.
        """
        exif = self._read_exif(path)
        if not exif:
            return None
        return self._capture_date_from_exif(exif)

    @classmethod
    def _capture_date_from_exif(cls, exif):
        """Pick the best date from *exif*: DateTimeOriginal, then DateTime."""
        for tag_id in (cls._EXIF_DATETIME_ORIGINAL, cls._EXIF_DATETIME):
            parsed = cls._parse_exif_datetime(exif.get(tag_id))
            if parsed is not None:
                return parsed
        return None

    @staticmethod
    def _parse_exif_datetime(value):
        """Parse an EXIF ``YYYY:MM:DD HH:MM:SS`` value to ISO 8601, else ``None``."""
        if isinstance(value, bytes):
            value = value.decode("ascii", "replace")
        if not isinstance(value, str):
            return None
        try:
            # Tolerate padding (NUL bytes or spaces) around the value.
            parsed = datetime.strptime(value.strip("\x00 "), "%Y:%m:%d %H:%M:%S")
        except ValueError:
            return None
        return parsed.isoformat()

    #--
    # Image info -- GeoData/GPS
    #--
    def get_GeoData(self, path):
        """Return the GPS position of the image at *path* plus geocoder output.

        Returns ``{"GeoData": <geocoder JSON>, "latitude": ..., "longitude":
        ...}``, or ``None`` if the image has no usable GPS data or the
        geocoder request fails.
        """
        exif = self._read_exif(path)
        if not exif:
            return None
        coordinates = self._gps_to_coordinates(exif.get(self._EXIF_GPS_INFO))
        if coordinates is None:
            return None
        lat, lon = coordinates
        try:
            geocoded = self._reverse_geocode(lat, lon)
        except Exception as e:
            self.log_error("geocoding %s,%s failed: %s", lat, lon, e)
            return None
        return {
            "GeoData": geocoded,
            "latitude": lat,
            "longitude": lon,
        }

    @classmethod
    def _gps_to_coordinates(cls, gps):
        """Convert an EXIF GPS dict to signed ``(latitude, longitude)`` degrees.

        Reads keys 1-4 (GPSLatitudeRef, GPSLatitude, GPSLongitudeRef,
        GPSLongitude). Returns ``None`` if any of them is missing or
        malformed, or if the result is not a valid coordinate.
        """
        try:
            lat = cls._dms_to_degrees(gps[2])
            if gps[1] in ("S", b"S"):
                lat = -lat
            lon = cls._dms_to_degrees(gps[4])
            if gps[3] in ("W", b"W"):
                lon = -lon
        except (KeyError, IndexError, TypeError, ValueError, ZeroDivisionError):
            return None
        # NaN fails both comparisons, so it is rejected here as well.
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            return None
        return lat, lon

    @staticmethod
    def _dms_to_degrees(value):
        """Convert a (degrees, minutes, seconds) triple to decimal degrees.

        Each part may be a number-like value or a ``(numerator, denominator)``
        tuple.
        """
        def to_float(part):
            return part[0] / part[1] if isinstance(part, tuple) else float(part)

        degrees = to_float(value[0])
        minutes = to_float(value[1])
        seconds = to_float(value[2])
        return degrees + (minutes / 60.0) + (seconds / 3600.0)

    @staticmethod
    def _reverse_geocode(lat, lon):
        """Query ``cfg['PhotonAPI']`` for *lat*/*lon* and return the parsed JSON.

        Presumably a Photon reverse-geocoding endpoint; the response is passed
        through to the client unmodified. Raises on network errors, HTTP
        error statuses and invalid JSON.
        """
        geoapi_response = requests.get(
            cfg['PhotonAPI'],
            params={"lang": "en", "lat": lat, "lon": lon},
            headers={"X-Api-Key": cfg['PhotonAPI_TOKEN']},
            timeout=30,
        )
        geoapi_response.raise_for_status()
        return geoapi_response.json()
#--
# WebServer
#--
if __name__ == "__main__":
    # Resolve relative paths (show.html, the image directory, static files)
    # against the script's own directory, wherever the process was started.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    class SlideshowServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
        """TCP server that handles each request in its own thread.

        The weather and geocoding endpoints wait on upstream services (with
        timeouts of up to 30 seconds), which would otherwise stall every
        other client of a single-threaded server.
        """

        # Allow an immediate restart while the old socket is in TIME_WAIT.
        allow_reuse_address = True
        # Do not let in-flight request threads block interpreter shutdown.
        daemon_threads = True

    print(f"Serving slideshow on http://0.0.0.0:{cfg['PORT']}")
    with SlideshowServer(("", cfg['PORT']), SlideshowHandler) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the server; exit quietly.
            pass