#!/usr/bin/env python3
|
|
|
|
import argparse
import json
import re
import socket
import sys
import time
import urllib.request

import httpx
|
|
|
|
# Instance host names collected by parseText(); main() sorts and scans them.
domains = []

# Regex patterns for STUN host names run by large corporations, mapped to the
# operator's name. Only the keys are used (as patterns) by checkActiveOptions().
bad_stuns = {r"stun[\d]{0,1}\.l\.google\.com": "Google"}

# Substrings that checkHoster() looks for (case-insensitively) in the "as"
# field of a lookup result to flag a corporate hoster.
badAsnDesc = ["GOOGLE", "AMAZON", "CLOUDFLARE", "MICROSOFT"]

# Option names whose un-commented presence in the page marks analytics as on.
analyticsOptions = ["googleAnalyticsTrackingId", "amplitudeAPPKey", "scriptURLs"]

# Option names whose un-commented presence in the page marks auth as on.
authOptions = ["anonymousdomain"]
|
|
|
|
def getPage(domain):
    """Fetch a throw-away room URL from the instance at *domain*.

    Args:
        domain: Host name of the instance, without scheme.

    Returns:
        The httpx response when the server answers with HTTP 200,
        otherwise False (non-200 status or any request failure).
    """
    url = 'https://' + domain + "/sdlkndflfdldhdfhdsofofbvcxbsd"
    try:
        page = httpx.get(url, timeout=30.0)
    except httpx.HTTPError as err:
        # Catch the public base class rather than the private
        # httpx._exceptions.NetworkError: timeouts are not NetworkErrors and
        # would otherwise abort the whole scan because of one slow instance.
        sys.stderr.write(str(err) + "\n")
        return False

    if page.status_code != 200:
        return False
    return page
|
|
|
|
def getJitsiVersion(page):
    """Extract the version from the ``all.css?v=<version>"`` reference.

    Args:
        page: Response-like object exposing the body as ``page.text``,
            or a falsy value when the page could not be fetched.

    Returns:
        False when *page* is falsy, None when no version marker is found,
        otherwise the version string of the last marker in the page.
    """
    if not page:
        return False

    # Stop at the first closing quote: the former greedy ``(.*)"`` ran on to
    # the last quote of the line and swallowed any following attributes.
    matches = re.findall(r'all\.css\?v=([^"\n]*)"', page.text)
    return matches[-1] if matches else None
|
|
|
|
def checkActiveOptions(page, items):
    """Check whether any of *items* is active (not commented out) in the page.

    For every line of ``page.text`` that contains an item, the text from the
    start of the line up to the item is inspected; the item counts as active
    when that text contains no ``//``.

    Args:
        page: Response-like object exposing the body as ``page.text``,
            or a falsy value when the page could not be fetched.
        items: Iterable of regular-expression fragments to look for
            (iterating a dict uses its keys).

    Returns:
        True as soon as one active occurrence is found, otherwise False.
    """
    if not page:
        return False

    for item in items:
        # MULTILINE "^" also covers the very first line, which the previous
        # "\n.*" prefix could never match.
        for linePrefix in re.findall("^.*" + item, page.text, re.MULTILINE):
            if "//" not in linePrefix:
                return True
    return False
|
|
|
|
def checkHoster(result):
    """Tell whether a lookup result points at one of the listed big hosters.

    Args:
        result: Lookup answer dict; ``result['status']`` is always read and
            ``result['as']`` is read when the status is "success".

    Returns:
        True when the lookup succeeded and the "as" field contains one of the
        ``badAsnDesc`` entries (compared case-insensitively), else False.
    """
    if result['status'] != "success":
        return False
    return any(
        marker.casefold() in result['as'].casefold()
        for marker in badAsnDesc
    )
|
|
|
|
def getDataFromAPI(domains):
    """Resolve every domain and look up its hosting data at ip-api.com.

    Domains are processed in batches of at most 100 per POST request.

    Args:
        domains: Sequence of host names. The sequence is not modified.

    Returns:
        Dict mapping each domain (in input order) to its answer dict from the
        API. Domains whose DNS lookup failed are not sent to the API and get
        ``{"status": "fail", "query": <domain>}`` instead.

    Raises:
        httpx.HTTPError: When the API request fails or returns an error status.
    """
    batchSize = 100
    apiUrl = 'http://ip-api.com/batch?fields=status,countryCode,isp,as,query'
    results = {}

    for start in range(0, len(domains), batchSize):
        current = domains[start:start + batchSize]

        # Resolve locally first; remember only the names that resolved so no
        # null entries are posted to the API.
        resolved = {}
        for domain in current:
            try:
                resolved[domain] = socket.gethostbyname(domain)
            except socket.gaierror as err:
                sys.stderr.write(str(err) + "\n")

        answers = {}
        headers = {}
        if resolved:
            r = httpx.post(apiUrl, json=list(resolved.values()))
            r.raise_for_status()
            # Answers are paired with the queries by position, as before.
            answers = dict(zip(resolved, r.json()))
            headers = r.headers

        for domain in current:
            results[domain] = answers.get(
                domain, {"status": "fail", "query": domain}
            )

        # Header values are strings, so convert before comparing or sleeping
        # (the old `== 0` check against a str could never be true). Waiting is
        # only useful when another batch is still to be sent.
        moreBatches = start + batchSize < len(domains)
        if moreBatches and int(headers.get('X-Rl', 1)) == 0:
            time.sleep(int(headers.get('X-Ttl', 0)))

    return results
|
|
|
|
def parseText(f):
    """Collect every double-quoted string in *f* into the global ``domains``.

    Each text span between a pair of double quotes is appended to ``domains``
    in file order. An opening quote without a closing quote is ignored (the
    previous character-by-character loop never terminated in that case).

    Args:
        f: Open text file object; it is read to the end and then closed.
    """
    try:
        text = f.read()
    finally:
        f.close()

    for domain in re.findall(r'"([^"]*)"', text):
        domains.append(domain)
|
|
|
|
def outputMarkdown(outputs, unavailable):
    """Write the full Markdown report to stdout.

    The report consists of the heading and list of reachable instances, a
    section listing the unreachable ones, and a legend for the symbols.

    Args:
        outputs: Entries passed to outputListAsMarkdown() for the main list.
        unavailable: Entries passed to outputListAsMarkdown() for the
            "not reachable" section.
    """
    emit = sys.stdout.write

    emit("# Jitsi Instanzen\n")
    outputListAsMarkdown(outputs)

    emit("\n## Aktuell nicht erreichbar\n")
    outputListAsMarkdown(unavailable)

    legend = (
        "\n# Legende\n",
        "\u2705: Die Instanz nutzt nicht den Google STUN Server\n\n",
        "\u274c: Die Instanz ist bei Google, Amazon, Cloudflare oder Microsoft gehostet\n\n",
        "\U0001F510: Die Instanz hat Authentifizierung aktiviert\n\n",
        "\U0001F50D: Die Instanz nutzt Analytics",
    )
    for line in legend:
        emit(line)
|
|
|
|
def outputListAsMarkdown(outputs):
    """Write one Markdown line per entry to stdout.

    Every line starts with a link to the instance. Entries whose
    ``available`` flag is true additionally get the country code, the hoster,
    the version (when known) and the status symbols. Each line ends with a
    backslash followed by a newline (presumably a Markdown hard line break).

    Args:
        outputs: Iterable of entry dicts as built by main().
    """
    write = sys.stdout.write

    for entry in outputs:
        name = entry['domain']
        write("[" + name + "](https://" + name + ") ")

        if entry['available']:
            write(entry['countryCode'] + " ")
            write("(Hoster: " + entry['hoster'])
            version = entry['jitsiVersion']
            if version:
                write(" , Version: " + version)
            write(") ")

            if not entry['corporateStun']:
                write("\u2705 ")
            if entry['authEnabled']:
                write("\U0001F510 ")
            if entry['analyticsEnabled']:
                write("\U0001F50D ")
            if entry['corporateHoster']:
                write("\u274c")

        write("\\\n")
|
|
|
|
def outputJSON(outputs, unavailable):
    """Write all entries to stdout as one JSON object.

    Args:
        outputs: List of entries that come first in the result.
        unavailable: List of entries appended after *outputs*.

    The object has a single key "data" holding the combined list.
    """
    payload = {"data": outputs + unavailable}
    sys.stdout.write(json.dumps(payload))
|
|
|
|
def parseFormat(format):
    """Validate the value of the output-format command-line option.

    Args:
        format: The value given on the command line.

    Returns:
        *format* unchanged when it is "json" or "md".

    Raises:
        argparse.ArgumentTypeError: For any other value.
    """
    if format != "json" and format != "md":
        msg = "%r is not valid, valid are md and json" % format
        raise argparse.ArgumentTypeError(msg)
    return format
|
|
|
|
def main():
    """Command-line entry point: scan all listed instances and print a report.

    Reads the instance list from the file given on the command line, looks up
    hosting data for all domains, fetches each instance's page, and writes the
    result to stdout as Markdown or JSON. Progress messages go to stderr.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("list", type=argparse.FileType("r"), help="File with instances, each in one line with the following format: \"meet.example.com\",")
    parser.add_argument("-f", "--format", help="Output format, can either be md or json", default="md", type=parseFormat)
    args = parser.parse_args()

    parseText(args.list)
    outputFormat = args.format
    sys.stderr.write(f"Output format: {outputFormat}\n")

    # Check all domains
    domains.sort()
    reachable = []
    notAccessible = []

    sys.stderr.write("Doing API request now\n")
    apiResults = getDataFromAPI(domains)

    for domain, result in apiResults.items():
        sys.stderr.write("Doing page " + domain + " now\n")

        page = getPage(domain)
        available = bool(page)
        located = result['status'] == "success"

        if available and located:
            entry = {
                "domain": domain,
                "available": True,
                "corporateHoster": checkHoster(result),
                "hoster": result['isp'],
                "countryCode": result['countryCode'],
                "corporateStun": checkActiveOptions(page, bad_stuns),
                "jitsiVersion": getJitsiVersion(page),
                "authEnabled": checkActiveOptions(page, authOptions),
                "analyticsEnabled": checkActiveOptions(page, analyticsOptions),
            }
        else:
            # Either the page or the lookup failed: keep only the minimal
            # record, with the country code when the lookup did succeed.
            entry = {
                "domain": domain,
                "available": False,
                "countryCode": result['countryCode'] if located else "",
            }

        # Sorting into the two lists depends on page reachability alone.
        if available:
            reachable.append(entry)
        else:
            sys.stderr.write(domain + " is unavailable\n")
            notAccessible.append(entry)

    reachable.sort(key=lambda entry: entry['countryCode'])
    notAccessible.sort(key=lambda entry: entry['countryCode'])

    if outputFormat == "json":
        outputJSON(reachable, notAccessible)
    elif outputFormat == "md":
        outputMarkdown(reachable, notAccessible)
    else:
        sys.stderr.write("Unknown output format: " + outputFormat + "\n")
|
|
|
|
# Run the scan only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|