yunohost/bin/yunomdns
2021-09-26 11:44:39 +02:00

147 lines
4.5 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Pythonic declaration of mDNS .local domains for YunoHost
"""
import ipaddress
import sys
from time import sleep
from typing import List, Dict

import ifaddr
import yaml
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser
# Address ranges considered "local": the RFC 1918 private IPv4 blocks and the
# RFC 4193 unique local IPv6 block.
_LOCAL_NETWORKS = tuple(
    ipaddress.ip_network(cidr)
    for cidr in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7")
)


def islocal(ip: str) -> bool:
    """
    Tell whether an address string belongs to one of the local ranges

    Anything that cannot be parsed as an IP address is reported as not local.
    """
    try:
        # Drop a possible "%scope" suffix, which Python < 3.9 refuses to parse
        address = ipaddress.ip_address(ip.split("%", 1)[0])
    except ValueError:
        return False
    return any(
        address.version == network.version and address in network
        for network in _LOCAL_NETWORKS
    )


def get_network_local_interfaces() -> Dict[str, Dict[str, List[str]]]:
    """
    Returns interfaces with their associated local IPs

    The result maps each adapter name (the "lo" adapter is skipped) to a dict
    with two keys, "ipv4" and "ipv6", each holding the list of that adapter's
    addresses for which islocal() is true. Adapters without any local address
    are still present, with empty lists.
    """
    interfaces = {
        adapter.name: {
            "ipv4": [ip.ip for ip in adapter.ips if ip.is_IPv4 and islocal(ip.ip)],
            # For IPv6 entries, ip.ip is indexed to get the address string
            "ipv6": [
                ip.ip[0] for ip in adapter.ips if ip.is_IPv6 and islocal(ip.ip[0])
            ],
        }
        for adapter in ifaddr.get_adapters()
        if adapter.name != "lo"
    }
    return interfaces
class Listener:
    """
    Service listener used to detect duplicates on the network

    The server hostname of every service reported through add_service() is
    stored, without its trailing dot, in the ``list`` attribute.
    """

    def __init__(self):
        self.list = []
        # Service name -> hostname recorded by add_service(), so that a removal
        # can be honoured even when the service can no longer be looked up.
        self._servers = {}

    @staticmethod
    def _hostname(info):
        """Return the server name of ``info`` without trailing dot, or None"""
        server = getattr(info, "server", None)
        if not server:
            return None
        return server[:-1] if server.endswith(".") else server

    def remove_service(self, zeroconf, type, name):
        """Forget the hostname that was recorded for the service ``name``"""
        hostname = self._servers.pop(name, None)
        if hostname is None:
            # Never seen through add_service(): fall back to a lookup, which
            # may itself return nothing.
            hostname = self._hostname(zeroconf.get_service_info(type, name))
        if hostname in self.list:
            self.list.remove(hostname)

    def update_service(self, zeroconf, type, name):
        """Updates are ignored"""
        pass

    def add_service(self, zeroconf, type, name):
        """Record the server hostname of the service ``name``, if resolvable"""
        hostname = self._hostname(zeroconf.get_service_info(type, name))
        if hostname is None:
            return
        self._servers[name] = hostname
        self.list.append(hostname)
def main() -> bool:
    """
    Publish the configured .local domains over mDNS until interrupted

    Reads /etc/yunohost/mdns.yml, which must define the ``interfaces`` and
    ``domains`` fields. The network is first browsed for two seconds to find
    already published names, so that a free ``yunohost[-N].local`` name can be
    added to the domains. Every domain is then registered on each configured
    interface that has at least one local IP, and the function sleeps until a
    KeyboardInterrupt, after which everything is unregistered.

    Returns True on normal termination (including when no interface is
    listed), False when the configuration lacks a required field.
    """

    ###
    # CONFIG
    ###

    with open("/etc/yunohost/mdns.yml", "r") as f:
        config = yaml.safe_load(f) or {}

    required_fields = ["interfaces", "domains"]
    missing_fields = [field for field in required_fields if field not in config]

    if missing_fields:
        print("The fields %s are required" % ", ".join(missing_fields))
        # Going on would only end in a KeyError on the missing field
        return False

    if config["interfaces"] is None:
        print("No interface listed for broadcast.")
        return True

    # A "domains:" key left empty is loaded as None; treat it as an empty list
    if config["domains"] is None:
        config["domains"] = []

    # Let's discover currently published .local domains across the network
    zc = Zeroconf()
    listener = Listener()
    try:
        browser = ServiceBrowser(zc, "_device-info._tcp.local.", listener)
        sleep(2)
        browser.cancel()
    finally:
        zc.close()

    # If yunohost.local already exists, try yunohost-2.local, and so on.
    def yunohost_local(i):
        return "yunohost.local" if i < 2 else "yunohost-" + str(i) + ".local"

    i = 1
    while yunohost_local(i) in listener.list:
        print("Uh oh, " + yunohost_local(i) + " already exists on the network...")
        if yunohost_local(i) in config["domains"]:
            config["domains"].remove(yunohost_local(i))
        i += 1
    if yunohost_local(i) not in config["domains"]:
        print("Adding " + yunohost_local(i) + " to the domains to publish.")
        config["domains"].append(yunohost_local(i))

    local_suffix = ".local"
    zcs: Dict[Zeroconf, List[ServiceInfo]] = {}

    # Everything that creates or uses a Zeroconf object lives inside the try,
    # so that the finally clause closes all of them even if a later step fails.
    try:
        interfaces = get_network_local_interfaces()
        for interface in config["interfaces"]:
            if interface not in interfaces:
                print(f"Interface {interface} of config file is not present on system.")
                continue

            ips: List[str] = interfaces[interface]["ipv4"] + interfaces[interface]["ipv6"]

            # Only go on if at least one IP is listed
            if not ips:
                continue

            # Create a Zeroconf object, and store the ServiceInfos
            zc = Zeroconf(interfaces=ips)  # type: ignore
            zcs[zc] = []

            for d in config["domains"]:
                # Strip the trailing ".local" only, not every occurrence of it
                d_domain = d[: -len(local_suffix)] if d.endswith(local_suffix) else d
                if "." in d_domain:
                    print(f"{d_domain}.local: subdomains are not supported.")
                    continue

                # Create a ServiceInfo object for each .local domain
                zcs[zc].append(
                    ServiceInfo(
                        type_="_device-info._tcp.local.",
                        name=f"{interface}: {d_domain}._device-info._tcp.local.",
                        parsed_addresses=ips,
                        port=80,
                        server=f"{d}.",
                    )
                )
                print(f"Adding {d} with addresses {ips} on interface {interface}")

        # Run registration
        print("Registering...")
        for zc, infos in zcs.items():
            for info in infos:
                zc.register_service(info, allow_name_change=True, cooperating_responders=True)

        # NOTE(review): only KeyboardInterrupt is caught here; whether stopping
        # the service delivers it (rather than a plain SIGTERM) is not visible
        # from this file - confirm.
        print("Registered. Press Ctrl+C or stop service to stop.")
        while True:
            sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        print("Unregistering...")
        for zc in zcs:
            try:
                zc.unregister_all_services()
            finally:
                zc.close()

    return True
# Script entry point: turn the boolean result of main() into an exit status
# (0 when it is truthy, 1 otherwise).
if __name__ == "__main__":
    exit_status = 0 if main() else 1
    sys.exit(exit_status)