diff --git a/src/Airports/GenAirports850/CMakeLists.txt b/src/Airports/GenAirports850/CMakeLists.txt index 909e36d8..f19daee7 100644 --- a/src/Airports/GenAirports850/CMakeLists.txt +++ b/src/Airports/GenAirports850/CMakeLists.txt @@ -17,6 +17,7 @@ add_executable(genapts850 runway.cxx runway.hxx rwy_simple.cxx rwy_gen.cxx + scheduler.cxx scheduler.hxx taxiway.cxx taxiway.hxx ) @@ -24,6 +25,8 @@ target_link_libraries(genapts850 Polygon Geometry Array Optimize Output poly2tri TriangleJRS + PocoFoundation + PocoNet ${GDAL_LIBRARY} ${SIMGEAR_CORE_LIBRARIES} ${SIMGEAR_CORE_LIBRARY_DEPENDENCIES} diff --git a/src/Airports/GenAirports850/airport.cxx b/src/Airports/GenAirports850/airport.cxx index 70d39e6d..ccb86031 100644 --- a/src/Airports/GenAirports850/airport.cxx +++ b/src/Airports/GenAirports850/airport.cxx @@ -91,6 +91,7 @@ Airport::Airport( int c, char* def) dbg_pvmt_poly = 0; dbg_feat_poly = 0; dbg_base_poly = 0; + dbg_taxi_poly = 0; SG_LOG( SG_GENERAL, SG_DEBUG, "Created airport with icao " << icao << ", control tower " << ct << ", and description " << description ); @@ -123,6 +124,11 @@ Airport::~Airport() delete pavements[i]; } + for (unsigned int i=0; i<taxiways.size(); i++) + { + delete taxiways[i]; + } + for (unsigned int i=0; i<lightobjects.size(); i++) { delete lightobjects[i]; @@ -525,8 +531,6 @@ void Airport::merge_slivers( superpoly_list& polys, poly_list& slivers_list ) { } } - - void Airport::BuildBtg(const string& root, const string_list& elev_src ) { ClipPolyType accum; @@ -600,6 +604,8 @@ void Airport::BuildBtg(const string& root, const string_list& elev_src ) } } + SG_LOG(SG_GENERAL, SG_INFO, "Parse Complete - Runways: " << runways.size() << " Pavements: " << pavements.size() << " Features: " << features.size() << " Taxiways: " << taxiways.size() ); + // Starting to clip the polys (for now - only UNIX builds) build_start.stamp(); @@ -1803,5 +1809,5 @@ void Airport::BuildBtg(const string& root, const string_list& elev_src ) tgChopNormalPolygon( holepath, "Hole", divided_base, true ); tgChopNormalPolygon( holepath, "Airport", apt_clearing, false ); - SG_LOG( SG_GENERAL, SG_ALERT, SGLOG_GREEN << "\nSUCCESS generating " << icao << SGLOG_NORMAL << "\n" ); + //SG_LOG( SG_GENERAL, SG_ALERT, SGLOG_GREEN << "\nSUCCESS generating " << icao << SGLOG_NORMAL << "\n" ); } diff --git a/src/Airports/GenAirports850/airport.hxx b/src/Airports/GenAirports850/airport.hxx index bf7d92fc..f5ed48a1 100644 --- a/src/Airports/GenAirports850/airport.hxx +++ b/src/Airports/GenAirports850/airport.hxx @@ -114,6 +114,7 @@ public: void BuildBtg( const string& root, const string_list& elev_src ); void SetDebugPolys( int rwy, int taxi, int pvmt, int feat, int base ); + void DumpStats( void ); private: int code; // airport, heliport or sea port diff --git a/src/Airports/GenAirports850/main.cxx b/src/Airports/GenAirports850/main.cxx index d4104eef..31f7c187 100644 --- a/src/Airports/GenAirports850/main.cxx +++ b/src/Airports/GenAirports850/main.cxx @@ -16,6 +16,12 @@ #include <string.h> #include <iostream> +#include <Poco/Environment.h> +#include <Poco/Net/SocketAddress.h> +#include <Poco/Net/Socket.h> +#include <Poco/Net/StreamSocket.h> +#include <Poco/Net/SocketStream.h> + #include <simgear/debug/logstream.hxx> #include <simgear/misc/sg_path.hxx> #include <simgear/misc/sgstream.hxx> @@ -28,8 +34,10 @@ #include "closedpoly.hxx" #include "linearfeature.hxx" #include "parser.hxx" +#include "scheduler.hxx" using namespace std; +using namespace Poco; // TODO : Modularize this function @@ -113,6 +121,11 @@ int nudge = 10; double slope_max = 0.2; double gSnap = 0.00000001; // approx 1 mm +// For creating a buffered stream to write to the socket +Net::StreamSocket ss; +Net::SocketStreamBuf ssb( ss ); +ostream os(&ssb); + int main(int argc, char **argv) { float min_lon = -180; @@ -121,6 +134,7 @@ int main(int argc, char **argv) float max_lat = 90; long position = 0; + // Setup elevation directories string_list elev_src; elev_src.clear(); setup_default_elevation_sources(elev_src); @@ -128,23 +142,24 @@ int main(int argc, char **argv) // Set Normal logging sglog().setLogLevels( SG_GENERAL, SG_INFO ); - SG_LOG(SG_GENERAL, SG_INFO, "Run genapt"); - // parse arguments string work_dir = ""; string input_file = ""; + string summary_file = "./genapt850.csv"; string start_id = ""; string restart_id = ""; string airport_id = ""; + long airport_pos = -1; string last_apt_file = "./last_apt.txt"; int dump_rwy_poly = -1; int dump_taxi_poly = -1; int dump_pvmt_poly = -1; int dump_feat_poly = -1; int dump_base_poly = -1; + int num_threads = Poco::Environment::processorCount(); + int redirect_port = -1; int arg_pos; - for (arg_pos = 1; arg_pos < argc; arg_pos++) { string arg = argv[arg_pos]; @@ -163,7 +178,11 @@ int main(int argc, char **argv) else if ( arg.find("--start-id=") == 0 ) { start_id = arg.substr(11); - } + } + else if ( arg.find("--airport-pos=") == 0 ) + { + airport_pos = atol( arg.substr(14).c_str() ); + } else if ( arg.find("--restart-id=") == 0 ) { restart_id = arg.substr(13); @@ -228,6 +247,10 @@ int main(int argc, char **argv) { sglog().setLogLevels( SG_GENERAL, SG_BULK ); } + else if ( arg.find("--redirect-port=") == 0 ) + { + redirect_port = atoi( arg.substr(16).c_str() ); + } else if ( (arg.find("--max-slope=") == 0) ) { slope_max = atof( arg.substr(12).c_str() ); @@ -264,6 +287,7 @@ int main(int argc, char **argv) } } + SG_LOG(SG_GENERAL, SG_INFO, "Run genapt with " << num_threads << " threads" ); SG_LOG(SG_GENERAL, SG_INFO, "Input file = " << input_file); SG_LOG(SG_GENERAL, SG_INFO, "Terrain sources = "); for ( unsigned int i = 0; i < elev_src.size(); ++i ) @@ -304,6 +328,7 @@ int main(int argc, char **argv) sgp.append( "dummy" ); sgp.create_dir( 0755 ); + string command = argv[0]; string lastaptfile = work_dir+"/last_apt"; // initialize persistant polygon counter @@ -320,63 +345,69 @@ int main(int argc, char **argv) exit(-1); } - // Create the parser... - Parser* parser = new Parser(input_file, work_dir, elev_src); + // Create the scheduler + Scheduler* scheduler = new Scheduler(command, input_file, work_dir, elev_src); // Add any debug - parser->SetDebugPolys( dump_rwy_poly, dump_taxi_poly, dump_pvmt_poly, dump_feat_poly, dump_base_poly ); + // TODO : parser->SetDebugPolys( dump_rwy_poly, dump_taxi_poly, dump_pvmt_poly, dump_feat_poly, dump_base_poly ); + + // check for output redirect + if ( redirect_port >= 0 ) { + + // create a stream socket back to the main process + Net::SocketAddress sa( "localhost", redirect_port ); + ss.connect(sa); + + // then a buffered stream to write to the socket + os.rdbuf(&ssb); + + // then hook up SG_LOG to the stream buf + sglog().set_output( os ); + } else { + // this is the main program - + SG_LOG(SG_GENERAL, SG_INFO, "Launch command was " << argv[0] ); + } // just one airport if ( airport_id != "" ) { // just find and add the one airport - parser->AddAirport( airport_id ); + scheduler->AddAirport( airport_id ); SG_LOG(SG_GENERAL, SG_INFO, "Finished Adding airport - now parse"); - // and start the parser - parser->Parse( last_apt_file ); + // and schedule parsers + scheduler->Schedule( num_threads, summary_file ); + } + // We are given an airport position from a main scheduler - parse this + else if ( airport_pos != -1 ) + { + // create and start the real parser + Parser parser(input_file, work_dir, elev_src); + parser.Parse( airport_pos ); } else if ( start_id != "" ) { SG_LOG(SG_GENERAL, SG_INFO, "move forward to " << start_id ); // scroll forward in datafile - position = parser->FindAirport( start_id ); + position = scheduler->FindAirport( start_id ); // add remaining airports within boundary - parser->AddAirports( position, min_lat, min_lon, max_lat, max_lon ); + scheduler->AddAirports( position, min_lat, min_lon, max_lat, max_lon ); // parse all the airports that were found - parser->Parse( last_apt_file ); - } - else if ( restart_id != "" ) - { - SG_LOG(SG_GENERAL, SG_INFO, "move forward airport after " << restart_id ); - - // scroll forward in datafile - position = parser->FindAirport( restart_id ); - - // add all remaining airports within boundary - parser->AddAirports( position, min_lat, min_lon, max_lat, max_lon ); - - // but remove the restart id - it's broken - parser->RemoveAirport( restart_id ); - - // parse all the airports that were found - parser->Parse( last_apt_file ); + scheduler->Schedule( num_threads, summary_file ); } else { // find all airports within given boundary - parser->AddAirports( 0, min_lat, min_lon, max_lat, max_lon ); + scheduler->AddAirports( 0, min_lat, min_lon, max_lat, max_lon ); // and parser them - parser->Parse( last_apt_file ); + scheduler->Schedule( num_threads, summary_file ); } - delete parser; - SG_LOG(SG_GENERAL, SG_INFO, "Done"); exit(0); diff --git a/src/Airports/GenAirports850/parser.cxx b/src/Airports/GenAirports850/parser.cxx index cfa00e04..ef4ccac2 100644 --- a/src/Airports/GenAirports850/parser.cxx +++ b/src/Airports/GenAirports850/parser.cxx @@ -1,13 +1,26 @@ #include <ctime> +#include <Poco/Mutex.h> +#include <Poco/Pipe.h> +#include <Poco/PipeStream.h> +#include <Poco/Process.h> +#include <Poco/Runnable.h> +#include <Poco/Semaphore.h> +#include <Poco/Thread.h> +#include <Poco/Timespan.h> +#include <Poco/Net/ServerSocket.h> +#include <Poco/Net/SocketAddress.h> +#include <Poco/Net/SocketStream.h> +#include <Poco/Net/Socket.h> +#include <Poco/Net/StreamSocket.h> + #include <simgear/debug/logstream.hxx> #include <simgear/misc/sgstream.hxx> #include <simgear/timing/timestamp.hxx> #include "parser.hxx" - -bool Parser::IsAirportDefinition( char* line, string icao ) +bool Parser::GetAirportDefinition( char* line, string& icao ) { char* tok; int code; @@ -28,38 +41,11 @@ bool Parser::IsAirportDefinition( char* line, string icao ) case SEA_AIRPORT_CODE: case HELIPORT_CODE: airport = new Airport( code, line ); - if ( airport->GetIcao() == icao ) - { - match = true; - } + icao = airport->GetIcao(); + match = true; break; - case LAND_RUNWAY_CODE: - case WATER_RUNWAY_CODE: - case HELIPAD_CODE: - case PAVEMENT_CODE: - case LINEAR_FEATURE_CODE: - case BOUNDRY_CODE: - case NODE_CODE: - case BEZIER_NODE_CODE: - case CLOSE_NODE_CODE: - case CLOSE_BEZIER_NODE_CODE: - case TERM_NODE_CODE: - case TERM_BEZIER_NODE_CODE: - case AIRPORT_VIEWPOINT_CODE: - case AIRPLANE_STARTUP_LOCATION_CODE: - case LIGHT_BEACON_CODE: - case WINDSOCK_CODE: - case TAXIWAY_SIGN: - case LIGHTING_OBJECT: - case COMM_FREQ1_CODE: - case COMM_FREQ2_CODE: - case COMM_FREQ3_CODE: - case COMM_FREQ4_CODE: - case COMM_FREQ5_CODE: - case COMM_FREQ6_CODE: - case COMM_FREQ7_CODE: - case END_OF_FILE : + default: break; } } @@ -67,258 +53,6 @@ bool Parser::IsAirportDefinition( char* line, string icao ) return match; } -void Parser::AddAirport( string icao ) -{ - char line[2048]; - long cur_pos; - bool found = false; - - ifstream in( filename.c_str() ); - if ( !in.is_open() ) - { - SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); - exit(-1); - } - - SG_LOG( SG_GENERAL, SG_INFO, "Adding airport " << icao << " to parse list"); - while ( !in.eof() && !found ) - { - // remember the position of this line - cur_pos = in.tellg(); - - // get a line - in.getline(line, 2048); - - // this is and airport definition - remember it - if ( IsAirportDefinition( line, icao ) ) - { - SG_LOG( SG_GENERAL, SG_DEBUG, "Found airport " << icao << " at " << cur_pos ); - parse_positions.push_back( cur_pos ); - airport_icaos.push_back( icao ); - found = true; - } - } -} - -long Parser::FindAirport( string icao ) -{ - char line[2048]; - long cur_pos = 0; - bool found = false; - - ifstream in( filename.c_str() ); - if ( !in.is_open() ) - { - SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); - exit(-1); - } - - SG_LOG( SG_GENERAL, SG_DEBUG, "Finding airport " << icao ); - while ( !in.eof() && !found ) - { - // remember the position of this line - cur_pos = in.tellg(); - - // get a line - in.getline(line, 2048); - - // this is and airport definition - remember it - if ( IsAirportDefinition( line, icao ) ) - { - SG_LOG( SG_GENERAL, SG_DEBUG, "Found airport " << line << " at " << cur_pos ); - found = true; - } - } - - if (found) - { - return cur_pos; - } - else - { - return 0; - } -} - -void Parser::AddAirports( long start_pos, float min_lat, float min_lon, float max_lat, float max_lon ) -{ - char line[2048]; - char* def; - long cur_pos; - long cur_apt_pos; - string cur_apt_name; - char* tok; - int code; - bool match; - bool done; - - done = false; - match = false; - - // start from current position, and push all airports where a runway start or end - // lies within the given min/max coordinates - - ifstream in( filename.c_str() ); - if ( !in.is_open() ) - { - SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); - exit(-1); - } - - if (start_pos) - { - in.seekg(start_pos, ios::beg); - } - - while (!done) - { - // remember the position of this line - cur_pos = in.tellg(); - - // get a line - in.getline(line, 2048); - def = &line[0]; - - // Get the number code - tok = strtok(def, " \t\r\n"); - - if (tok) - { - def += strlen(tok)+1; - code = atoi(tok); - - switch(code) - { - case LAND_AIRPORT_CODE: - case SEA_AIRPORT_CODE: - case HELIPORT_CODE: - { - Airport* airport = new Airport( code, def ); - if (match) - { - parse_positions.push_back( cur_apt_pos ); - airport_icaos.push_back( cur_apt_name ); - } - // remember this new apt pos and name, and clear match - cur_apt_pos = cur_pos; - cur_apt_name = airport->GetIcao(); - delete airport; - - match = false; - } - break; - - case END_OF_FILE: - if (match) - { - parse_positions.push_back( cur_apt_pos ); - airport_icaos.push_back( cur_apt_name ); - } - done = true; - break; - - case LAND_RUNWAY_CODE: - // if the the runway start / end coords are within the rect, - // we have a winner - { - Runway* runway = new Runway(def); - Point3D start = runway->GetStart(); - Point3D end = runway->GetEnd(); - if ( (start.x() >= min_lon ) && - (start.y() >= min_lat ) && - (start.x() <= max_lon ) && - (start.y() <= max_lat ) ) { - match = true; - } - else if ( (end.x() >= min_lon ) && - (end.y() >= min_lat ) && - (end.x() <= max_lon ) && - (end.y() <= max_lat ) ) { - match = true; - } - delete runway; - } - break; - - case WATER_RUNWAY_CODE: - // if the the runway start / end coords are within the rect, - // we have a winner - { - WaterRunway* runway = new WaterRunway(def); - Point3D start = runway->GetStart(); - Point3D end = runway->GetEnd(); - if ( (start.x() >= min_lon ) && - (start.y() >= min_lat ) && - (start.x() <= max_lon ) && - (start.y() <= max_lat ) ) { - match = true; - } - else if ( (end.x() >= min_lon ) && - (end.y() >= min_lat ) && - (end.x() <= max_lon ) && - (end.y() <= max_lat ) ) { - match = true; - } - delete runway; - } - break; - - case HELIPAD_CODE: - // if the heliport coords are within the rect, we have - // a winner - { - Helipad* helipad = new Helipad(def); - Point3D loc = helipad->GetLoc(); - if ( (loc.x() >= min_lon ) && - (loc.y() >= min_lat ) && - (loc.x() <= max_lon ) && - (loc.y() <= max_lat ) ) { - match = true; - } - delete helipad; - } - break; - - case TAXIWAY_CODE: - case PAVEMENT_CODE: - case LINEAR_FEATURE_CODE: - case BOUNDRY_CODE: - case NODE_CODE: - case BEZIER_NODE_CODE: - case CLOSE_NODE_CODE: - case CLOSE_BEZIER_NODE_CODE: - case TERM_NODE_CODE: - case TERM_BEZIER_NODE_CODE: - case AIRPORT_VIEWPOINT_CODE: - case AIRPLANE_STARTUP_LOCATION_CODE: - case LIGHT_BEACON_CODE: - case WINDSOCK_CODE: - case TAXIWAY_SIGN: - case LIGHTING_OBJECT: - case COMM_FREQ1_CODE: - case COMM_FREQ2_CODE: - case COMM_FREQ3_CODE: - case COMM_FREQ4_CODE: - case COMM_FREQ5_CODE: - case COMM_FREQ6_CODE: - case COMM_FREQ7_CODE: - break; - } - } - } -} - -void Parser::RemoveAirport( string icao ) -{ - for (unsigned int i = 0; i < airport_icaos.size(); i++ ) { - if (airport_icaos[i] == icao) { - parse_positions.erase(parse_positions.begin()+i); - airport_icaos.erase(airport_icaos.begin()+i); - break; - } - } -} - void Parser::SetDebugPolys( int rwy, int taxi, int pvmt, int feat, int base ) { rwy_poly = rwy; @@ -328,10 +62,11 @@ void Parser::SetDebugPolys( int rwy, int taxi, int pvmt, int feat, int base ) base_poly = base; } -void Parser::Parse( string last_apt_file ) +void Parser::Parse( long pos ) { - char tmp[2048]; - + char line[2048]; + string icao; + SGTimeStamp parse_start; SGTimeStamp parse_end; SGTimeStamp parse_time; @@ -346,31 +81,31 @@ void Parser::Parse( string last_apt_file ) SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); exit(-1); } + in.seekg(pos, ios::beg); - // for each position in parse_positions, parse an airport - for ( unsigned int i=0; i < parse_positions.size(); i++) - { + // get a line + in.getline(line, 2048); + + // Verify this is and airport definition and get the icao + if( GetAirportDefinition( line, icao ) ) { + SG_LOG( SG_GENERAL, SG_INFO, "Found airport " << icao << " at " << pos ); + + // Start parse at pos SetState(STATE_NONE); in.clear(); parse_start.stamp(); log_time = time(0); SG_LOG( SG_GENERAL, SG_ALERT, "\n*******************************************************************" ); - SG_LOG( SG_GENERAL, SG_ALERT, "Start airport " << airport_icaos[i] << " at " << parse_positions[i] << ": start time " << ctime(&log_time) ); - - in.seekg(parse_positions[i], ios::beg); - - // save the airport we are working on - char command[256]; - sprintf( command, "echo %s > %s", airport_icaos[i].c_str(), last_apt_file.c_str() ); - system( command ); + SG_LOG( SG_GENERAL, SG_ALERT, "Start airport " << icao << " at " << pos << ": start time " << ctime(&log_time) ); + in.seekg(pos, ios::beg); while ( !in.eof() && (cur_state != STATE_DONE ) ) { - in.getline(tmp, 2048); + in.getline(line, 2048); // Parse the line - ParseLine(tmp); + ParseLine(line); } parse_end.stamp(); @@ -390,12 +125,11 @@ void Parser::Parse( string last_apt_file ) } log_time = time(0); - SG_LOG( SG_GENERAL, SG_ALERT, "Finished airport " << airport_icaos[i] << ": end time " << ctime(&log_time) ); - - SG_LOG( SG_GENERAL, SG_INFO, "Time to parse " << parse_time ); - SG_LOG( SG_GENERAL, SG_INFO, "Time to build " << build_time ); - SG_LOG( SG_GENERAL, SG_INFO, "Time to clean up " << clean_time ); - SG_LOG( SG_GENERAL, SG_INFO, "Time to triangulate " << triangulation_time ); + SG_LOG( SG_GENERAL, SG_ALERT, "Finished airport " << icao << + " : parse " << parse_time << " : build " << build_time << + " : clean " << clean_time << " : tesselate " << triangulation_time ); + } else { + SG_LOG( SG_GENERAL, SG_INFO, "Not an airport at pos " << pos << " line is: " << line ); } } diff --git a/src/Airports/GenAirports850/parser.hxx b/src/Airports/GenAirports850/parser.hxx index 7c265956..303c468b 100644 --- a/src/Airports/GenAirports850/parser.hxx +++ b/src/Airports/GenAirports850/parser.hxx @@ -58,10 +58,6 @@ using namespace std; - -typedef std::vector <long> ParseList; -typedef std::vector <string> IcaoList; - class Parser { public: @@ -94,14 +90,11 @@ public: } void SetDebugPolys( int rwy, int taxi, int pvmt, int feat, int base ); - long FindAirport( string icao ); - void AddAirport( string icao ); - void AddAirports( long start_pos, float min_lat, float min_lon, float max_lat, float max_lon ); - void RemoveAirport( string icao ); - void Parse( string last_apt_file ); - + void Parse( long pos ); + private: bool IsAirportDefinition( char* line, string icao ); + bool GetAirportDefinition( char* line, string& icao ); int SetState( int state ); @@ -133,10 +126,6 @@ private: Beacon* cur_beacon; Sign* cur_sign; - // List of positions in database file to parse - ParseList parse_positions; - IcaoList airport_icaos; - // debug int rwy_poly; int taxi_poly; diff --git a/src/Airports/GenAirports850/scheduler.cxx b/src/Airports/GenAirports850/scheduler.cxx new file mode 100644 index 00000000..6fdcb4db --- /dev/null +++ b/src/Airports/GenAirports850/scheduler.cxx @@ -0,0 +1,852 @@ +#include <cstring> + +#include <simgear/debug/logstream.hxx> +#include <simgear/misc/sgstream.hxx> + +#include "airport.hxx" +#include "parser.hxx" +#include "scheduler.hxx" + +extern double gSnap; + +/*** PROCESS INFO ***/ +ProcessInfo::ProcessInfo( AirportInfo* pai, const ProcessHandle ph, Net::StreamSocket s ) : procHandle(ph) +{ + pInfo = pai; + sock = s; + pssb = new Net::SocketStreamBuf( s ); + pin = new istream( pssb ); + state = P_STATE_INIT; + SetTimeout(); +} + +void ProcessInfo::SetTimeout( void ) +{ + SGTimeStamp now; + SGTimeStamp to; + + now.stamp(); + switch( state ) { + case P_STATE_INIT: + to.setTime(P_STATE_INIT_TIME, 0); + break; + + case P_STATE_PARSE: + to.setTime(P_STATE_PARSE_TIME, 0); + break; + + case P_STATE_BUILD: + to.setTime(P_STATE_BUILD_TIME, 0); + break; + + case P_STATE_TRIANGULATE: + to.setTime(P_STATE_TRIANGULATE_TIME, 0); + break; + + case P_STATE_OUTPUT: + to.setTime(P_STATE_OUTPUT_TIME, 0); + break; + } + + timeout = (now + to); +} + +void ProcessInfo::Kill( void ) +{ + SGTimeStamp now; + now.stamp(); + + // kill the process + Process::kill( procHandle.id() ); + + // wait for the zombie + procHandle.wait(); + + // mark process info - so we can reclaim it + state = P_STATE_KILLED; +} + +int ProcessInfo::HandleLine( void ) +{ + char line[256]; + + pin->getline( line, 256 ); + if ( pin->rdstate() != ifstream::goodbit ) { + SG_LOG( SG_GENERAL, SG_INFO, pInfo->GetIcao() << ": ProcessInfo::HandleLine read from socket error " << pin->rdstate() ); + + state = P_STATE_KILLED; + } else { + pInfo->SetErrorString( line ); + + // Print the line + SG_LOG( SG_GENERAL, SG_INFO, pInfo->GetIcao() << ": " << line); + + // Update state + if ( strstr( line, "Parse Complete " ) != NULL ) { + // Grab the stats + int rwys, pvmnts, feats, twys; + + sscanf(line, "Parse Complete - Runways: %d Pavements: %d Features: %d Taxiways: %d", &rwys, &pvmnts, &feats, &twys); + + pInfo->SetRunways( rwys ); + pInfo->SetPavements( pvmnts ); + pInfo->SetFeats( feats ); + pInfo->SetTaxiways( twys ); + + } else if ( strstr( line, "Finished airport " ) != NULL ) { + // Grab the stats + int parse_sec, parse_nsec; + int build_sec, build_nsec; + int clean_sec, clean_nsec; + int tess_sec, tess_nsec; + SGTimeStamp parse_ts; + SGTimeStamp build_ts; + SGTimeStamp clean_ts; + SGTimeStamp tess_ts; + + state = P_STATE_DONE; + + sscanf(line, "Finished airport %*s : parse %d.%d : build %d.%d : clean %d.%d : tesselate %d.%d", + &parse_sec, &parse_nsec, &build_sec, &build_nsec, &clean_sec, &clean_nsec, &tess_sec, &tess_nsec ); + + parse_ts.setTime( parse_sec, parse_nsec ); + build_ts.setTime( build_sec, build_nsec ); + clean_ts.setTime( clean_sec, clean_nsec ); + tess_ts.setTime( tess_sec, tess_nsec ); + + pInfo->SetParseTime( parse_ts ); + pInfo->SetBuildTime( build_ts ); + pInfo->SetCleanTime( clean_ts ); + pInfo->SetTessTime( tess_ts ); + + procHandle.wait(); + } else if ( strstr( line, "Build Feature Poly " ) != NULL ) { + state = P_STATE_BUILD; + } else if ( strstr( line, "Build Pavement " ) != NULL ) { + state = P_STATE_BUILD; + } else if ( strstr( line, "Build Runway " ) != NULL ) { + state = P_STATE_BUILD; + } else if ( strstr( line, "Tesselating " ) != NULL ) { + state = P_STATE_TRIANGULATE; + } else if ( strstr( line, "Adding runway nodes and normals " ) != NULL ) { + state = P_STATE_OUTPUT; + } + + } + + SetTimeout(); + return state; +} + + +ostream& operator<< (ostream &out, const AirportInfo &ai) +{ + char snap_string[32]; + sprintf( snap_string, "%1.8lf", ai.snap ); + + out << ai.icao; + out << ","; + out << ai.numRunways; + out << ","; + out << ai.numPavements; + out << ","; + out << ai.numFeats; + out << ","; + out << ai.numTaxiways; + out << ","; + out << ai.parseTime; + out << ","; + out << ai.buildTime; + out << ","; + out << ai.cleanTime; + out << ","; + out << ai.tessTime; + out << ","; + out << ai.parseTime+ai.buildTime+ai.cleanTime+ai.tessTime; + out << ","; + out << snap_string, + out << ","; + out << ai.errString; + + return out; // MSVC +} + + +/*** PROCESS LIST CLASS ***/ +ProcessList::ProcessList( int n, string& summaryfile, Scheduler* pScheduler ) : available(n), ready(1), state( PL_STATE_WAIT_FOR_LAUNCH ) +{ + // The process List is responsible for creating new processes (Launch) + // and monitoring the status of the launched parsers (Monitor) These + // functions are called from different threads. + pss = pScheduler->GetServerSocket(); + + // remember the output file + csvfile.open( summaryfile.c_str(), ios_base::out | ios_base::app ); + + // remember the scheduler so we can add retries + scheduler = pScheduler; + + // remember the number of available helper procs so we know when we're full + threads = n; +} + +// When a slot is available, the main thread calls launch to instantiate a +// new pareser process +void ProcessList::Launch( string command, string file, AirportInfo* pai, bool last ) +{ + Process::Args args; + char arg[64]; + Pipe outPipe; + + // generate correct command line arguments + args.push_back("--work=work"); + + sprintf( arg, "--input=%s", file.c_str() ); + args.push_back(arg); + + sprintf( arg, "--airport-pos=%ld", pai->GetPos() ); + args.push_back(arg); + + sprintf( arg, "--snap=%1.8lf", pai->GetSnap() ); + args.push_back(arg); + + sprintf( arg, "--redirect-port=%d", GENAPT_PORT ); + args.push_back(arg); + + // Launch the child process + ProcessHandle ph = Process::launch(command, args, 0, &outPipe, &outPipe); + + // Wait 10 seconds for connection + Timespan timeout( 10, 0 ); + int retVal = pss->poll( timeout, Net::Socket::SELECT_READ ); + + // If we connected - create a new entry + if ( retVal == true ) { + Net::SocketAddress sockaddr; + Net::StreamSocket sock = pss->acceptConnection( sockaddr ); + + // Make sure the list can't be modified while adding a member + lock.lock(); + ProcessInfo pi( pai, ph, sock ); + plist.push_back( pi ); + lock.unlock(); + + // If we have all of the airports in our list, we are done + // when the list is empty - set the transition state + if ( last ) { + // The launch list is empty - we're ready to monitor + state = PL_STATE_ALL_LAUNCHED; + ready.set(); + } else if ( plist.size() == threads ) { + // The resource list is full - we're ready to monitor + state = PL_STATE_LIST_FULL; + ready.set(); + } else { + // resource list has space, and launch list is not empty - hold off monitoring + state = PL_STATE_WAIT_FOR_LAUNCH; + } + } +} + +Timespan ProcessList::GetNextTimeout() +{ + SGTimeStamp now, min, timeout; + + min.setTime( UINT_MAX, 0 ); + timeout.setTime( 0, 0 ); + + for ( unsigned int i=0; i< plist.size(); i++ ) { + if ( plist[i].GetTimeout() < min ) { + min = plist[i].GetTimeout(); + } + } + + now.stamp(); + if ( min > now ) { + timeout = min - now; + } + + return Timespan( timeout.get_seconds(), timeout.get_usec() ); +} + +void ProcessList::HandleReceivedMessages( Net::Socket::SocketList& slr ) +{ + // for each socket that has data - find the corresponding icao + for (unsigned int i=0; i<slr.size(); i++) { + Net::StreamSocket ss = (Net::StreamSocket)slr[i]; + + // find the index handling this socket, and let it deal with the line + for ( unsigned int j=0; j < plist.size(); j++ ) { + if ( plist[j].GetSocket() == ss ) { + plist[j].HandleLine( ); + break; + } + } + } +} + +void ProcessList::HandleFinished( void ) +{ + AirportInfo* pInfo = NULL; + int num_deleted = 0; + bool done = false; + + while (!done) { + done = true; + + lock.lock(); + for ( unsigned int i=0; i< plist.size(); i++ ) { + switch ( plist[i].GetState() ) { + case P_STATE_DONE: + plist[i].SetErrorString( "success" ); + + // holding the list lock - only one thread can write to the csvfile at a time + csvfile << plist[i].GetInfo() << "\n"; + csvfile.flush(); + + // remove this airport from the list - it's complete + plist[i].CloseSock(); + plist.erase( plist.begin()+i ); + + // keep track of the number of deleted entries + num_deleted++; + + // let's iterate again to look for more timeouts... + done = false; + break; + + case P_STATE_KILLED: + // holding the list lock - only one thread can write to the csvfile at a time + csvfile << plist[i].GetInfo() << "\n"; + csvfile.flush(); + + // Schedule a retry + pInfo = plist[i].GetInfoPtr(); + pInfo->IncreaseSnap(); + scheduler->RetryAirport( pInfo ); + + // remove the airport from the monitor list - it's complete + plist[i].CloseSock(); + plist.erase( plist.begin()+i ); + + // keep track of the number of deleted entries + num_deleted++; + + // let's iterate again to look for more timeouts... + done = false; + break; + + default: + break; + } + } + + lock.unlock(); + } + + + // Let launcher thread know we have opening(s) + while ( num_deleted-- ) { + // make sure we don't start waiting on output before a new apt is launched... + if (state != PL_STATE_ALL_LAUNCHED) { + state = PL_STATE_WAIT_FOR_LAUNCH; + } + + // free each resource that is no longer used + available.set(); + } +} + +void ProcessList::WaitForSlot( void ) +{ + available.wait(); +} + +// list lock is held +void ProcessList::HandleTimeouts() +{ + SGTimeStamp now; + + now.stamp(); + for ( unsigned int i=0; i< plist.size(); i++ ) { + if ( plist[i].GetTimeout() < now ) { + plist[i].Kill(); + } + } +} + +void ProcessList::Monitor() +{ + // Wait until process list has a connection, then continue until we are done + while( state != PL_STATE_DONE ) { + Net::Socket::SocketList slr, slw, sle; + Timespan timeout; + int retVal; + + // if we aren't ready to start - wait on ready + if ( state == PL_STATE_WAIT_FOR_LAUNCH ) { + ready.wait(); + } + + // then lock the list when calculating the timeouts + lock.lock(); + + // calculate the shortest timeout + timeout = GetNextTimeout(); + + // Add currently connected sockets + for ( unsigned int i=0; i< plist.size(); i++ ) { + slr.push_back( plist[i].GetSocket() ); + } + + // unlock before waiting on i/o + lock.unlock(); + + // this needs to be interrupted when new airports are added to the list + retVal = Net::Socket::select( slr, slw, sle, timeout ); + + if ( retVal > 0 ) { + HandleReceivedMessages( slr ); + } else { + HandleTimeouts(); + } + + // remove finished or dead processes, and notify launcher + // + HandleFinished(); + + slr.clear(); + slw.clear(); + sle.clear(); + + // if we have launched all airports, we are done + if ( ( state == PL_STATE_ALL_LAUNCHED ) && ( plist.size() == 0 ) ) { + state = PL_STATE_DONE; + } + } + + csvfile.close(); +} + +/*** PROCESS MONITOR ***/ +ProcessMonitor::ProcessMonitor(ProcessList* pl) : Runnable() +{ + plist = pl; +} + +void ProcessMonitor::run() +{ + SG_LOG( SG_GENERAL, SG_INFO, "ProcessMonitor Started " ); + + // Run the monitoring function in this thread + plist->Monitor(); + + SG_LOG( SG_GENERAL, SG_INFO, "ProcessMonitor Exited " ); +} + +/*** SCEDULER ***/ +bool Scheduler::IsAirportDefinition( char* line, string icao ) +{ + char* tok; + int code; + Airport* airport = NULL; + bool match = false; + + // Get the number code + tok = strtok(line, " \t\r\n"); + + if (tok) + { + line += strlen(tok)+1; + code = atoi(tok); + + switch(code) + { + case LAND_AIRPORT_CODE: + case SEA_AIRPORT_CODE: + case HELIPORT_CODE: + airport = new Airport( code, line ); + if ( airport->GetIcao() == icao ) + { + match = true; + } + break; + + case LAND_RUNWAY_CODE: + case WATER_RUNWAY_CODE: + case HELIPAD_CODE: + case PAVEMENT_CODE: + case LINEAR_FEATURE_CODE: + case BOUNDRY_CODE: + case NODE_CODE: + case BEZIER_NODE_CODE: + case CLOSE_NODE_CODE: + case CLOSE_BEZIER_NODE_CODE: + case TERM_NODE_CODE: + case TERM_BEZIER_NODE_CODE: + case AIRPORT_VIEWPOINT_CODE: + case AIRPLANE_STARTUP_LOCATION_CODE: + case LIGHT_BEACON_CODE: + case WINDSOCK_CODE: + case TAXIWAY_SIGN: + case LIGHTING_OBJECT: + case COMM_FREQ1_CODE: + case COMM_FREQ2_CODE: + case COMM_FREQ3_CODE: + case COMM_FREQ4_CODE: + case COMM_FREQ5_CODE: + case COMM_FREQ6_CODE: + case COMM_FREQ7_CODE: + case END_OF_FILE : + break; + } + } + + return match; +} + +void Scheduler::AddAirport( string icao ) +{ + char line[2048]; + long cur_pos; + bool found = false; + AirportInfo* pInfo; + + ifstream in( filename.c_str() ); + if ( !in.is_open() ) + { + SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); + exit(-1); + } + + SG_LOG( SG_GENERAL, SG_INFO, "Adding airport " << icao << " to parse list"); + while ( !in.eof() && !found ) + { + // remember the position of this line + cur_pos = in.tellg(); + + // get a line + in.getline(line, 2048); + + // this is and airport definition - remember it + if ( IsAirportDefinition( line, icao ) ) + { + SG_LOG( SG_GENERAL, SG_DEBUG, "Found airport " << icao << " at " << cur_pos ); + + pInfo = new AirportInfo( icao, cur_pos, gSnap ); + originalList.push_back( *pInfo ); + delete pInfo; + + found = true; + } + } +} + +long Scheduler::FindAirport( string icao ) +{ + char line[2048]; + long cur_pos = 0; + bool found = false; + + ifstream in( filename.c_str() ); + if ( !in.is_open() ) + { + SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); + exit(-1); + } + + SG_LOG( SG_GENERAL, SG_DEBUG, "Finding airport " << icao ); + while ( !in.eof() && !found ) + { + // remember the position of this line + cur_pos = in.tellg(); + + // get a line + in.getline(line, 2048); + + // this is and airport definition - remember it + if ( IsAirportDefinition( line, icao ) ) + { + SG_LOG( SG_GENERAL, SG_DEBUG, "Found airport " << line << " at " << cur_pos ); + found = true; + } + } + + if (found) + { + return cur_pos; + } + else + { + return 0; + } +} + +void Scheduler::RetryAirport( AirportInfo* pai ) +{ + retryList.push_back( *pai ); +} + +void Scheduler::AddAirports( long start_pos, float min_lat, float min_lon, float max_lat, float max_lon ) +{ + char line[2048]; + char* def; + long cur_pos; + long cur_apt_pos = 0; + string cur_apt_name; + char* tok; + int code; + bool match; + bool done; + + done = false; + match = false; + + // start from current position, and push all airports where a runway start or end + // lies within the given min/max coordinates + + ifstream in( filename.c_str() ); + if ( !in.is_open() ) + { + SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); + exit(-1); + } + + if (start_pos) + { + in.seekg(start_pos, ios::beg); + } + + while (!done) + { + // remember the position of this line + cur_pos = in.tellg(); + + // get a line + in.getline(line, 2048); + def = &line[0]; + + // Get the number code + tok = strtok(def, " \t\r\n"); + + if (tok) + { + def += strlen(tok)+1; + code = atoi(tok); + + switch(code) + { + case LAND_AIRPORT_CODE: + case SEA_AIRPORT_CODE: + case HELIPORT_CODE: + { + Airport* airport = new Airport( code, def ); + if (match) + { + // Start off with given snap value + AirportInfo* pInfo = new AirportInfo( cur_apt_name, cur_apt_pos, gSnap ); + originalList.push_back( *pInfo ); + delete pInfo; + } + // remember this new apt pos and name, and clear match + cur_apt_pos = cur_pos; + cur_apt_name = airport->GetIcao(); + delete airport; + + match = false; + } + break; + + case END_OF_FILE: + if (match) + { + // Start off with given snap value + AirportInfo* pInfo = new AirportInfo( cur_apt_name, cur_apt_pos, gSnap ); + originalList.push_back( *pInfo ); + delete pInfo; + } + done = true; + break; + + case LAND_RUNWAY_CODE: + // if the the runway start / end coords are within the rect, + // we have a winner + { + Runway* runway = new Runway(def); + Point3D start = runway->GetStart(); + Point3D end = runway->GetEnd(); + if ( (start.x() >= min_lon ) && + (start.y() >= min_lat ) && + (start.x() <= max_lon ) && + (start.y() <= max_lat ) ) { + match = true; + } + else if ( (end.x() >= min_lon ) && + (end.y() >= min_lat ) && + (end.x() <= max_lon ) && + (end.y() <= max_lat ) ) { + match = true; + } + delete runway; + } + break; + + case WATER_RUNWAY_CODE: + // if the the runway start / end coords are within the rect, + // we have a winner + { + WaterRunway* runway = new WaterRunway(def); + Point3D start = runway->GetStart(); + Point3D end = runway->GetEnd(); + if ( (start.x() >= min_lon ) && + (start.y() >= min_lat ) && + (start.x() <= max_lon ) && + (start.y() <= max_lat ) ) { + match = true; + } + else if ( (end.x() >= min_lon ) && + (end.y() >= min_lat ) && + (end.x() <= max_lon ) && + (end.y() <= max_lat ) ) { + match = true; + } + delete runway; + } + break; + + case HELIPAD_CODE: + // if the heliport coords are within the rect, we have + // a winner + { + Helipad* helipad = new Helipad(def); + Point3D loc = helipad->GetLoc(); + if ( (loc.x() >= min_lon ) && + (loc.y() >= min_lat ) && + (loc.x() <= max_lon ) && + (loc.y() <= max_lat ) ) { + match = true; + } + delete helipad; + } + break; + + case TAXIWAY_CODE: + case PAVEMENT_CODE: + case LINEAR_FEATURE_CODE: + case BOUNDRY_CODE: + case NODE_CODE: + case BEZIER_NODE_CODE: + case CLOSE_NODE_CODE: + case CLOSE_BEZIER_NODE_CODE: + case TERM_NODE_CODE: + case TERM_BEZIER_NODE_CODE: + case AIRPORT_VIEWPOINT_CODE: + case AIRPLANE_STARTUP_LOCATION_CODE: + case LIGHT_BEACON_CODE: + case WINDSOCK_CODE: + case TAXIWAY_SIGN: + case LIGHTING_OBJECT: + case COMM_FREQ1_CODE: + case COMM_FREQ2_CODE: + case COMM_FREQ3_CODE: + case COMM_FREQ4_CODE: + case COMM_FREQ5_CODE: + case COMM_FREQ6_CODE: + case COMM_FREQ7_CODE: + break; + } + } + } +} + +Scheduler::Scheduler(string& cmd, string& datafile, const string& root, const string_list& elev_src) +{ + command = cmd; + filename = datafile; + work_dir = root; + elevation = elev_src; + + ifstream in( filename.c_str() ); + if ( !in.is_open() ) + { + SG_LOG( SG_GENERAL, SG_ALERT, "Cannot open file: " << filename ); + exit(-1); + } +} + +void Scheduler::Schedule( int num_threads, string& summaryfile ) +{ + ProcessList *procList = NULL; + Thread *monThread = NULL; + ProcessMonitor *procMon = NULL; + bool done = false; + bool last = false; + ofstream csvfile; + + // open and truncate the summary file : monitor only appends + csvfile.open( summaryfile.c_str(), ios_base::out | ios_base::trunc ); + csvfile.close(); + + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler: Bind to socket" ); + + // Bind the parent listener socket for children to connect to + ss.bind(GENAPT_PORT); + ss.listen(); + + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler: Bound" ); + + while (!done) { + procList = new ProcessList(num_threads, summaryfile, this); + monThread = new Thread; + procMon = new ProcessMonitor(procList); + + // Launch monitor thread + monThread->start(*procMon); + + // now try to launch child processes to parse individual airports + for ( unsigned int i=0; i<originalList.size(); i++ ) { + // Wait for an available process slot + procList->WaitForSlot(); + + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler: originalList has " << originalList.size() << ", i is " << i ); + + // let the process list know if more airports are coming + if ( i == originalList.size()-1 ) { + last = true; + } + + // Launch a new parser + procList->Launch( command, filename, &originalList[i], last ); + } + + // Sync up before relaunching + monThread->join(); + + // Delete the old monitor + delete procMon; + delete monThread; + delete procList; + + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler: originalList has " << originalList.size() << ", retry list has " << retryList.size() << " entries" ); + + // delete original, and copy retry to it + if ( retryList.size() ) { + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler: clear original list " ); + originalList.clear(); + + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler - cleared original: originalList has " << originalList.size() << ", retry list has " << retryList.size() << " entries" ); + + for ( unsigned int i=0; i<retryList.size(); i++ ) { + originalList.push_back( retryList[i] ); + } + + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler - copied retryList: originalList has " << originalList.size() << ", retry list has " << retryList.size() << " entries" ); + + retryList.clear(); + + SG_LOG( SG_GENERAL, SG_INFO, "Scheduler - cleared retry: originalList has " << originalList.size() << ", retry list has " << retryList.size() << " entries" ); + } else { + done = true; + } + } +} diff --git a/src/Airports/GenAirports850/scheduler.hxx b/src/Airports/GenAirports850/scheduler.hxx new file mode 100644 index 00000000..c727ca39 --- /dev/null +++ b/src/Airports/GenAirports850/scheduler.hxx @@ -0,0 +1,200 @@ +#include <string> +#include <iostream> +#include <fstream> + +#include <Poco/Mutex.h> +#include <Poco/Pipe.h> +#include <Poco/PipeStream.h> +#include <Poco/Process.h> +#include <Poco/Runnable.h> +#include <Poco/Semaphore.h> +#include <Poco/Thread.h> +#include <Poco/Timespan.h> +#include <Poco/Net/ServerSocket.h> +#include <Poco/Net/SocketAddress.h> +#include <Poco/Net/SocketStream.h> +#include <Poco/Net/Socket.h> +#include <Poco/Net/StreamSocket.h> + +#include <simgear/compiler.h> +#include <simgear/math/sg_types.hxx> +#include <simgear/timing/timestamp.hxx> + +#define P_STATE_INIT (0) +#define P_STATE_PARSE (1) +#define P_STATE_BUILD (2) +#define P_STATE_TRIANGULATE (3) +#define P_STATE_OUTPUT (4) +#define P_STATE_DONE (8) +#define P_STATE_KILLED (9) + +#define P_STATE_INIT_TIME ( 1*60) +#define P_STATE_PARSE_TIME ( 1*60) +#define P_STATE_BUILD_TIME (30*60) +#define P_STATE_TRIANGULATE_TIME ( 1*60) +#define P_STATE_OUTPUT_TIME (10*60) + +#define GENAPT_PORT (12397) +#define PL_STATE_INIT (0) +#define PL_STATE_WAIT_FOR_LAUNCH (1) +#define PL_STATE_LIST_FULL (2) +#define PL_STATE_ALL_LAUNCHED (3) +#define PL_STATE_DONE (10) + +using namespace std; +using namespace Poco; + +// Forward declaration +class Scheduler; + +class AirportInfo +{ +public: + AirportInfo( string id, long p, double s ) + { + icao = id; + pos = p; + snap = s; + + numRunways = -1; + numPavements = -1; + numFeats = -1; + numTaxiways = -1; + } + + string GetIcao( void ) { return icao; } + long GetPos( void ) { return pos; } + double GetSnap( void ) { return snap; } + + void SetRunways( int r ) { numRunways = r; } + void SetPavements( int p ) { numPavements = p; } + void SetFeats( int f ) { numFeats = f; } + void SetTaxiways( int t ) { numTaxiways = t; } + void SetParseTime( SGTimeStamp t ) { parseTime = t; } + void SetBuildTime( SGTimeStamp t ) { buildTime = t; } + void SetCleanTime( SGTimeStamp t ) { cleanTime = t; } + void SetTessTime( SGTimeStamp t ) { tessTime = t; } + void SetErrorString( char* e ) { errString = e; } + + void IncreaseSnap( void ) { snap *= 2.0f; } + + friend ostream& operator<<(ostream& output, const AirportInfo& ai); + +private: + string icao; + long pos; + + int numRunways; + int numPavements; + int numFeats; + int numTaxiways; + + SGTimeStamp parseTime; + SGTimeStamp buildTime; + SGTimeStamp cleanTime; + SGTimeStamp tessTime; + + double snap; + string errString; +}; +typedef std::vector <AirportInfo> parseList; + +class ProcessInfo +{ +public: + ProcessInfo( AirportInfo* pai, const ProcessHandle ph, Net::StreamSocket s ); + + void SetTimeout( void ); + SGTimeStamp GetTimeout( void ) { return timeout; } + string GetIcao( void ) { return pInfo->GetIcao(); } + Net::StreamSocket GetSocket( void ) { return sock; } + int GetState( void ) { return state; } + AirportInfo GetInfo( void ) { return *pInfo; } + AirportInfo* GetInfoPtr( void ) { return pInfo; } + + void SetErrorString( char *e ) { pInfo->SetErrorString( e ); } + + int HandleLine( void ); + void Kill( void ); + void CloseSock( void ) { sock.close(); } + +private: + AirportInfo* pInfo; + ProcessHandle procHandle; + Net::StreamSocket sock; + Net::SocketStreamBuf *pssb; + istream *pin; + int state; + SGTimeStamp timeout; +}; +typedef std::vector <ProcessInfo> ProcessInfoList; + +class ProcessList +{ +public: + ProcessList( int n, string& summaryfile, Scheduler* pScheduler ); + + // The main thread needs to wait until a slot is ready for creating a new + // Parser child process + inline void WaitForSlot(void); + + // When a slot is available, the main thread calls launch to instantiate a + // new pareser process + void Launch( string command, string file, AirportInfo* pai, bool last ); + Timespan GetNextTimeout(); + void HandleReceivedMessages( Net::Socket::SocketList& slr ); + void HandleTimeouts(); + void HandleFinished( void ); + void Monitor(); + +private: + Semaphore available; + Semaphore ready; + Mutex lock; + ProcessInfoList plist; + Net::ServerSocket* pss; + int state; + int threads; + ofstream csvfile; + Scheduler* scheduler; +}; + +class ProcessMonitor : public Runnable +{ +public: + ProcessMonitor(ProcessList* pl); + virtual void run(); + +private: + ProcessList* plist; +}; + +class Scheduler +{ +public: + Scheduler(string& cmd, string& datafile, const string& root, const string_list& elev_src); + + long FindAirport( string icao ); + void AddAirport( string icao ); + void AddAirports( long start_pos, float min_lat, float min_lon, float max_lat, float max_lon ); + void RetryAirport( AirportInfo* pInfo ); + + void Schedule( int num_threads, string& summaryfile ); + + Net::ServerSocket* GetServerSocket( void ) { return &ss; } + +private: + bool IsAirportDefinition( char* line, string icao ); + + string command; + string filename; + string_list elevation; + string work_dir; + + Net::ServerSocket ss; + + // List of positions in database file to parse + parseList originalList; + parseList retryList; +}; +