yunohost/bin/yunomdns
2021-08-11 16:19:51 +00:00

253 lines
8.3 KiB
Python
Executable file

#!/usr/bin/env python3
"""
WIP
Pythonic declaration of mDNS .local domains for YunoHost
Based off https://github.com/jstasiak/python-zeroconf/blob/master/tests/test_asyncio.py
"""
import subprocess
import os
import re
import sys
import argparse
import yaml
import asyncio
import logging
import socket
from time import sleep
from typing import List, Dict
from zeroconf import Zeroconf, ServiceInfo
# Helper command taken from Moulinette
def check_output(args, stderr=subprocess.STDOUT, shell=True, **kwargs):
"""Run command with arguments and return its output as a byte string
Overwrite some of the arguments to capture standard error in the result
and use shell by default before calling subprocess.check_output.
"""
return (
subprocess.check_output(args, stderr=stderr, shell=shell, **kwargs)
.decode("utf-8")
.strip()
)
# Helper command taken from Moulinette
def _extract_inet(string, skip_netmask=False, skip_loopback=True):
"""
Extract IP addresses (v4 and/or v6) from a string limited to one
address by protocol
Keyword argument:
string -- String to search in
skip_netmask -- True to skip subnet mask extraction
skip_loopback -- False to include addresses reserved for the
loopback interface
Returns:
A dict of {protocol: address} with protocol one of 'ipv4' or 'ipv6'
"""
ip4_pattern = (
r"((25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}"
)
ip6_pattern = r"(((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)::((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)"
ip4_pattern += r"/[0-9]{1,2})" if not skip_netmask else ")"
ip6_pattern += r"/[0-9]{1,3})" if not skip_netmask else ")"
result = {}
for m in re.finditer(ip4_pattern, string):
addr = m.group(1)
if skip_loopback and addr.startswith("127."):
continue
# Limit to only one result
result["ipv4"] = addr
break
for m in re.finditer(ip6_pattern, string):
addr = m.group(1)
if skip_loopback and addr == "::1":
continue
# Limit to only one result
result["ipv6"] = addr
break
return result
# Helper command taken from Moulinette
def get_network_interfaces():
# Get network devices and their addresses (raw infos from 'ip addr')
devices_raw = {}
output = check_output("ip addr show")
for d in re.split(r"^(?:[0-9]+: )", output, flags=re.MULTILINE):
# Extract device name (1) and its addresses (2)
m = re.match(r"([^\s@]+)(?:@[\S]+)?: (.*)", d, flags=re.DOTALL)
if m:
devices_raw[m.group(1)] = m.group(2)
# Parse relevant informations for each of them
devices = {
name: _extract_inet(addrs)
for name, addrs in devices_raw.items()
if name != "lo"
}
return devices
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
###
# ARGUMENTS
###
parser = argparse.ArgumentParser(description='''
mDNS broadcast for .local domains.
Configuration file: /etc/yunohost/mdns.yml
Subdomains are not supported.
''')
parser.add_argument('--debug', action='store_true')
parser.add_argument('--regen', nargs='?', const='as_stored', choices=['domains', 'interfaces', 'all', 'as_stored'],
help='''
Regenerates selection into the configuration file then starts mDNS broadcasting.
''')
parser.add_argument('--set-regen', choices=['domains', 'interfaces', 'all', 'none'],
help='''
Set which part of the configuration to be regenerated.
Implies --regen as_stored, with newly stored parameter.
''')
able = parser.add_mutually_exclusive_group()
able.add_argument('--enable', action='store_true', help='Enables mDNS broadcast, and regenerates the configuration')
able.add_argument('--disable', action='store_true')
args = parser.parse_args()
if args.debug:
logging.getLogger('zeroconf').setLevel(logging.DEBUG)
logging.getLogger('asyncio').setLevel(logging.DEBUG)
else:
logging.getLogger('zeroconf').setLevel(logging.WARNING)
logging.getLogger('asyncio').setLevel(logging.WARNING)
###
# CONFIG
###
with open('/etc/yunohost/mdns.yml', 'r') as f:
config = yaml.load(f) or {}
updated = False
if args.enable:
config['enabled'] = True
args.regen = 'as_stored'
updated = True
if args.disable:
config['enabled'] = False
updated = True
if args.set_regen:
config['regen'] = args.set_regen
args.regen = 'as_stored'
updated = True
if args.regen:
if args.regen == 'as_stored':
r = config['regen']
else:
r = args.regen
if r == 'none':
print('Regeneration disabled.')
if r == 'interfaces' or r == 'all':
config['interfaces'] = [ i for i in get_network_interfaces() ]
print('Regenerated interfaces list: ' + str(config['interfaces']))
if r == 'domains' or r == 'all':
import glob
config['domains'] = [ d.rsplit('/',1)[1][:-2] for d in glob.glob('/etc/nginx/conf.d/*.local.d') ]
if 'yunohost.local' not in config['domains']:
config['domains'].append('yunohost.local')
print('Regenerated domains list: ' + str(config['domains']))
updated = True
if updated:
with open('/etc/yunohost/mdns.yml', 'w') as f:
yaml.safe_dump(config, f, default_flow_style=False)
print('Configuration file updated.')
###
# MAIN SCRIPT
###
if config['enabled'] is not True:
print('YunomDNS is disabled.')
sys.exit(0)
if config['interfaces'] is None:
print('No interface listed for broadcast.')
sys.exit(0)
zcs = {}
interfaces = get_network_interfaces()
for interface in config['interfaces']:
infos = [] # List of ServiceInfo objects, to feed Zeroconf
ips = [] # Human-readable IPs
b_ips = [] # Binary-convered IPs
# Parse the IPs and prepare their binary version
addressed = False
try:
ip = interfaces[interface]['ipv4'].split('/')[0]
if len(ip)>0: addressed = True
ips.append(ip)
b_ips.append(socket.inet_pton(socket.AF_INET, ip))
except:
pass
try:
ip = interfaces[interface]['ipv6'].split('/')[0]
if len(ip)>0: addressed = True
ips.append(ip)
b_ips.append(socket.inet_pton(socket.AF_INET6, ip))
except:
pass
# If at least one IP is listed
if addressed:
# Create a Zeroconf object, and store the ServiceInfos
zc = Zeroconf(interfaces=ips)
zcs[zc]=[]
for d in config['domains']:
d_domain=d.replace('.local','')
if '.' in d_domain:
print(d_domain+'.local: subdomains are not supported.')
else:
# Create a ServiceInfo object for each .local domain
zcs[zc].append(ServiceInfo(
type_='_device-info._tcp.local.',
name=interface+': '+d_domain+'._device-info._tcp.local.',
addresses=b_ips,
port=80,
server=d+'.',
))
print('Adding '+d+' with addresses '+str(ips)+' on interface '+interface)
# Run registration
print("Registering...")
for zc, infos in zcs.items():
for info in infos:
zc.register_service(info)
try:
print("Registered. Press Ctrl+C or stop service to stop.")
while True:
sleep(1)
except KeyboardInterrupt:
pass
finally:
print("Unregistering...")
for zc, infos in zcs.items():
for info in infos:
zc.unregister_service(info)
zc.close()