lineage-updater/lineage-updater

230 lines
6.2 KiB
Python
Executable File

#!/usr/bin/python3
import os
import sys
import pwd
import getopt
import re
import cgi
import hashlib
import json
import time
import fcntl
def die(msg):
    """Report a fatal error and terminate the script.

    Writes a single ``X-Error: <msg>`` line to stdout and then exits the
    process with status 0.

    Args:
        msg: Text placed after the ``X-Error: `` prefix.

    Raises:
        SystemExit: Always, with exit code 0.
    """
    error_line = "X-Error: %s\n" % (msg)
    sys.stdout.write(error_line)
    raise SystemExit(0)
def dbg(msg):
    """Emit a diagnostic message as an ``X-Debug: <msg>`` line on stdout.

    Args:
        msg: Text placed after the ``X-Debug: `` prefix.
    """
    debug_line = "X-Debug: %s\n" % (msg)
    sys.stdout.write(debug_line)
def cgi_method():
    """Return the value of the ``REQUEST_METHOD`` environment variable.

    Returns:
        The method string, or ``None`` when the variable is not set.
    """
    return os.getenv('REQUEST_METHOD')
def write_response(obj):
    """Write *obj* to stdout as a JSON CGI response.

    The output is the ``Content-Type: application/json`` header line, a blank
    line, and then *obj* serialized with a two-space indent followed by a
    trailing newline, all in a single write.

    Args:
        obj: Any value accepted by ``json.dumps``.
    """
    body = json.dumps(obj, indent=2)
    header_block = "".join(["Content-Type: application/json\n"])
    # header block + separator newline + body + final newline
    sys.stdout.write("\n".join((header_block, body, "")))
def config_load(pathname):
    """Load ``key=value`` settings from *pathname* into the global ``config``.

    ``config['debug']`` and ``config['verbose']`` are reset to ``False``
    before the file is read. Empty or whitespace-only lines and lines
    starting with ``#`` are skipped. Every other line is split at its first
    ``=``; key and value are stored exactly as written (no stripping), so
    values read from the file are always strings.

    Args:
        pathname: Path of the configuration file.

    Raises:
        RuntimeError: If the file does not exist, a line has no ``=``
            separator, or one of the required keys (``baseurl``,
            ``directory``) is missing after loading.
    """
    required_keys = ['baseurl', 'directory']
    if not os.path.exists(pathname):
        raise RuntimeError("Configuration file %s does not exist" % (pathname))
    config['debug'] = False
    config['verbose'] = False
    with open(pathname, 'r') as f:
        for lineno, raw in enumerate(f, 1):
            line = raw.rstrip('\n')
            if not line.strip() or line.startswith('#'):
                continue
            # partition() never raises, so a malformed line can be reported
            # with its line number instead of an opaque unpacking error.
            key, sep, value = line.partition('=')
            if not sep:
                raise RuntimeError(
                    "Malformed configuration line %d (expected key=value)" % (lineno))
            config[key] = value
    for k in required_keys:
        if k not in config:
            raise RuntimeError("Configuration missing required key \"%s\"" % (k))
def args_parse():
    """Extract the request parameters ``device``, ``type`` and ``incr``.

    The query is obtained through ``cgi.parse()``. Each of the three
    parameters must be supplied exactly once; its value is lower-cased.

    Returns:
        A dict mapping ``'device'``, ``'type'`` and ``'incr'`` to their
        lower-cased string values.

    Raises:
        RuntimeError: If ``cgi.parse()`` fails, or if a parameter is missing
            or given more than once.
    """
    try:
        query = cgi.parse()
    except Exception as e:
        raise RuntimeError(str(e))

    parsed = {}
    for name in ('device', 'type', 'incr'):
        supplied = query.get(name, [])
        if len(supplied) != 1:
            raise RuntimeError("No %s specified" % (name))
        (only,) = supplied
        parsed[name] = only.lower()
    return parsed
def filename_properties(pathname):
    """Decode the properties embedded in an OTA package file name.

    The base name must look like
    ``<project>_<device>-ota-<version>-<buildtype>-<incr>.zip``, where
    ``<incr>`` is either a single incremental value or
    ``<base>_<incremental>``.

    Args:
        pathname: Path or bare file name of the package.

    Returns:
        A dict with the keys ``project``, ``device``, ``version``,
        ``buildtype`` and ``incremental``, plus ``incrbase`` when the last
        field has the ``<base>_<incremental>`` form.

    Raises:
        ValueError: If the name does not follow the layout above.
    """
    name = os.path.basename(pathname)
    if not name.endswith('.zip'):
        raise ValueError("Not an OTA file")

    parts = name[:-len('.zip')].split('-')
    if len(parts) != 5:
        raise ValueError("Incorrect number of fields in filename")
    product, kind, version, buildtype, incr = parts
    if kind != 'ota':
        raise ValueError("Not an OTA file")

    # Unpacking raises ValueError when the first field has no underscore.
    project, device = product.split('_', 1)

    incr_parts = incr.split('_')
    if len(incr_parts) > 2:
        raise ValueError("Incorrect incremental value")

    props = {
        'project': project,
        'device': device,
        'version': version,
        'buildtype': buildtype,
    }
    if len(incr_parts) == 2:
        props['incrbase'], props['incremental'] = incr_parts
    else:
        props['incremental'] = incr_parts[0]
    return props
def incremental_stamp(incremental):
    """Return the ``<datestamp>.<timestamp>`` part of an incremental value.

    Args:
        incremental: String of exactly four dot-separated fields; the last
            two are the date stamp and the time stamp.

    Returns:
        The third and fourth fields joined by a dot.

    Raises:
        ValueError: If *incremental* does not have exactly four fields.
    """
    _tag, _user, datestamp, timestamp = incremental.split('.')
    return ".".join((datestamp, timestamp))
def cache_refresh(cache):
    """Synchronize *cache* with the OTA files below ``config['directory']``.

    Walks the directory tree, and for every file whose name parses as an OTA
    package adds or replaces its cache entry when the file is not cached yet
    or its size or integer mtime differs from the cached values. Entries
    whose path no longer exists are then removed. *cache* is modified in
    place.

    Args:
        cache: Dict mapping a file path to its ROM description dict.

    Returns:
        True if any entry was added, replaced or removed, otherwise False.
    """
    changed = False
    topdir = config['directory']

    for root, _dirs, files in os.walk(topdir):
        # Path of this directory relative to topdir, used to build the URL.
        relpath = root[len(topdir):] if root != topdir else ''
        for filename in files:
            pathname = "%s/%s" % (root, filename)
            try:
                props = filename_properties(pathname)
                st = os.stat(pathname)
            except Exception:
                # Not an OTA package (or it vanished): ignore it.
                continue

            mtime = int(st.st_mtime)
            if pathname in cache:
                known = cache[pathname]
                if st.st_size == known['size'] and mtime == known['datetime']:
                    continue

            entry = {
                'datetime': mtime,
                'filename': filename,
                'romtype': props['buildtype'],
                'size': st.st_size,
                'url': "%s%s/%s" % (config['baseurl'], relpath, filename),
                'version': props['version'],
            }
            # The id is the SHA-1 of "<filename>.<mtime>.<size>".
            ident = "%s.%d.%d" % (entry['filename'], entry['datetime'], entry['size'])
            entry['id'] = hashlib.sha1(ident.encode()).hexdigest()
            cache[pathname] = entry
            changed = True

    vanished = [path for path in cache if not os.path.exists(path)]
    for path in vanished:
        del cache[path]
    if vanished:
        changed = True
    return changed
def _load_cache():
    """Return the ROM cache dict, refreshing and persisting it when needed.

    The cache lives in ``<config['directory']>/.cache`` and is read and
    written under an exclusive ``fcntl`` lock. The file is never created
    here; when it cannot be opened the cache is rebuilt in memory on every
    call and not saved. The directory is rescanned when the file could not
    be opened, could not be parsed, or was last modified more than 60
    seconds ago.
    """
    cache = dict()
    cf = None
    ct = 0
    try:
        cn = "%s/.cache" % (config['directory'])
        ct = os.stat(cn).st_mtime
        cf = open(cn, 'r+')
        fcntl.lockf(cf, fcntl.LOCK_EX)
    except Exception:
        dbg("Failed to open cache file")

    try:
        load_failed = False
        if cf is not None:
            try:
                loaded = json.load(cf)
                if not isinstance(loaded, dict):
                    raise ValueError("cache is not a JSON object")
                cache = loaded
            except Exception as e:
                dbg("Failed to read cache file: %s" % (e))
                load_failed = True

        update_cache = load_failed
        # An unreadable cache is rebuilt right away instead of being served
        # (and saved) empty until the file ages past the 60 second limit.
        if cf is None or load_failed or ct < time.time() - 60:
            update_cache = cache_refresh(cache) or load_failed

        if cf is not None and update_cache:
            try:
                cf.seek(0, 0)
                json.dump(cache, cf)
                cf.truncate()
            except Exception as e:
                # Saving is best effort; the in-memory cache is still usable.
                dbg("Failed to write cache file: %s" % (e))
    finally:
        # Always release the lock and the handle, even if the refresh raised.
        if cf is not None:
            try:
                fcntl.lockf(cf, fcntl.LOCK_UN)
            finally:
                cf.close()
    return cache


def find_roms(device=None, buildtype=None, incremental=None):
    """Return the cached ROM entries that are updates for the given request.

    A cached package is selected when its file name parses as an OTA
    package, it matches *device* and *buildtype* (each filter is skipped when
    falsy), and its ``datestamp.timestamp`` string compares greater than the
    one of the requested *incremental*. Packages that carry an ``incrbase``
    are only selected when that base equals *incremental* exactly.

    Args:
        device: Device name to match, or a falsy value for any device.
        buildtype: Build type to match, or a falsy value for any type.
        incremental: Incremental value of the requesting build; must consist
            of four dot-separated fields.

    Returns:
        A list of cache entry dicts, in cache iteration order.

    Raises:
        RuntimeError: If *incremental* cannot be parsed (including the
            default ``None``).
    """
    cache = _load_cache()

    try:
        req_incr_stamp = incremental_stamp(incremental)
    except Exception as e:
        raise RuntimeError(
            "Failed to parse request incremental field: %s" % (e)) from e

    roms = []
    for k, v in cache.items():
        try:
            props = filename_properties(k)
        except Exception as e:
            dbg("Failed to parse cache filename: %s" % (e))
            continue
        if device and props['device'] != device:
            continue
        if buildtype and props['buildtype'] != buildtype:
            continue
        try:
            ota_incr_stamp = incremental_stamp(props['incremental'])
        except ValueError:
            dbg("Failed to parse cache incremental field")
            continue
        if 'incrbase' in props:
            # Incremental package: only applies on top of the exact base.
            if incremental != props['incrbase']:
                continue
        # Stamps are compared as strings.
        if ota_incr_stamp <= req_incr_stamp:
            continue
        roms.append(v)
    return roms
### Begin main script ###

# Locate the configuration file in the home directory of the user the
# script runs as.
uid = os.getuid()
pwent = pwd.getpwuid(uid)
home = pwent.pw_dir

# Global settings dict; filled in by config_load() and read by the helpers.
config = {}
try:
    config_load("%s/.lineageupdaterrc" % (home))
except Exception as e:
    die("Failed to load configuration: %s" % (e))

try:
    args = args_parse()
except Exception as e:
    die("Failed to parse arguments: %s\n" % (e))

# Only GET requests are served; the check runs after argument parsing.
if not cgi_method() == "GET":
    die("Invalid method")

try:
    obj = {}
    obj['response'] = find_roms(args['device'], args['type'], args['incr'])
    write_response(obj)
except BaseException as e:
    # Any failure is reported through die() rather than a traceback.
    die("Failed: %s" % (e))