flight-gear-copilot/__init__.py
merspieler 28bc28675c Added bunch of checklists, A3XX only right now
Signed-off-by: merspieler <merspieler@users.noreply.github.com>
2018-02-02 16:45:52 +01:00

654 lines
17 KiB
Python

import sys
import re
import json
from time import sleep
from telnetlib import Telnet
from adapt.intent import IntentBuilder
from mycroft import MycroftSkill, intent_handler
from mycroft.skills.core import MycroftSkill
from mycroft.util.log import getLogger
LOGGER = getLogger(__name__)
class FlightGearCopilotSkill(MycroftSkill):
def __init__(self):
super(FlightGearCopilotSkill, self).__init__()
self.settings['host'] = "localhost"
self.settings['port'] = 8081
# TODO add self.settings['profiles'] with default profiles (A32X and c172p)
# DEFINITION of the settings['profiles'] structure
# [
# {
# "name": "<profile-name>",
# "acid":
# [
# "<Aircraft-id> can be found in /???",
# ...
# ],
# flaps:
# [
# {
# "id": "<flaps-name> can be up|down|full|number",
# "min-spd": "<minimum speed for save retraction>",
# "max-spd": "<maximum speed for save extention>",
# "value": "<value in the prop tree>"
# },
# ...
# ]
# "flaps-path": "<path to current flaps-position>"
# "gear-retractable": "<true|false>"
# },
# ...
# ]
# might be useful
# make_active()
#################################################################
# #
# Actions #
# #
#################################################################
#########################
# #
# Flaps #
# #
#########################
@intent_handler(IntentBuilder('FlapsIntent').require('flaps'))
def handle_flaps_intent(self, message):
flaps_request = message.data['utterance']
if flaps_request == "flaps":
self.speak_dialog("no.setting")
sys.exit(0)
# extracting the flaps setting from the utterance
match = re.match(r'.*flaps.* (up|full|down|\d{1,2}).*', flaps_request, re.I)
if match == None:
self.speak_dialog("no.valid.flaps")
sys.exit(0)
flaps_request = match.group(1)
tn = self.connect()
# get acid
tn.write("get /sim/aircraft\r\n")
acid = tn.read_until("\n")
# read acid to know which profile to use
profile = None
for i_profiles in self.settings['profiles']:
for i_acid in i_profiles['acid']:
if str(i_acid) == str(acid):
profile = i_profiles
break
if profile != None:
break
# BYPASS THE PROFILE CHECK
# TODO REMOVE THIS BYPASS
profile = self.settings['profiles'][0]
# check if the profile was found
if profile == None:
# TODO when creation of profiles via voice is possible, add dialog how to
self.speak("Profile not found")
self.exit(tn)
# get kias
tn.write("get /velocities/airspeed-kt\r\n")
kias = float(tn.read_until("\n"))
# find the flaps value for the flaps id
o_flaps = None
for i_flaps in profile['flaps']:
if str(i_flaps['id']) == str(flaps_request):
o_flaps = i_flaps
break
# cheick if flaps setting is known
if o_flaps == None:
self.speak_dialog("flaps.setting.unknown")
self.exit(tn)
tn.write("get " + str(profile['flaps-path']) + "\r\n")
flaps = tn.read_until("\n")
# check if extend or retract flaps
# TODO add handling up|down|full is already set
if str(flaps_request) == "down" or str(flaps_request) == "full":
flaps_mov = "extend"
elif str(flaps_request) == "up":
flaps_mov = "retract"
else:
if int(flaps_request) > flaps:
flaps_mov = "extend"
elif int(flaps_request) < flaps:
flaps_mov = "retract"
else:
self.speak_dialog("keep.flaps")
self.exit(tn)
# get ground speed
tn.write("get /velocities/groundspeed-kt\r\n")
gs = float(tn.read_until("\n"))
# skip speed check is speed is <= 30
if gs > 30:
# check if speed is high/low enough for retraction/extention
if flaps_mov == "extend":
if o_flaps['max-spd'] < kias:
self.speak_dialog("spd.high")
self.exit(tn)
else:
self.speak("Speed checked.")
else:
if o_flaps['min-spd'] > kias:
self.speak_dialog("spd.low")
self.exit(tn)
else:
self.speak("Speed checked.")
# TODO set flaps in fg
self.speak("Flaps " + str(flaps_request))
tn.close
#########################
# #
# Gear #
# #
#########################
@intent_handler(IntentBuilder('GearUpIntent').require('gearup'))
def handle_gear_up_intent(self, message):
# TODO add connection to fg
# TODO read acid from fg
# read acid to know which profile to use
for i_profiles in self.settings['profiles']:
for i_acid in i_profiles['acid']:
if i_acid == acid:
profile = i_profiles
break
if profile != None:
break
if profile == None:
# TODO when creation of profiles via voice is possible, add dialog how to
self.speak("Profile not found")
self.exit(tn)
if profile['gear-retractable'] == true:
# TODO set gear up in fg
self.speak("Gear up")
else:
self.speak_dialog("gear.not.retractable")
@intent_handler(IntentBuilder('GearDownIntent').require('geardown'))
def handle_gear_down_intent(self, message):
# TODO add connection to fg
# TODO read acid from fg
# read acid to know which profile to use
for i_profiles in self.settings['profiles']:
for i_acid in i_profiles['acid']:
if i_acid == acid:
profile = i_profiles
break
if profile != None:
break
if profile == None:
# TODO when creation of profiles via voice is possible, add dialog how to
self.speak("Profile not found")
self.exit(tn)
if profile['gear-retractable'] == true:
# TODO set gear down in fg
self.speak("Gear down")
else:
self.speak_dialog("gear.not.retractable")
#################################################################
# #
# Checklists #
# #
#################################################################
# TODO add all possible checklist
# TODO make it possible, to play a .mp3 file instead of tts
#################################
# #
# Before Start Check #
# #
#################################
@intent_handler(IntentBuilder('BeforeStartCheckIntent').require('before.start.check'))
def handle_before_start_check_intent(self, message):
# TODO make checklist plane specific
response = self.get_response("check.before.start.cockpit.preparation")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'completed', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.gear.pin")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'removed', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.signs")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.adirs")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'nav', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.fuel.quantity")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'check|checked', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.to.data")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.baro.ref")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.window")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'close|closed', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.beacon")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.before.start.thr.lever")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'idle', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.parking.break")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on|off|set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
self.speak("Before start checklist completed")
#################################
# #
# After Start Check #
# #
#################################
@intent_handler(IntentBuilder('AfterStartCheckIntent').require('after.start.check'))
def handle_after_start_check_intent(self, message):
# TODO make checklist plane specific
response = self.get_response("check.after.start.anti.ice")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on|off', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.ecam.status")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'checked|check', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.after.start.pitch.trim")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.after.start.rudder.trim")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'0|zero', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
self.speak("After start checklist completed")
#########################
# #
# Taxi Check #
# #
#########################
@intent_handler(IntentBuilder('TaxiCheckIntent').require('taxi.check'))
def handle_taxi_check_intent(self, message):
# TODO make checklist plane specific
self.speak("Flight controls checked")
sleep(4)
self.speak("Flight instruments checked")
sleep(4)
response = self.get_response("check.general.briefing")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'confirmed', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.taxi.flaps.settings")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.taxi.v.spd")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.taxi.atc")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.taxi.to.no.blue")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'no blue|all green', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.taxi.to.rwy")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'confirmed', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.taxi.cabin.crew")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'confirmed', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.taxi.tcas")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'TA|RA|on', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.eng.mode.sel")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on|off|norm|normal|start', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.packs")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on|off|packs?', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
self.speak("Before start checklist completed")
#########################
# #
# Climb Check #
# #
#########################
@intent_handler(IntentBuilder('ClimbCheckIntent').require('climb.check'))
def handle_climb_check_intent(self, message):
# TODO make checklist plane specific
response = self.get_response("check.climb.gear.up")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'up|retracted', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.flaps")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'up|retracted|0', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.packs")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.baro.ref")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
self.speak("After take off checklist completed")
#################################
# #
# Approach Check #
# #
#################################
@intent_handler(IntentBuilder('ApprCheckIntent').require('appr.check'))
def handle_appr_check_intent(self, message):
# TODO make checklist plane specific
response = self.get_response("check.general.briefing")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'confirmed', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.ecam.status")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'checked|check', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.appr.seat.belts")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.baro.ref")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.appr.min")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'set', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
response = self.get_response("check.general.eng.mode.sel")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'on|off|norm|normal|start', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
self.speak("Approach checklist completed")
#########################
# #
# LDG Check #
# #
#########################
@intent_handler(IntentBuilder('LDGCheckIntent').require('ldg.check'))
def handle_ldg_check_intent(self, message):
# TODO make checklist plane specific
response = self.get_response("check.ldg.no.blue")
if response == None:
self.speak("Checklist not completed")
sys.exit(0)
match = re.search(r'no blue|all green', response, re.I)
if match == None:
self.speak("Checklist not completed")
sys.exit(0)
self.speak("Landing checklist completed")
# TODO add flt/ctl check
#################################################################
# #
# Help functions #
# #
#################################################################
# connect to fg
def connect(self):
try:
tn = Telnet(self.settings['host'], self.settings['port'])
except:
self.speak_dialog("no.telnet.con")
sys.exit(0)
# switch to data mode
tn.write("data\r\n")
return tn
# exit routine to properly close the tn con
def exit(self, tn):
tn.close
sys.exit(0)
def stop(self):
pass
def create_skill():
return FlightGearCopilotSkill()