1
0
Fork 0

multi-threaded ogrdecode

- use --all-threads to use all available CPUs
- use --threads x to use x threads
This commit is contained in:
Peter Sadrozinski 2012-12-14 20:44:26 -05:00
parent cd3a1d647e
commit 4b97381559

View file

@ -26,9 +26,13 @@
#include <string> #include <string>
#include <map> #include <map>
#include <boost/thread.hpp>
#include <ogrsf_frmts.h> #include <ogrsf_frmts.h>
#include <simgear/compiler.h> #include <simgear/compiler.h>
#include <simgear/threads/SGThread.hxx>
#include <simgear/threads/SGQueue.hxx>
#include <simgear/debug/logstream.hxx> #include <simgear/debug/logstream.hxx>
#include <simgear/math/sg_geodesy.hxx> #include <simgear/math/sg_geodesy.hxx>
#include <simgear/misc/sg_path.hxx> #include <simgear/misc/sg_path.hxx>
@ -40,11 +44,11 @@
#include <Polygon/polygon.hxx> #include <Polygon/polygon.hxx>
/* stretch endpoints to reduce slivers in linear data ~.1 meters */ /* stretch endpoints to reduce slivers in linear data ~.1 meters */
// #define EP_STRETCH (0.000001)
#define EP_STRETCH (0.1) #define EP_STRETCH (0.1)
using std::string; using std::string;
// scope?
int line_width=50; int line_width=50;
string line_width_col; string line_width_col;
int point_width=500; int point_width=500;
@ -60,8 +64,40 @@ bool use_attribute_query=false;
string attribute_query; string attribute_query;
bool use_spatial_query=false; bool use_spatial_query=false;
double spat_min_x, spat_min_y, spat_max_x, spat_max_y; double spat_min_x, spat_min_y, spat_max_x, spat_max_y;
int num_threads = 1;
void processPoint(OGRPoint* poGeometry, const string& area_type, int width, tgChopper& results ) SGLockedQueue<OGRFeature *> global_workQueue;
class Decoder : public SGThread
{
public:
Decoder( OGRCoordinateTransformation *poct, int atf, int pwf, int lwf, tgChopper& c ) : chopper(c) {
poCT = poct;
area_type_field = atf;
point_width_field = pwf;
line_width_field = lwf;
}
private:
virtual void run();
void processPoint(OGRPoint* poGeometry, const string& area_type, int width );
void processLineString(OGRLineString* poGeometry, const string& area_type, int width, int with_texture );
void processPolygon(OGRPolygon* poGeometry, const string& area_type );
private:
// The transformation for each geometry object
OGRCoordinateTransformation *poCT;
// Store the reults per tile
tgChopper& chopper;
int area_type_field;
int point_width_field;
int line_width_field;
};
void Decoder::processPoint(OGRPoint* poGeometry, const string& area_type, int width )
{ {
SGGeod point = SGGeod::fromDeg( poGeometry->getX(),poGeometry->getY() ); SGGeod point = SGGeod::fromDeg( poGeometry->getX(),poGeometry->getY() );
tgPolygon shape = tgPolygon::Expand( point, width ); tgPolygon shape = tgPolygon::Expand( point, width );
@ -72,10 +108,10 @@ void processPoint(OGRPoint* poGeometry, const string& area_type, int width, tgCh
shape.SetPreserve3D( false ); shape.SetPreserve3D( false );
shape.SetTexMethod( TG_TEX_BY_GEODE ); shape.SetTexMethod( TG_TEX_BY_GEODE );
results.Add( shape, area_type ); chopper.Add( shape, area_type );
} }
void processLineString(OGRLineString* poGeometry, const string& area_type, int width, int with_texture, tgChopper& results ) void Decoder::processLineString(OGRLineString* poGeometry, const string& area_type, int width, int with_texture )
{ {
tgpolygon_list segments; tgpolygon_list segments;
tgContour line; tgContour line;
@ -135,16 +171,16 @@ void processLineString(OGRLineString* poGeometry, const string& area_type, int w
for ( unsigned int i=0; i<segments.size(); i++ ) { for ( unsigned int i=0; i<segments.size(); i++ ) {
segments[i].SetPreserve3D( false ); segments[i].SetPreserve3D( false );
if (with_texture) { if (with_texture) {
segments[i].SetTexMethod( TG_TEX_BY_GEODE ); segments[i].SetTexMethod( TG_TEX_BY_TPS_CLIPU );
} else { } else {
segments[i].SetTexMethod( TG_TEX_BY_GEODE ); segments[i].SetTexMethod( TG_TEX_BY_GEODE );
} }
results.Add( segments[i], area_type ); chopper.Add( segments[i], area_type );
} }
} }
void processPolygon(OGRPolygon* poGeometry, const string& area_type, tgChopper& results ) void Decoder::processPolygon(OGRPolygon* poGeometry, const string& area_type )
{ {
// bool preserve3D = ((poGeometry->getGeometryType()&wkb25DBit)==wkb25DBit); // bool preserve3D = ((poGeometry->getGeometryType()&wkb25DBit)==wkb25DBit);
@ -157,9 +193,147 @@ void processPolygon(OGRPolygon* poGeometry, const string& area_type, tgChopper&
// shape.SetPreserve3D( preserve3D ); // shape.SetPreserve3D( preserve3D );
shape.SetPreserve3D( false ); shape.SetPreserve3D( false );
results.Add( shape, area_type ); chopper.Add( shape, area_type );
} }
void Decoder::run()
{
// as long as we have feometry to parse, do so
while (!global_workQueue.empty()) {
OGRFeature *poFeature = global_workQueue.pop();
if ( poFeature ) {
OGRGeometry *poGeometry = poFeature->GetGeometryRef();
if (poGeometry==NULL) {
SG_LOG( SG_GENERAL, SG_INFO, "Found feature without geometry!" );
if (!continue_on_errors) {
SG_LOG( SG_GENERAL, SG_ALERT, "Aborting!" );
exit( 1 );
} else {
continue;
}
}
OGRwkbGeometryType geoType=wkbFlatten(poGeometry->getGeometryType());
if (geoType!=wkbPoint && geoType!=wkbMultiPoint &&
geoType!=wkbLineString && geoType!=wkbMultiLineString &&
geoType!=wkbPolygon && geoType!=wkbMultiPolygon) {
SG_LOG( SG_GENERAL, SG_INFO, "Unknown feature" );
continue;
}
string area_type_name=area_type;
if (area_type_field!=-1) {
area_type_name=poFeature->GetFieldAsString(area_type_field);
}
if ( is_ocean_area(area_type_name) ) {
// interior of polygon is ocean, holes are islands
SG_LOG( SG_GENERAL, SG_ALERT, "Ocean area ... SKIPPING!" );
// Ocean data now comes from GSHHS so we want to ignore
// all other ocean data
continue;
} else if ( is_void_area(area_type_name) ) {
// interior is ????
// skip for now
SG_LOG( SG_GENERAL, SG_ALERT, "Void area ... SKIPPING!" );
continue;
} else if ( is_null_area(area_type_name) ) {
// interior is ????
// skip for now
SG_LOG( SG_GENERAL, SG_ALERT, "Null area ... SKIPPING!" );
continue;
}
poGeometry->transform( poCT );
switch (geoType) {
case wkbPoint: {
SG_LOG( SG_GENERAL, SG_DEBUG, "Point feature" );
int width=point_width;
if (point_width_field!=-1) {
width=poFeature->GetFieldAsInteger(point_width_field);
if (width == 0) {
width=point_width;
}
}
processPoint((OGRPoint*)poGeometry, area_type_name, width);
break;
}
case wkbMultiPoint: {
SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPoint feature" );
int width=point_width;
if (point_width_field!=-1) {
width=poFeature->GetFieldAsInteger(point_width_field);
if (width == 0) {
width=point_width;
}
}
OGRMultiPoint* multipt=(OGRMultiPoint*)poGeometry;
for (int i=0;i<multipt->getNumGeometries();i++) {
processPoint((OGRPoint*)(multipt->getGeometryRef(i)), area_type_name, width);
}
break;
}
case wkbLineString: {
SG_LOG( SG_GENERAL, SG_DEBUG, "LineString feature" );
int width=line_width;
if (line_width_field!=-1) {
width=poFeature->GetFieldAsInteger(line_width_field);
if (width == 0) {
width=line_width;
}
}
processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines);
break;
}
case wkbMultiLineString: {
SG_LOG( SG_GENERAL, SG_DEBUG, "MultiLineString feature" );
int width=line_width;
if (line_width_field!=-1) {
width=poFeature->GetFieldAsInteger(line_width_field);
if (width == 0) {
width=line_width;
}
}
OGRMultiLineString* multils=(OGRMultiLineString*)poGeometry;
for (int i=0;i<multils->getNumGeometries();i++) {
processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines);
}
break;
}
case wkbPolygon: {
SG_LOG( SG_GENERAL, SG_DEBUG, "Polygon feature" );
processPolygon((OGRPolygon*)poGeometry, area_type_name);
break;
}
case wkbMultiPolygon: {
SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPolygon feature" );
OGRMultiPolygon* multipoly=(OGRMultiPolygon*)poGeometry;
for (int i=0;i<multipoly->getNumGeometries();i++) {
processPolygon((OGRPolygon*)(multipoly->getGeometryRef(i)), area_type_name);
}
break;
}
default:
/* Ignore unhandled objects */
break;
}
OGRFeature::DestroyFeature( poFeature );
}
}
}
// Main Thread
void processLayer(OGRLayer* poLayer, tgChopper& results ) void processLayer(OGRLayer* poLayer, tgChopper& results )
{ {
int feature_count=poLayer->GetFeatureCount(); int feature_count=poLayer->GetFeatureCount();
@ -218,9 +392,7 @@ void processLayer(OGRLayer* poLayer, tgChopper& results )
oTargetSRS.SetWellKnownGeogCS( "WGS84" ); oTargetSRS.SetWellKnownGeogCS( "WGS84" );
OGRCoordinateTransformation *poCT; OGRCoordinateTransformation *poCT = OGRCreateCoordinateTransformation(oSourceSRS, &oTargetSRS);
poCT = OGRCreateCoordinateTransformation(oSourceSRS, &oTargetSRS);
/* setup attribute and spatial queries */ /* setup attribute and spatial queries */
if (use_spatial_query) { if (use_spatial_query) {
@ -249,146 +421,35 @@ void processLayer(OGRLayer* poLayer, tgChopper& results )
} }
} }
// PSADRO TODO : Generate the work queue for this layer // Generate the work queue for this layer
// Then process the workqueue with threads
OGRFeature *poFeature; OGRFeature *poFeature;
poLayer->SetNextByIndex(start_record); poLayer->SetNextByIndex(start_record);
int numFeatures = poLayer->GetFeatureCount(); while ( ( poFeature = poLayer->GetNextFeature()) != NULL )
int cur_feature = 1;
for ( ; (poFeature = poLayer->GetNextFeature()) != NULL; OGRFeature::DestroyFeature( poFeature ) )
{ {
OGRGeometry *poGeometry; global_workQueue.push( poFeature );
poGeometry = poFeature->GetGeometryRef();
if (poGeometry==NULL) {
SG_LOG( SG_GENERAL, SG_INFO, "Found feature without geometry!" );
if (!continue_on_errors) {
SG_LOG( SG_GENERAL, SG_ALERT, "Aborting!" );
exit( 1 );
} else {
continue;
}
}
assert(poGeometry!=NULL);
OGRwkbGeometryType geoType=wkbFlatten(poGeometry->getGeometryType());
if (geoType!=wkbPoint && geoType!=wkbMultiPoint &&
geoType!=wkbLineString && geoType!=wkbMultiLineString &&
geoType!=wkbPolygon && geoType!=wkbMultiPolygon) {
SG_LOG( SG_GENERAL, SG_INFO, "Unknown feature" );
continue;
}
string area_type_name=area_type;
if (area_type_field!=-1) {
area_type_name=poFeature->GetFieldAsString(area_type_field);
}
if ( is_ocean_area(area_type_name) ) {
// interior of polygon is ocean, holes are islands
SG_LOG( SG_GENERAL, SG_ALERT, "Ocean area ... SKIPPING!" );
// Ocean data now comes from GSHHS so we want to ignore
// all other ocean data
continue;
} else if ( is_void_area(area_type_name) ) {
// interior is ????
// skip for now
SG_LOG( SG_GENERAL, SG_ALERT, "Void area ... SKIPPING!" );
continue;
} else if ( is_null_area(area_type_name) ) {
// interior is ????
// skip for now
SG_LOG( SG_GENERAL, SG_ALERT, "Null area ... SKIPPING!" );
continue;
}
poGeometry->transform( poCT );
switch (geoType) {
case wkbPoint: {
SG_LOG( SG_GENERAL, SG_DEBUG, "Point feature" );
int width=point_width;
if (point_width_field!=-1) {
width=poFeature->GetFieldAsInteger(point_width_field);
if (width == 0) {
width=point_width;
}
}
processPoint((OGRPoint*)poGeometry, area_type_name, width, results);
break;
}
case wkbMultiPoint: {
SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPoint feature" );
int width=point_width;
if (point_width_field!=-1) {
width=poFeature->GetFieldAsInteger(point_width_field);
if (width == 0) {
width=point_width;
}
}
OGRMultiPoint* multipt=(OGRMultiPoint*)poGeometry;
for (int i=0;i<multipt->getNumGeometries();i++) {
processPoint((OGRPoint*)(multipt->getGeometryRef(i)), area_type_name, width, results);
}
break;
}
case wkbLineString: {
SG_LOG( SG_GENERAL, SG_DEBUG, "LineString feature" );
int width=line_width;
if (line_width_field!=-1) {
width=poFeature->GetFieldAsInteger(line_width_field);
if (width == 0) {
width=line_width;
}
}
processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines, results);
break;
}
case wkbMultiLineString: {
SG_LOG( SG_GENERAL, SG_DEBUG, "MultiLineString feature" );
int width=line_width;
if (line_width_field!=-1) {
width=poFeature->GetFieldAsInteger(line_width_field);
if (width == 0) {
width=line_width;
}
}
OGRMultiLineString* multils=(OGRMultiLineString*)poGeometry;
for (int i=0;i<multils->getNumGeometries();i++) {
processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines, results);
}
break;
}
case wkbPolygon: {
SG_LOG( SG_GENERAL, SG_DEBUG, "Polygon feature" );
processPolygon((OGRPolygon*)poGeometry, area_type_name, results);
break;
}
case wkbMultiPolygon: {
SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPolygon feature" );
OGRMultiPolygon* multipoly=(OGRMultiPolygon*)poGeometry;
for (int i=0;i<multipoly->getNumGeometries();i++) {
processPolygon((OGRPolygon*)(multipoly->getGeometryRef(i)), area_type_name, results);
}
break;
}
default:
/* Ignore unhandled objects */
break;
}
} }
// Now process the workqueue with threads
std::vector<Decoder *> decoders;
for (int i=0; i<num_threads; i++) {
Decoder* decoder = new Decoder( poCT, area_type_field, point_width_field, line_width_field, results );
decoder->start();
decoders.push_back( decoder );
}
#if 0
while (!global_workQueue.empty()) {
sleep(1);
}
#endif
// Then wait until they are finished
for (unsigned int i=0; i<decoders.size(); i++) {
decoders[i]->join();
}
results.Save();
OCTDestroyCoordinateTransformation ( poCT ); OCTDestroyCoordinateTransformation ( poCT );
} }
@ -423,6 +484,10 @@ void usage(char* progname) {
SG_LOG( SG_GENERAL, SG_ALERT, " spatial query extents" ); SG_LOG( SG_GENERAL, SG_ALERT, " spatial query extents" );
SG_LOG( SG_GENERAL, SG_ALERT, "--texture-lines" ); SG_LOG( SG_GENERAL, SG_ALERT, "--texture-lines" );
SG_LOG( SG_GENERAL, SG_ALERT, " Enable textured lines" ); SG_LOG( SG_GENERAL, SG_ALERT, " Enable textured lines" );
SG_LOG( SG_GENERAL, SG_ALERT, "--threads" );
SG_LOG( SG_GENERAL, SG_ALERT, " Enable multithreading with user specified number of threads" );
SG_LOG( SG_GENERAL, SG_ALERT, "--all-threads" );
SG_LOG( SG_GENERAL, SG_ALERT, " Enable multithreading with all available cpu cores" );
SG_LOG( SG_GENERAL, SG_ALERT, "" ); SG_LOG( SG_GENERAL, SG_ALERT, "" );
SG_LOG( SG_GENERAL, SG_ALERT, "<work_dir>" ); SG_LOG( SG_GENERAL, SG_ALERT, "<work_dir>" );
SG_LOG( SG_GENERAL, SG_ALERT, " Directory to put the polygon files in" ); SG_LOG( SG_GENERAL, SG_ALERT, " Directory to put the polygon files in" );
@ -528,6 +593,17 @@ int main( int argc, char **argv ) {
spat_max_y=atof(argv[5]); spat_max_y=atof(argv[5]);
argv+=5; argv+=5;
argc-=5; argc-=5;
} else if (!strcmp(argv[1],"--threads")) {
if (argc<3) {
usage(progname);
}
num_threads=atoi(argv[2]);
argv+=2;
argc-=2;
} else if (!strcmp(argv[1],"--all-threads")) {
num_threads=boost::thread::hardware_concurrency();
argv+=1;
argc-=1;
} else if (!strcmp(argv[1],"--help")) { } else if (!strcmp(argv[1],"--help")) {
usage(progname); usage(progname);
} else { } else {