You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

227 lines
6.8 KiB

#!/usr/bin/env python3
import re
import httpx
import sys
import socket
import urllib.request
import json
import argparse
domains = []
bad_stuns = {
r"stun[\d]{0,1}\.l\.google\.com": "Google"
}
badAsnDesc = [
"GOOGLE",
"AMAZON",
"CLOUDFLARE",
"MICROSOFT"
]
analyticsOptions = [
"googleAnalyticsTrackingId",
"amplitudeAPPKey",
"scriptURLs"
]
authOptions = [
"anonymousdomain"
]
def getPage(domain):
isAvailable = True
corporateStun = False
jitsiVersion = None
authEnabled = False
analyticsEnabled = False
try:
page = httpx.get('https://' + domain + "/sdlkndflfdldhdfhdsofofbvcxbsd", timeout=30.0)
if page.status_code != 200:
return False
return page
except httpx._exceptions.NetworkError as err:
sys.stderr.write(str(err) + "\n")
return False
def getJitsiVersion(page):
if not page:
return False
jitsiVersion = None
matches = re.findall("all\.css\?v\=(.*)\"", page.text)
for match in matches:
jitsiVersion = match;
return jitsiVersion;
# Checks if options are commented out or active
def checkActiveOptions(page,items):
if not page:
return False
for item in items:
matches = re.findall("\n.*" + item, page.text)
for match in matches:
if "//" not in match:
return True
break
return False;
def checkHoster(result):
if result['status'] == "success":
for asnDesc in badAsnDesc:
if result['as'].casefold().find(asnDesc.casefold()) != -1:
return True
return False
def getDataFromAPI(domains):
results = {}
current = []
while domains != []:
if len(domains) >= 100:
current = domains[:100]
domains = domains[100:]
else:
current = domains
domains = []
ips = [None] * len(current)
for i in range(len(current)):
try:
ips[i] = socket.gethostbyname(current[i])
except socket.gaierror as err:
sys.stderr.write(str(err) + "\n")
r = httpx.post('http://ip-api.com/batch?fields=status,countryCode,isp,as,query', json=ips)
answers = r.json()
for i in range(len(current)):
results[current[i]] = answers[i]
if r.headers['X-Rl'] == 0:
time.sleep(r.headers['X-Ttl'])
current = []
ips = []
return results
def parseText(f):
char = f.read(1)
while char:
if char == "\"":
domain = ""
char = f.read(1)
while char != "\"":
domain = domain + char
char = f.read(1)
domains.append(domain)
char = f.read(1)
f.close()
def outputMarkdown(outputs, unavailable):
sys.stdout.write("# Jitsi Instanzen\n")
outputListAsMarkdown(outputs)
sys.stdout.write("\n## Aktuell nicht erreichbar\n")
outputListAsMarkdown(unavailable)
sys.stdout.write("\n# Legende\n")
sys.stdout.write("\u2705: Die Instanz nutzt nicht den Google STUN Server\n\n")
sys.stdout.write("\u274c: Die Instanz ist bei Google, Amazon, Cloudflare oder Microsoft gehostet\n\n")
sys.stdout.write("\U0001F510: Die Instanz hat Authentifizierung aktiviert\n\n")
sys.stdout.write("\U0001F50D: Die Instanz nutzt Analytics")
def outputListAsMarkdown(outputs):
for output in outputs:
if (not output['available']):
sys.stdout.write("[" + output['domain'] + "](https://" + output['domain'] + ") ")
else:
sys.stdout.write("[" + output['domain'] + "](https://" + output['domain'] + ") ")
sys.stdout.write(output['countryCode'] + " ")
sys.stdout.write("(Hoster: " + output['hoster'])
if output['jitsiVersion']:
sys.stdout.write(" , Version: " + output['jitsiVersion'])
sys.stdout.write(") ")
if not output['corporateStun']:
sys.stdout.write("\u2705 ")
if output['authEnabled']:
sys.stdout.write("\U0001F510 ")
if output['analyticsEnabled']:
sys.stdout.write("\U0001F50D ")
if output['corporateHoster']:
sys.stdout.write("\u274c")
sys.stdout.write("\\\n")
def outputJSON(outputs, unavailable):
outputList = outputs + unavailable
sys.stdout.write(json.dumps({"data": outputList}));
def parseFormat(format):
if format == "json" or format == "md":
return format
else:
msg = "%r is not valid, valid are md and json" % format
raise argparse.ArgumentTypeError(msg)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("list", type=argparse.FileType("r"), help="File with instances, each in one line with the following format: \"meet.example.com\",")
parser.add_argument("-f", "--format", help="Output format, can either be md or json", default="md", type=parseFormat)
args = parser.parse_args()
parseText(args.list)
format = args.format
sys.stderr.write(f"Output format: {format}\n")
#Check all domains
domains.sort()
total = len(domains)
i = 0
outputs = []
notAccessible = []
sys.stderr.write("Doing API request now\n")
apiResults = getDataFromAPI(domains)
for domain, result in apiResults.items():
sys.stderr.write("Doing page " + domain + " now\n")
page = getPage(domain)
available = True if page else False
output = {}
if available == True and result['status'] == "success":
output = {
"domain": domain,
"available": True,
"corporateHoster": checkHoster(result),
"hoster": result['isp'],
"countryCode": result['countryCode'],
"corporateStun": checkActiveOptions(page,bad_stuns),
"jitsiVersion": getJitsiVersion(page),
"authEnabled": checkActiveOptions(page,authOptions),
"analyticsEnabled": checkActiveOptions(page,analyticsOptions),
}
else:
output = {
"domain": domain,
"available": False,
"countryCode": "",
}
if result['status'] == "success":
output['countryCode'] = result['countryCode']
if available:
outputs.append(output)
else:
sys.stderr.write(domain + " is unavailable\n")
notAccessible.append(output)
outputs = sorted(outputs, key=lambda output: output['countryCode'])
notAccessible = sorted(notAccessible, key=lambda output: output['countryCode'])
if format == "json":
outputJSON(outputs, notAccessible)
elif format == "md":
outputMarkdown(outputs, notAccessible)
else:
sys.stderr.write("Unknown output format: " + format + "\n")
if __name__ == '__main__':
main()