From 62ccf697001303ac8b5daea8af676cdc0bf30987 Mon Sep 17 00:00:00 2001 From: fly Date: Wed, 3 Mar 2021 19:51:58 +0000 Subject: [PATCH] Added API support Signed-off-by: fly --- common.py | 56 +++++++++++++++++++++++++++++++++++++++++++- worldbuild-runner.py | 25 ++++++-------------- worldbuild-worker.py | 31 ++++++++++++++++++++---- 3 files changed, 89 insertions(+), 23 deletions(-) diff --git a/common.py b/common.py index d68e683..fc0c209 100644 --- a/common.py +++ b/common.py @@ -19,6 +19,7 @@ import math import sys import socket import re +import requests from time import sleep # Adds leading 0s @@ -114,6 +115,20 @@ def send_status(name, status, host, port): print("Unable to send status " + status + " for tile " + name + ". Aborting...") sys.exit(1) +# Sends status to the manager api +def api_send_status(name, status, api, api_token): + try: + success = False + while not success: + response = requests.post(api, data={'auth': token, 'action': 'set', 'tile': name}, headers={"lAccept": "application/json", "Content-Type": "application/x-www-form-urlencoded"}) + success = response.json()["success"] + if not success: + print("Warning: Unable to send status " + status + " for tile " + name + ". Trying again in 60 seconds") + sleep(60) + except IOError: + print("Unable to send status " + status + " for tile " + name + ". Aborting...") + sys.exit(1) + # Gets new job from manager def get_job(action, host, port, none_exit=True): try: @@ -140,6 +155,27 @@ def get_job(action, host, port, none_exit=True): ret = get_job(action, host, port, none_exit) return ret +# Gets new job from manager api +def api_get_job(action, api, token, none_exit=True): + try: + response = requests.post(api, data={'auth': token, 'action': 'get-job', 'additional-type': action}, headers={"lAccept": "application/json", "Content-Type": "application/x-www-form-urlencoded"}) + match = re.match(r"[ew]\d{3}[ns]\d{2}|[0-9]{1,7}|None", responce.json()["job"]) + if match != None: + ret = match.group(0) + if ret == "None" and none_exit: + print("No job got asigned. Exiting...") + sys.exit(0) + else: + print("Recived invalid job. Retrying in 10 seconds...") + sleep(10) + ret = api_get_job(action, api, token, none_exit) + return ret + except IOError: + print("Unable to get job. Retrying in 10 seconds...") + sleep(10) + ret = get_job(action, host, port, none_exit) + return ret + # Gets status of a tile def get_status(name, host, port): try: @@ -156,5 +192,23 @@ def get_status(name, host, port): print("ERROR: Recived invalid state for tile " + name) sys.exit(1) except IOError: - print("ERROR: Unable to send status.") + print("ERROR: Unable to get status.") + sys.exit(1) + +# Gets status of a tile from api +def api_get_status(name, api, api_token): + try: + match = re.match(r"[ew]\d{3}[ns]\d{2}", name) + if match != None: + response = requests.post(api, data={'auth': token, 'action': 'status', 'area': name}, headers={"lAccept": "application/json", "Content-Type": "application/x-www-form-urlencoded"}) + else: + response = requests.post(api, data={'auth': token, 'action': 'status', 'tile': name}, headers={"lAccept": "application/json", "Content-Type": "application/x-www-form-urlencoded"}) + match = re.match(r"pending|done|rebuild|skip|started|packaged", response.json()["status"]) + if match != None: + return match.group(0) + else: + print("ERROR: Recived invalid state for " + name) + sys.exit(1) + except IOError: + print("ERROR: Unable to get status.") sys.exit(1) diff --git a/worldbuild-runner.py b/worldbuild-runner.py index 35f8e2e..eaceeb2 100755 --- a/worldbuild-runner.py +++ b/worldbuild-runner.py @@ -22,30 +22,22 @@ import threading import re import sys -prefix = "" -global_db = False +worker_cmd = ["./scripts/worldbuild-worker.py"] argc = len(sys.argv) i = 1 first = 1 while i < argc: - if sys.argv[i] == "-p" or sys.argv[i] == "--prefix": - i += 1 - prefix = sys.argv[i] - elif sys.argv[i] == "-g" or sys.argv[i] == "--global-db": - global_db = True - elif sys.argv[i] == "-h" or sys.argv[i] == "--help": - print("usage: worldbuild-runner.py [OPTIONS]") + if sys.argv[i] == "-h" or sys.argv[i] == "--help": + print("usage: worldbuild-runner.py [OPTIONS] [WORKER-OPTIONS]") print("Starts workers") print("") - print(" -p, --prefix Database prefix to use") - print(" -g, --global-db Use global database") - # TODO hand through all worker options print(" -h, --help Shows this help and exit") + print("") + print("All other options get passed through to the worker") sys.exit(0) else: - print("Unknown option " + sys.argv[i]) - sys.exit(1) + worker_cmd.append(sys.argv[i]) i += 1 class launcher(threading.Thread): @@ -98,10 +90,7 @@ class launcher(threading.Thread): self.run_max = count def start_worker(self): - cmd = ["./scripts/worldbuild-worker.py", "-q", "-p", prefix] - if global_db: - cmd.append("-g") - self.running.append(Popen(cmd, start_new_session=True)) + self.running.append(Popen(worker_cmd, start_new_session=True)) self.run_count += 1 diff --git a/worldbuild-worker.py b/worldbuild-worker.py index 5cfbfee..78ff867 100755 --- a/worldbuild-worker.py +++ b/worldbuild-worker.py @@ -22,10 +22,12 @@ import socket import re from time import sleep, strftime, time -from common import send_status, get_job, norm, get_south, get_west, get_east, get_north, get_area_name +from common import send_status, get_job, norm, get_south, get_west, get_east, get_north, get_area_name, api_send_status, api_get_job action = "pending" host = socket.gethostname() +api = None +api_token = None port = 12345 prefix = "" quiet = False @@ -41,6 +43,12 @@ while i < argc: elif sys.argv[i] == "--host": i += 1 host = sys.argv[i] + elif sys.argv[i] == "-a" or sys.argv[i] == "--api": + i += 1 + api = sys.argv[i] + elif sys.argv[i] == "-t" or sys.argv[i] == "--api-token": + i += 1 + api_token = sys.argv[i] elif sys.argv[i] == "-p" or sys.argv[i] == "--prefix": i += 1 prefix = sys.argv[i] @@ -63,6 +71,8 @@ while i < argc: print(" -g, --global-db Use global database") print(" --host Manager host") print(" --port Manager port") + print(" -a, --api Manager api url") + print(" -t, --api-token Manager api token") print(" -q, --quiet Don't print messages") print(" -a, --action Considered tiles for build. Can be:") print(" pending: Only builds pending tiles. These are always build: default") @@ -76,6 +86,10 @@ while i < argc: sys.exit(1) i += 1 +if api != None and api_token == None: + print("Error: API given but no token") + sys.exit(1) + def cleanup(): if os.path.isfile("projects/worldbuild-" + name + "/osm2city-exceptions.log"): run("mv projects/worldbuild-" + name + "/osm2city-exceptions.log projects/worldbuild/output/error/" + name + "-" + strftime("%Y%m%d-%H%M") + ".exceptions.log", shell=True) @@ -85,7 +99,10 @@ def cleanup(): running = True while running: - name = get_job(action, host, port) + if api != None: + name = api_get_job(action, api, api_token) + else: + name = get_job(action, host, port) try: run("mkdir -p projects/worldbuild-" + name, shell=True) @@ -130,7 +147,10 @@ while running: cleanup() - send_status(name, "done", host, port) + if api != None: + api_send_status(name, "done", api, api_token) + else: + send_status(name, "done", host, port) except KeyboardInterrupt: if not quiet: print("Graceful shutdown triggered. To force immedate stop, press Ctrl+C again") @@ -138,7 +158,10 @@ while running: try: build.wait() cleanup() - send_status(name, "done", host, port) + if api != None: + api_send_status(name, "done", api, api_token) + else: + send_status(name, "done", host, port) except KeyboardInterrupt: #TODO doesn't work print("Forcing shutdown...")