yunohost/bin/yunomdns
2021-08-12 18:54:20 +02:00

173 lines
5.2 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Pythonic declaration of mDNS .local domains for YunoHost
"""
import subprocess
import re
import sys
import yaml
import socket
from time import sleep
from typing import List, Dict
from zeroconf import Zeroconf, ServiceInfo
# Helper command taken from Moulinette
def check_output(args, stderr=subprocess.STDOUT, shell=True, **kwargs):
    """Run a command and return its output as a stripped text string.

    Thin wrapper around subprocess.check_output with two defaults changed:
    standard error is merged into the captured output, and the command is
    run through the shell.

    Keyword arguments:
        args -- Command to run (a shell string unless shell is False)
        stderr -- Destination of standard error (default: merged into stdout)
        shell -- Whether to run the command through the shell
        **kwargs -- Forwarded unchanged to subprocess.check_output

    Returns:
        The captured output decoded as UTF-8, with leading and trailing
        whitespace removed
    """
    raw_output = subprocess.check_output(
        args, stderr=stderr, shell=shell, **kwargs
    )
    decoded = raw_output.decode("utf-8")
    return decoded.strip()
# Helper command taken from Moulinette
def _extract_inet(string, skip_netmask=False, skip_loopback=True):
"""
Extract IP addresses (v4 and/or v6) from a string limited to one
address by protocol
Keyword argument:
string -- String to search in
skip_netmask -- True to skip subnet mask extraction
skip_loopback -- False to include addresses reserved for the
loopback interface
Returns:
A dict of {protocol: address} with protocol one of 'ipv4' or 'ipv6'
"""
ip4_pattern = (
r"((25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}"
)
ip6_pattern = r"(((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)::?((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)"
ip4_pattern += r"/[0-9]{1,2})" if not skip_netmask else ")"
ip6_pattern += r"/[0-9]{1,3})" if not skip_netmask else ")"
result = {}
for m in re.finditer(ip4_pattern, string):
addr = m.group(1)
if skip_loopback and addr.startswith("127."):
continue
# Limit to only one result
result["ipv4"] = addr
break
for m in re.finditer(ip6_pattern, string):
addr = m.group(1)
if skip_loopback and addr == "::1":
continue
# Limit to only one result
result["ipv6"] = addr
break
return result
# Helper command taken from Moulinette
def get_network_interfaces():
    """
    List the network devices of the system with their IP addresses

    Runs `ip --brief a` and, for each output line, takes the first column as
    the device name and everything from the third column on as its addresses.
    The device named "lo" is left out.

    Returns:
        A dict of {device name: addresses}, where addresses is the dict
        returned by _extract_inet (at most one 'ipv4' and one 'ipv6' entry,
        each with its netmask suffix)

    """
    # Get network devices and their addresses (raw infos from 'ip addr')
    devices_raw = {}
    for line in check_output("ip --brief a").splitlines():
        fields = line.split()
        if not fields:
            # Blank line (e.g. the command printed nothing): no device here
            continue
        # NOTE(review): the device name is kept exactly as printed; `ip` may
        # print names such as "eth0@if5" for some devices -- confirm the
        # config file uses the same form.
        iname = fields[0]
        devices_raw[iname] = ' '.join(fields[2:])

    # Parse relevant informations for each of them
    return {
        name: _extract_inet(addrs)
        for name, addrs in devices_raw.items()
        if name != "lo"
    }
if __name__ == '__main__':
    # Script entry point: read /etc/yunohost/mdns.yml, register one mDNS
    # service per configured .local domain on every configured interface,
    # then wait until interrupted and unregister everything.

    # Only the entry point needs it, so it is imported locally
    import signal

    ###
    # CONFIG
    ###

    with open('/etc/yunohost/mdns.yml', 'r') as f:
        config = yaml.safe_load(f) or {}

    required_fields = ["interfaces", "domains"]
    missing_fields = [field for field in required_fields if field not in config]

    if missing_fields:
        print("The fields %s are required" % ', '.join(missing_fields))
        # Nothing can be done without them; going on would only end in a
        # KeyError on the first access below
        sys.exit(1)

    if config['interfaces'] is None:
        print('No interface listed for broadcast.')
        sys.exit(0)

    # An empty `domains:` key is loaded as None: treat it as an empty list
    domains = config['domains'] or []
    if 'yunohost.local' not in domains:
        domains = domains + ['yunohost.local']

    # Maps each Zeroconf object to the list of ServiceInfo registered on it
    zcs = {}
    interfaces = get_network_interfaces()
    for interface in config['interfaces']:
        if interface not in interfaces:
            print('Interface ' + interface + ' listed in config file is not present on system.')
            continue

        ips = []  # Human-readable IPs
        b_ips = []  # Binary-converted IPs

        # _extract_inet only sets the keys of the protocols it found, so an
        # interface may lack 'ipv4' and/or 'ipv6'
        detected = interfaces[interface]

        ipv4 = detected.get('ipv4', '').split('/')[0]
        if ipv4:
            ips.append(ipv4)
            b_ips.append(socket.inet_pton(socket.AF_INET, ipv4))

        ipv6 = detected.get('ipv6', '').split('/')[0]
        if ipv6:
            ips.append(ipv6)
            b_ips.append(socket.inet_pton(socket.AF_INET6, ipv6))

        # If at least one IP is listed
        if ips:
            # Create a Zeroconf object, and store the ServiceInfos
            zc = Zeroconf(interfaces=ips)
            zcs[zc] = []
            for d in domains:
                # Strip the '.local' suffix only (not any other occurrence)
                d_domain = d[:-len('.local')] if d.endswith('.local') else d
                if '.' in d_domain:
                    print(d_domain+'.local: subdomains are not supported.')
                else:
                    # Create a ServiceInfo object for each .local domain
                    zcs[zc].append(ServiceInfo(
                        type_='_device-info._tcp.local.',
                        name=interface+': '+d_domain+'._device-info._tcp.local.',
                        addresses=b_ips,
                        port=80,
                        server=d+'.',
                    ))
                    print('Adding '+d+' with addresses '+str(ips)+' on interface '+interface)

    # Run registration
    print("Registering...")
    for zc, infos in zcs.items():
        for info in infos:
            zc.register_service(info)

    def _exit_on_sigterm(signum, frame):
        # Raise SystemExit in the main thread so the `finally` clause runs
        sys.exit(0)

    # Without a handler, SIGTERM would end the process without unregistering
    signal.signal(signal.SIGTERM, _exit_on_sigterm)

    try:
        print("Registered. Press Ctrl+C or stop service to stop.")
        while True:
            sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        print("Unregistering...")
        for zc, infos in zcs.items():
            for info in infos:
                zc.unregister_service(info)
            zc.close()