1
0
Fork 0

Investigating some wierd behavior where the threaded metar fetcher would

occasionally cause a large number of valid stations to be flagged as invalid.
This *seemed* like a "race condition" type problem because there were some
assumptions in the communication between the main process and the threaded
loader which if they broke down could lead to this problem.

In the process of removing this ambiguity, I restructured the threaded
(and non-threaded) metar fetching code a bit.  Some of the top level logic
(which Erik politely left untouched) didn't make nearly as much sense in the
context of a threaded metar loader and could have contributed to some of the
wierdness I was seeing.
This commit is contained in:
curt 2004-02-28 19:52:17 +00:00
parent feb9f16410
commit d8000569ea
2 changed files with 212 additions and 200 deletions

View file

@ -316,7 +316,7 @@ FGInterpolateEnvironmentCtrl::bucket::operator< (const bucket &b) const
FGMetarEnvironmentCtrl::FGMetarEnvironmentCtrl () FGMetarEnvironmentCtrl::FGMetarEnvironmentCtrl ()
: env( new FGInterpolateEnvironmentCtrl ), : env( new FGInterpolateEnvironmentCtrl ),
_icao( fgGetString("/sim/presets/airport-id") ), _icao( "" ),
search_interval_sec( 60.0 ), // 1 minute search_interval_sec( 60.0 ), // 1 minute
same_station_interval_sec( 900.0 ), // 15 minutes same_station_interval_sec( 900.0 ), // 15 minutes
search_elapsed( 9999.0 ), search_elapsed( 9999.0 ),
@ -396,19 +396,21 @@ FGMetarEnvironmentCtrl::init ()
->search( longitude->getDoubleValue(), ->search( longitude->getDoubleValue(),
latitude->getDoubleValue(), latitude->getDoubleValue(),
true ); true );
if ( fetch_data( a.id ) ) { FGMetarResult result = fetch_data( a.id );
cout << "closest station w/ metar = " << a.id << endl; if ( result.m != NULL ) {
SG_LOG( SG_GENERAL, SG_INFO, "closest station w/ metar = " << a.id);
last_apt = a; last_apt = a;
_icao = a.id; _icao = a.id;
search_elapsed = 0.0; search_elapsed = 0.0;
fetch_elapsed = 0.0; fetch_elapsed = 0.0;
update_metar_properties( result.m );
update_env_config(); update_env_config();
env->init(); env->init();
found_metar = true; found_metar = true;
} else { } else {
// mark as no metar so it doesn't show up in subsequent // mark as no metar so it doesn't show up in subsequent
// searches. // searches.
cout << "no metar at metar = " << a.id << endl; SG_LOG( SG_GENERAL, SG_INFO, "no metar at metar = " << a.id );
globals->get_airports()->no_metar( a.id ); globals->get_airports()->no_metar( a.id );
} }
} }
@ -427,12 +429,17 @@ FGMetarEnvironmentCtrl::reinit ()
void void
FGMetarEnvironmentCtrl::update(double delta_time_sec) FGMetarEnvironmentCtrl::update(double delta_time_sec)
{ {
FGMetarResult result;
static const SGPropertyNode *longitude static const SGPropertyNode *longitude
= fgGetNode( "/position/longitude-deg", true ); = fgGetNode( "/position/longitude-deg", true );
static const SGPropertyNode *latitude static const SGPropertyNode *latitude
= fgGetNode( "/position/latitude-deg", true ); = fgGetNode( "/position/latitude-deg", true );
search_elapsed += delta_time_sec; search_elapsed += delta_time_sec;
fetch_elapsed += delta_time_sec; fetch_elapsed += delta_time_sec;
// if time for a new search request, push it onto the request
// queue
if ( search_elapsed > search_interval_sec ) { if ( search_elapsed > search_interval_sec ) {
FGAirport a = globals->get_airports() FGAirport a = globals->get_airports()
->search( longitude->getDoubleValue(), ->search( longitude->getDoubleValue(),
@ -441,205 +448,201 @@ FGMetarEnvironmentCtrl::update(double delta_time_sec)
if ( last_apt.id != a.id if ( last_apt.id != a.id
|| fetch_elapsed > same_station_interval_sec ) || fetch_elapsed > same_station_interval_sec )
{ {
if ( fetch_data( a.id ) ) { SG_LOG( SG_GENERAL, SG_INFO, "closest station w/ metar = " << a.id);
cout << "closest station w/ metar = " << a.id << endl; request_queue.push( a.id );
last_apt = a; last_apt = a;
_icao = a.id; _icao = a.id;
search_elapsed = 0.0; search_elapsed = 0.0;
fetch_elapsed = 0.0; fetch_elapsed = 0.0;
update_env_config();
env->reinit();
} else {
// mark as no metar so it doesn't show up in subsequent
// searches.
cout << "no metar at metar = " << a.id << endl;
globals->get_airports()->no_metar( a.id );
}
} else { } else {
search_elapsed = 0.0; search_elapsed = 0.0;
// cout << "same station, waiting = " SG_LOG( SG_GENERAL, SG_INFO, "same station, waiting = "
// << same_station_interval_sec - fetch_elapsed << endl; << same_station_interval_sec - fetch_elapsed );
}
}
#ifndef ENABLE_THREADS
// No loader thread running so manually fetch the data
string id = "";
while ( !request_queue.empty() ) {
id = request_queue.front();
request_queue.pop();
}
if ( !id.empty() ) {
SG_LOG( SG_GENERAL, SG_INFO, "inline fetching = " << id );
result = fetch_data( id );
result_queue.push( result );
}
#endif // ENABLE_THREADS
// process any results from the loader.
while ( !result_queue.empty() ) {
result = result_queue.front();
result_queue.pop();
if ( result.m != NULL ) {
update_metar_properties( result.m );
delete result.m;
update_env_config();
env->reinit();
} else {
// mark as no metar so it doesn't show up in subsequent
// searches, and signal an immediate re-search.
SG_LOG( SG_GENERAL, SG_WARN,
"no metar at station = " << result.icao );
globals->get_airports()->no_metar( result.icao );
search_elapsed = 9999.0;
} }
} }
env->update(delta_time_sec); env->update(delta_time_sec);
} }
void void
FGMetarEnvironmentCtrl::setEnvironment (FGEnvironment * environment) FGMetarEnvironmentCtrl::setEnvironment (FGEnvironment * environment)
{ {
env->setEnvironment(environment); env->setEnvironment(environment);
} }
bool FGMetarResult
FGMetarEnvironmentCtrl::fetch_data (const string &icao) FGMetarEnvironmentCtrl::fetch_data( const string &icao )
{ {
char s[128]; FGMetarResult result;
double d, dt; result.icao = icao;
int i;
if ((icao == "") && (_icao == "")) {
_icao = fgGetString("/sim/presets/airport-id");
} else if (icao != "") {
_icao = icao;
}
// fetch station elevation if exists // fetch station elevation if exists
FGAirport a = globals->get_airports()->search( _icao ); FGAirport a = globals->get_airports()->search( icao );
station_elevation_ft = a.elevation; station_elevation_ft = a.elevation;
// fetch current metar data // fetch current metar data
SGMetar *m = NULL;
#ifdef ENABLE_THREADS
// We are only interested in the latest metar report
// FIXME: Do we want to keep the latest valid report instead?
while (!metar_queue.empty()) {
if (m != NULL)
delete m;
m = metar_queue.pop();
}
if ( m != NULL ) {
#else
try { try {
string host = proxy_host->getStringValue(); string host = proxy_host->getStringValue();
string auth = proxy_auth->getStringValue(); string auth = proxy_auth->getStringValue();
string port = proxy_port->getStringValue(); string port = proxy_port->getStringValue();
m = new SGMetar( _icao, host, port, auth); result.m = new SGMetar( icao, host, port, auth);
} catch (const sg_io_exception& e) {
#endif // ENABLE_THREADS
d = m->getMinVisibility().getVisibility_m();
d = (d != SGMetarNaN) ? d : 10000;
fgSetDouble("/environment/metar/min-visibility-m", d);
dt = m->getMaxVisibility().getVisibility_m();
d = (dt != SGMetarNaN) ? dt : d;
fgSetDouble("/environment/metar/max-visibility-m", d);
SGMetarVisibility *dirvis = m->getDirVisibility();
for (i = 0; i < 8; i++, dirvis++) {
const char *min = "/environment/metar/visibility[%d]/min-m";
const char *max = "/environment/metar/visibility[%d]/max-m";
char s[128];
d = dirvis->getVisibility_m();
d = (d != SGMetarNaN) ? d : 10000;
snprintf(s, 128, min, i);
fgSetDouble(s, d);
snprintf(s, 128, max, i);
fgSetDouble(s, d);
}
i = m->getWindDir();
if ( i == -1 ) {
fgSetInt("/environment/metar/base-wind-range-from",
m->getWindRangeFrom() );
fgSetInt("/environment/metar/base-wind-range-to",
m->getWindRangeTo() );
} else {
fgSetInt("/environment/metar/base-wind-range-from", i);
fgSetInt("/environment/metar/base-wind-range-to", i);
}
fgSetDouble("/environment/metar/base-wind-speed-kt",
m->getWindSpeed_kt() );
d = m->getGustSpeed_kt();
d = (d != SGMetarNaN) ? d : 0.0;
fgSetDouble("/environment/metar/gust-wind-speed-kt", d);
d = m->getTemperature_C();
if (d != SGMetarNaN) {
dt = m->getDewpoint_C();
dt = (dt != SGMetarNaN) ? dt : 0.0;
fgSetDouble("/environment/metar/dewpoint-degc", dt);
fgSetDouble("/environment/metar/rel-humidity-norm",
m->getRelHumidity() );
}
d = (d != SGMetarNaN) ? d : 15.0;
fgSetDouble("/environment/metar/temperature-degc", d);
d = m->getPressure_inHg();
d = (d != SGMetarNaN) ? d : 30.0;
fgSetDouble("/environment/metar/pressure-inhg", d);
vector<SGMetarCloud> cv = m->getClouds();
vector<SGMetarCloud>::iterator cloud;
const char *cl = "/environment/clouds/layer[%i]";
for (i = 0, cloud = cv.begin(); cloud != cv.end(); cloud++, i++) {
const char *coverage_string[5] =
{ "clear", "few", "scattered", "broken", "overcast" };
const double thickness[5] = { 0, 65, 600,750, 1000};
int q;
snprintf(s, 128, cl, i);
strncat(s, "/coverage", 128);
q = cloud->getCoverage();
q = (q != -1 ) ? q : 0;
fgSetString(s, coverage_string[q] );
snprintf(s, 128, cl, i);
strncat(s, "/elevation-ft", 128);
d = cloud->getAltitude_ft();
d = (d != SGMetarNaN) ? d : -9999;
fgSetDouble(s, d + station_elevation_ft);
snprintf(s, 128, cl, i);
strncat(s, "/thickness-ft", 128);
fgSetDouble(s, thickness[q]);
snprintf(s, 128, cl, i);
strncat(s, "/span-m", 128);
fgSetDouble(s, 40000.0);
}
for (; i < FGEnvironmentMgr::MAX_CLOUD_LAYERS; i++) {
snprintf(s, 128, cl, i);
strncat(s, "/coverage", 128);
fgSetString(s, "clear");
snprintf(s, 128, cl, i);
strncat(s, "/elevation-ft", 128);
fgSetDouble(s, -9999);
snprintf(s, 128, cl, i);
strncat(s, "/thickness-ft", 128);
fgSetDouble(s, 0);
snprintf(s, 128, cl, i);
strncat(s, "/span-m", 128);
fgSetDouble(s, 40000.0);
}
delete m;
}
#ifdef ENABLE_THREADS
mutex.lock();
metar_cond.signal();
mutex.unlock();
if (m == NULL)
return false;
#else
catch (const sg_io_exception& e) {
SG_LOG( SG_GENERAL, SG_WARN, "Error fetching live weather data: " SG_LOG( SG_GENERAL, SG_WARN, "Error fetching live weather data: "
<< e.getFormattedMessage().c_str() ); << e.getFormattedMessage().c_str() );
return false; result.m = NULL;
} }
#endif // ENABLE_THREADS
return true; return result;
} }
void
FGMetarEnvironmentCtrl::update_metar_properties( SGMetar *m )
{
int i;
double d, dt;
char s[128];
d = m->getMinVisibility().getVisibility_m();
d = (d != SGMetarNaN) ? d : 10000;
fgSetDouble("/environment/metar/min-visibility-m", d);
dt = m->getMaxVisibility().getVisibility_m();
d = (dt != SGMetarNaN) ? dt : d;
fgSetDouble("/environment/metar/max-visibility-m", d);
SGMetarVisibility *dirvis = m->getDirVisibility();
for (i = 0; i < 8; i++, dirvis++) {
const char *min = "/environment/metar/visibility[%d]/min-m";
const char *max = "/environment/metar/visibility[%d]/max-m";
char s[128];
d = dirvis->getVisibility_m();
d = (d != SGMetarNaN) ? d : 10000;
snprintf(s, 128, min, i);
fgSetDouble(s, d);
snprintf(s, 128, max, i);
fgSetDouble(s, d);
}
i = m->getWindDir();
if ( i == -1 ) {
fgSetInt("/environment/metar/base-wind-range-from",
m->getWindRangeFrom() );
fgSetInt("/environment/metar/base-wind-range-to",
m->getWindRangeTo() );
} else {
fgSetInt("/environment/metar/base-wind-range-from", i);
fgSetInt("/environment/metar/base-wind-range-to", i);
}
fgSetDouble("/environment/metar/base-wind-speed-kt",
m->getWindSpeed_kt() );
d = m->getGustSpeed_kt();
d = (d != SGMetarNaN) ? d : 0.0;
fgSetDouble("/environment/metar/gust-wind-speed-kt", d);
d = m->getTemperature_C();
if (d != SGMetarNaN) {
dt = m->getDewpoint_C();
dt = (dt != SGMetarNaN) ? dt : 0.0;
fgSetDouble("/environment/metar/dewpoint-degc", dt);
fgSetDouble("/environment/metar/rel-humidity-norm",
m->getRelHumidity() );
}
d = (d != SGMetarNaN) ? d : 15.0;
fgSetDouble("/environment/metar/temperature-degc", d);
d = m->getPressure_inHg();
d = (d != SGMetarNaN) ? d : 30.0;
fgSetDouble("/environment/metar/pressure-inhg", d);
vector<SGMetarCloud> cv = m->getClouds();
vector<SGMetarCloud>::iterator cloud;
const char *cl = "/environment/clouds/layer[%i]";
for (i = 0, cloud = cv.begin(); cloud != cv.end(); cloud++, i++) {
const char *coverage_string[5] =
{ "clear", "few", "scattered", "broken", "overcast" };
const double thickness[5] = { 0, 65, 600,750, 1000};
int q;
snprintf(s, 128, cl, i);
strncat(s, "/coverage", 128);
q = cloud->getCoverage();
q = (q != -1 ) ? q : 0;
fgSetString(s, coverage_string[q] );
snprintf(s, 128, cl, i);
strncat(s, "/elevation-ft", 128);
d = cloud->getAltitude_ft();
d = (d != SGMetarNaN) ? d : -9999;
fgSetDouble(s, d + station_elevation_ft);
snprintf(s, 128, cl, i);
strncat(s, "/thickness-ft", 128);
fgSetDouble(s, thickness[q]);
snprintf(s, 128, cl, i);
strncat(s, "/span-m", 128);
fgSetDouble(s, 40000.0);
}
for (; i < FGEnvironmentMgr::MAX_CLOUD_LAYERS; i++) {
snprintf(s, 128, cl, i);
strncat(s, "/coverage", 128);
fgSetString(s, "clear");
snprintf(s, 128, cl, i);
strncat(s, "/elevation-ft", 128);
fgSetDouble(s, -9999);
snprintf(s, 128, cl, i);
strncat(s, "/thickness-ft", 128);
fgSetDouble(s, 0);
snprintf(s, 128, cl, i);
strncat(s, "/span-m", 128);
fgSetDouble(s, 40000.0);
}
}
#ifdef ENABLE_THREADS #ifdef ENABLE_THREADS
/** /**
* *
@ -647,34 +650,18 @@ FGMetarEnvironmentCtrl::fetch_data (const string &icao)
void void
FGMetarEnvironmentCtrl::MetarThread::run() FGMetarEnvironmentCtrl::MetarThread::run()
{ {
SGMetar *m = NULL;
// pthread_cleanup_push( metar_cleanup_handler, fetcher ); // pthread_cleanup_push( metar_cleanup_handler, fetcher );
while ( true ) while ( true )
{ {
set_cancel( SGThread::CANCEL_DISABLE ); set_cancel( SGThread::CANCEL_DISABLE );
try
{
cout << "Fetching ..." << endl;
// if (m != NULL) m = NULL;
string host = fetcher->proxy_host->getStringValue();
string auth = fetcher->proxy_auth->getStringValue();
string port = fetcher->proxy_port->getStringValue();
m = new SGMetar( fetcher->_icao, host, port, auth );
} catch (const sg_io_exception& e) { string icao = fetcher->request_queue.pop();
// SG_LOG( SG_GENERAL, SG_WARN, "Error fetching live weather data: " SG_LOG( SG_GENERAL, SG_INFO, "Thread: fetch metar data = " << icao );
// << e.getFormattedMessage().c_str() ); FGMetarResult result = fetcher->fetch_data( icao );
m = NULL;
}
set_cancel( SGThread::CANCEL_DEFERRED ); set_cancel( SGThread::CANCEL_DEFERRED );
fetcher->metar_queue.push( m ); fetcher->result_queue.push( result );
// Wait for the next frame signal before we fetch the next metar data
fetcher->mutex.lock();
fetcher->metar_cond.wait( fetcher->mutex );
fetcher->mutex.unlock();
} }
// pthread_cleanup_pop(1); // pthread_cleanup_pop(1);
} }

View file

@ -38,8 +38,10 @@
# include <math.h> # include <math.h>
#endif #endif
#include <queue>
#include <vector> #include <vector>
SG_USING_STD(queue);
SG_USING_STD(vector); SG_USING_STD(vector);
class SGPropertyNode; class SGPropertyNode;
@ -139,6 +141,13 @@ private:
}; };
// A convenience wrapper around SGMetar
struct FGMetarResult {
string icao;
SGMetar *m;
};
/** /**
* Interplation controller using the SGMetar class * Interplation controller using the SGMetar class
@ -152,7 +161,6 @@ public:
virtual void init (); virtual void init ();
virtual void reinit (); virtual void reinit ();
virtual void update (double delta_time_sec); virtual void update (double delta_time_sec);
virtual void setEnvironment (FGEnvironment * environment); virtual void setEnvironment (FGEnvironment * environment);
private: private:
@ -169,18 +177,35 @@ private:
SGPropertyNode *proxy_port; SGPropertyNode *proxy_port;
SGPropertyNode *proxy_auth; SGPropertyNode *proxy_auth;
bool fetch_data (const string &icao); FGMetarResult fetch_data( const string &icao );
virtual void update_metar_properties( SGMetar *m );
void update_env_config(); void update_env_config();
private: private:
#ifdef ENABLE_THREADS #ifdef ENABLE_THREADS
/** /**
* FIFO queue which holds a pointer to the fetched metar data. * FIFO queue which holds a pointer to the fetched metar data.
*/ */
SGBlockingQueue< SGMetar * > metar_queue; SGBlockingQueue < string > request_queue;
/**
* FIFO queue which holds a pointer to the fetched metar data.
*/
SGBlockingQueue < FGMetarResult > result_queue;
#else
/**
* FIFO queue which holds a pointer to the fetched metar data.
*/
queue < string > request_queue;
/**
* FIFO queue which holds a pointer to the fetched metar data.
*/
queue < FGMetarResult > result_queue;
#endif
#ifdef ENABLE_THREADS
/** /**
* This class represents the thread of execution responsible for * This class represents the thread of execution responsible for
* fetching the metar data. * fetching the metar data.
@ -192,7 +217,7 @@ private:
~MetarThread() {} ~MetarThread() {}
/** /**
* Reads the tile from disk. * Fetched the metar data from the NOAA.
*/ */
void run(); void run();