mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
294 lines
10 KiB
Python
294 lines
10 KiB
Python
#!/usr/bin/env python
|
|
|
|
import os
|
|
import dns.resolver
|
|
import re
|
|
|
|
from subprocess import CalledProcessError
|
|
|
|
from moulinette.utils.process import check_output
|
|
from moulinette.utils.filesystem import read_yaml
|
|
|
|
from yunohost.diagnosis import Diagnoser
|
|
from yunohost.domain import _get_maindomain, domain_list
|
|
from yunohost.settings import settings_get
|
|
from yunohost.utils.dns import dig
|
|
|
|
DEFAULT_DNS_BLACKLIST = "/usr/share/yunohost/other/dnsbl_list.yml"
|
|
|
|
|
|
class MailDiagnoser(Diagnoser):
|
|
|
|
id_ = os.path.splitext(os.path.basename(__file__))[0].split("-")[1]
|
|
cache_duration = 600
|
|
dependencies = ["ip"]
|
|
|
|
def run(self):
|
|
|
|
self.ehlo_domain = _get_maindomain()
|
|
self.mail_domains = domain_list()["domains"]
|
|
self.ipversions, self.ips = self.get_ips_checked()
|
|
|
|
# TODO Is a A/AAAA and MX Record ?
|
|
# TODO Are outgoing public IPs authorized to send mail by SPF ?
|
|
# TODO Validate DKIM and dmarc ?
|
|
# TODO check that the recent mail logs are not filled with thousand of email sending (unusual number of mail sent)
|
|
# TODO check for unusual failed sending attempt being refused in the logs ?
|
|
checks = [
|
|
"check_outgoing_port_25",
|
|
"check_ehlo",
|
|
"check_fcrdns",
|
|
"check_blacklist",
|
|
"check_queue",
|
|
]
|
|
for check in checks:
|
|
self.logger_debug("Running " + check)
|
|
reports = list(getattr(self, check)())
|
|
for report in reports:
|
|
yield report
|
|
if not reports:
|
|
name = check[6:]
|
|
yield dict(
|
|
meta={"test": "mail_" + name},
|
|
status="SUCCESS",
|
|
summary="diagnosis_mail_" + name + "_ok",
|
|
)
|
|
|
|
def check_outgoing_port_25(self):
|
|
"""
|
|
Check outgoing port 25 is open and not blocked by router
|
|
This check is ran on IPs we could used to send mail.
|
|
"""
|
|
|
|
for ipversion in self.ipversions:
|
|
cmd = "/bin/nc -{ipversion} -z -w2 yunohost.org 25".format(
|
|
ipversion=ipversion
|
|
)
|
|
if os.system(cmd) != 0:
|
|
yield dict(
|
|
meta={"test": "outgoing_port_25", "ipversion": ipversion},
|
|
data={},
|
|
status="ERROR",
|
|
summary="diagnosis_mail_outgoing_port_25_blocked",
|
|
details=[
|
|
"diagnosis_mail_outgoing_port_25_blocked_details",
|
|
"diagnosis_mail_outgoing_port_25_blocked_relay_vpn",
|
|
],
|
|
)
|
|
|
|
def check_ehlo(self):
|
|
"""
|
|
Check the server is reachable from outside and it's the good one
|
|
This check is ran on IPs we could used to send mail.
|
|
"""
|
|
|
|
for ipversion in self.ipversions:
|
|
try:
|
|
r = Diagnoser.remote_diagnosis(
|
|
"check-smtp", data={}, ipversion=ipversion
|
|
)
|
|
except Exception as e:
|
|
yield dict(
|
|
meta={
|
|
"test": "mail_ehlo",
|
|
"reason": "remote_server_failed",
|
|
"ipversion": ipversion,
|
|
},
|
|
data={"error": str(e)},
|
|
status="WARNING",
|
|
summary="diagnosis_mail_ehlo_could_not_diagnose",
|
|
details=["diagnosis_mail_ehlo_could_not_diagnose_details"],
|
|
)
|
|
continue
|
|
|
|
if r["status"] != "ok":
|
|
summary = r["status"].replace("error_smtp_", "diagnosis_mail_ehlo_")
|
|
yield dict(
|
|
meta={"test": "mail_ehlo", "ipversion": ipversion},
|
|
data={},
|
|
status="ERROR",
|
|
summary=summary,
|
|
details=[summary + "_details"],
|
|
)
|
|
elif r["helo"] != self.ehlo_domain:
|
|
yield dict(
|
|
meta={"test": "mail_ehlo", "ipversion": ipversion},
|
|
data={"wrong_ehlo": r["helo"], "right_ehlo": self.ehlo_domain},
|
|
status="ERROR",
|
|
summary="diagnosis_mail_ehlo_wrong",
|
|
details=["diagnosis_mail_ehlo_wrong_details"],
|
|
)
|
|
|
|
def check_fcrdns(self):
|
|
"""
|
|
Check the reverse DNS is well defined by doing a Forward-confirmed
|
|
reverse DNS check
|
|
This check is ran on IPs we could used to send mail.
|
|
"""
|
|
|
|
for ip in self.ips:
|
|
if ":" in ip:
|
|
ipversion = 6
|
|
details = [
|
|
"diagnosis_mail_fcrdns_nok_details",
|
|
"diagnosis_mail_fcrdns_nok_alternatives_6",
|
|
]
|
|
else:
|
|
ipversion = 4
|
|
details = [
|
|
"diagnosis_mail_fcrdns_nok_details",
|
|
"diagnosis_mail_fcrdns_nok_alternatives_4",
|
|
]
|
|
|
|
rev = dns.reversename.from_address(ip)
|
|
subdomain = str(rev.split(3)[0])
|
|
query = subdomain
|
|
if ipversion == 4:
|
|
query += ".in-addr.arpa"
|
|
else:
|
|
query += ".ip6.arpa"
|
|
|
|
# Do the DNS Query
|
|
status, value = dig(query, "PTR", resolvers="force_external")
|
|
if status == "nok":
|
|
yield dict(
|
|
meta={"test": "mail_fcrdns", "ipversion": ipversion},
|
|
data={"ip": ip, "ehlo_domain": self.ehlo_domain},
|
|
status="ERROR",
|
|
summary="diagnosis_mail_fcrdns_dns_missing",
|
|
details=details,
|
|
)
|
|
continue
|
|
|
|
rdns_domain = ""
|
|
if len(value) > 0:
|
|
rdns_domain = value[0][:-1] if value[0].endswith(".") else value[0]
|
|
if rdns_domain != self.ehlo_domain:
|
|
details = [
|
|
"diagnosis_mail_fcrdns_different_from_ehlo_domain_details"
|
|
] + details
|
|
yield dict(
|
|
meta={"test": "mail_fcrdns", "ipversion": ipversion},
|
|
data={
|
|
"ip": ip,
|
|
"ehlo_domain": self.ehlo_domain,
|
|
"rdns_domain": rdns_domain,
|
|
},
|
|
status="ERROR",
|
|
summary="diagnosis_mail_fcrdns_different_from_ehlo_domain",
|
|
details=details,
|
|
)
|
|
|
|
def check_blacklist(self):
|
|
"""
|
|
Check with dig onto blacklist DNS server
|
|
This check is ran on IPs and domains we could used to send mail.
|
|
"""
|
|
|
|
dns_blacklists = read_yaml(DEFAULT_DNS_BLACKLIST)
|
|
for item in self.ips + self.mail_domains:
|
|
for blacklist in dns_blacklists:
|
|
item_type = "domain"
|
|
if ":" in item:
|
|
item_type = "ipv6"
|
|
elif re.match(r"^\d+\.\d+\.\d+\.\d+$", item):
|
|
item_type = "ipv4"
|
|
|
|
if not blacklist[item_type]:
|
|
continue
|
|
|
|
# Build the query for DNSBL
|
|
subdomain = item
|
|
if item_type != "domain":
|
|
rev = dns.reversename.from_address(item)
|
|
subdomain = str(rev.split(3)[0])
|
|
query = subdomain + "." + blacklist["dns_server"]
|
|
|
|
# Do the DNS Query
|
|
status, _ = dig(query, "A")
|
|
if status != "ok":
|
|
continue
|
|
|
|
# Try to get the reason
|
|
details = []
|
|
status, answers = dig(query, "TXT")
|
|
reason = "-"
|
|
if status == "ok":
|
|
reason = ", ".join(answers)
|
|
details.append("diagnosis_mail_blacklist_reason")
|
|
|
|
details.append("diagnosis_mail_blacklist_website")
|
|
|
|
yield dict(
|
|
meta={
|
|
"test": "mail_blacklist",
|
|
"item": item,
|
|
"blacklist": blacklist["dns_server"],
|
|
},
|
|
data={
|
|
"blacklist_name": blacklist["name"],
|
|
"blacklist_website": blacklist["website"],
|
|
"reason": reason,
|
|
},
|
|
status="ERROR",
|
|
summary="diagnosis_mail_blacklist_listed_by",
|
|
details=details,
|
|
)
|
|
|
|
def check_queue(self):
|
|
"""
|
|
Check mail queue is not filled with hundreds of email pending
|
|
"""
|
|
|
|
command = (
|
|
'postqueue -p | grep -v "Mail queue is empty" | grep -c "^[A-Z0-9]" || true'
|
|
)
|
|
try:
|
|
output = check_output(command)
|
|
pending_emails = int(output)
|
|
except (ValueError, CalledProcessError) as e:
|
|
yield dict(
|
|
meta={"test": "mail_queue"},
|
|
data={"error": str(e)},
|
|
status="ERROR",
|
|
summary="diagnosis_mail_queue_unavailable",
|
|
details="diagnosis_mail_queue_unavailable_details",
|
|
)
|
|
else:
|
|
if pending_emails > 100:
|
|
yield dict(
|
|
meta={"test": "mail_queue"},
|
|
data={"nb_pending": pending_emails},
|
|
status="WARNING",
|
|
summary="diagnosis_mail_queue_too_big",
|
|
)
|
|
else:
|
|
yield dict(
|
|
meta={"test": "mail_queue"},
|
|
data={"nb_pending": pending_emails},
|
|
status="SUCCESS",
|
|
summary="diagnosis_mail_queue_ok",
|
|
)
|
|
|
|
def get_ips_checked(self):
|
|
outgoing_ipversions = []
|
|
outgoing_ips = []
|
|
ipv4 = Diagnoser.get_cached_report("ip", {"test": "ipv4"}) or {}
|
|
if ipv4.get("status") == "SUCCESS":
|
|
outgoing_ipversions.append(4)
|
|
global_ipv4 = ipv4.get("data", {}).get("global", {})
|
|
if global_ipv4:
|
|
outgoing_ips.append(global_ipv4)
|
|
|
|
if settings_get("smtp.allow_ipv6"):
|
|
ipv6 = Diagnoser.get_cached_report("ip", {"test": "ipv6"}) or {}
|
|
if ipv6.get("status") == "SUCCESS":
|
|
outgoing_ipversions.append(6)
|
|
global_ipv6 = ipv6.get("data", {}).get("global", {})
|
|
if global_ipv6:
|
|
outgoing_ips.append(global_ipv6)
|
|
return (outgoing_ipversions, outgoing_ips)
|
|
|
|
|
|
def main(args, env, loggers):
|
|
return MailDiagnoser(args, env, loggers).diagnose()
|