From 4b97381559fc85bb4b4baba7d0e9f484d0107914 Mon Sep 17 00:00:00 2001 From: Peter Sadrozinski Date: Fri, 14 Dec 2012 20:44:26 -0500 Subject: [PATCH] multi-threaded ogrdecode - use --all-threads to use all available CPUs - use --threads x to use x threads --- src/Prep/OGRDecode/ogr-decode.cxx | 368 ++++++++++++++++++------------ 1 file changed, 222 insertions(+), 146 deletions(-) diff --git a/src/Prep/OGRDecode/ogr-decode.cxx b/src/Prep/OGRDecode/ogr-decode.cxx index 1e983738..d908b9e2 100644 --- a/src/Prep/OGRDecode/ogr-decode.cxx +++ b/src/Prep/OGRDecode/ogr-decode.cxx @@ -26,9 +26,13 @@ #include #include +#include #include #include +#include +#include + #include #include #include @@ -40,11 +44,11 @@ #include /* stretch endpoints to reduce slivers in linear data ~.1 meters */ -// #define EP_STRETCH (0.000001) #define EP_STRETCH (0.1) using std::string; +// scope? int line_width=50; string line_width_col; int point_width=500; @@ -60,8 +64,40 @@ bool use_attribute_query=false; string attribute_query; bool use_spatial_query=false; double spat_min_x, spat_min_y, spat_max_x, spat_max_y; +int num_threads = 1; -void processPoint(OGRPoint* poGeometry, const string& area_type, int width, tgChopper& results ) +SGLockedQueue global_workQueue; + +class Decoder : public SGThread +{ +public: + Decoder( OGRCoordinateTransformation *poct, int atf, int pwf, int lwf, tgChopper& c ) : chopper(c) { + poCT = poct; + area_type_field = atf; + point_width_field = pwf; + line_width_field = lwf; + } + +private: + virtual void run(); + + void processPoint(OGRPoint* poGeometry, const string& area_type, int width ); + void processLineString(OGRLineString* poGeometry, const string& area_type, int width, int with_texture ); + void processPolygon(OGRPolygon* poGeometry, const string& area_type ); + +private: + // The transformation for each geometry object + OGRCoordinateTransformation *poCT; + + // Store the reults per tile + tgChopper& chopper; + + int area_type_field; + int point_width_field; + int line_width_field; +}; + +void Decoder::processPoint(OGRPoint* poGeometry, const string& area_type, int width ) { SGGeod point = SGGeod::fromDeg( poGeometry->getX(),poGeometry->getY() ); tgPolygon shape = tgPolygon::Expand( point, width ); @@ -72,10 +108,10 @@ void processPoint(OGRPoint* poGeometry, const string& area_type, int width, tgCh shape.SetPreserve3D( false ); shape.SetTexMethod( TG_TEX_BY_GEODE ); - results.Add( shape, area_type ); + chopper.Add( shape, area_type ); } -void processLineString(OGRLineString* poGeometry, const string& area_type, int width, int with_texture, tgChopper& results ) +void Decoder::processLineString(OGRLineString* poGeometry, const string& area_type, int width, int with_texture ) { tgpolygon_list segments; tgContour line; @@ -135,16 +171,16 @@ void processLineString(OGRLineString* poGeometry, const string& area_type, int w for ( unsigned int i=0; igetGeometryType()&wkb25DBit)==wkb25DBit); @@ -157,9 +193,147 @@ void processPolygon(OGRPolygon* poGeometry, const string& area_type, tgChopper& // shape.SetPreserve3D( preserve3D ); shape.SetPreserve3D( false ); - results.Add( shape, area_type ); + chopper.Add( shape, area_type ); } +void Decoder::run() +{ + // as long as we have feometry to parse, do so + while (!global_workQueue.empty()) { + OGRFeature *poFeature = global_workQueue.pop(); + if ( poFeature ) { + OGRGeometry *poGeometry = poFeature->GetGeometryRef(); + + if (poGeometry==NULL) { + SG_LOG( SG_GENERAL, SG_INFO, "Found feature without geometry!" ); + if (!continue_on_errors) { + SG_LOG( SG_GENERAL, SG_ALERT, "Aborting!" ); + exit( 1 ); + } else { + continue; + } + } + + OGRwkbGeometryType geoType=wkbFlatten(poGeometry->getGeometryType()); + if (geoType!=wkbPoint && geoType!=wkbMultiPoint && + geoType!=wkbLineString && geoType!=wkbMultiLineString && + geoType!=wkbPolygon && geoType!=wkbMultiPolygon) { + SG_LOG( SG_GENERAL, SG_INFO, "Unknown feature" ); + continue; + } + + string area_type_name=area_type; + if (area_type_field!=-1) { + area_type_name=poFeature->GetFieldAsString(area_type_field); + } + + if ( is_ocean_area(area_type_name) ) { + // interior of polygon is ocean, holes are islands + + SG_LOG( SG_GENERAL, SG_ALERT, "Ocean area ... SKIPPING!" ); + + // Ocean data now comes from GSHHS so we want to ignore + // all other ocean data + continue; + } else if ( is_void_area(area_type_name) ) { + // interior is ???? + + // skip for now + SG_LOG( SG_GENERAL, SG_ALERT, "Void area ... SKIPPING!" ); + + continue; + } else if ( is_null_area(area_type_name) ) { + // interior is ???? + + // skip for now + SG_LOG( SG_GENERAL, SG_ALERT, "Null area ... SKIPPING!" ); + + continue; + } + + poGeometry->transform( poCT ); + + switch (geoType) { + case wkbPoint: { + SG_LOG( SG_GENERAL, SG_DEBUG, "Point feature" ); + int width=point_width; + if (point_width_field!=-1) { + width=poFeature->GetFieldAsInteger(point_width_field); + if (width == 0) { + width=point_width; + } + } + processPoint((OGRPoint*)poGeometry, area_type_name, width); + break; + } + case wkbMultiPoint: { + SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPoint feature" ); + int width=point_width; + if (point_width_field!=-1) { + width=poFeature->GetFieldAsInteger(point_width_field); + if (width == 0) { + width=point_width; + } + } + OGRMultiPoint* multipt=(OGRMultiPoint*)poGeometry; + for (int i=0;igetNumGeometries();i++) { + processPoint((OGRPoint*)(multipt->getGeometryRef(i)), area_type_name, width); + } + break; + } + case wkbLineString: { + SG_LOG( SG_GENERAL, SG_DEBUG, "LineString feature" ); + int width=line_width; + if (line_width_field!=-1) { + width=poFeature->GetFieldAsInteger(line_width_field); + if (width == 0) { + width=line_width; + } + } + + processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines); + break; + } + case wkbMultiLineString: { + SG_LOG( SG_GENERAL, SG_DEBUG, "MultiLineString feature" ); + int width=line_width; + if (line_width_field!=-1) { + width=poFeature->GetFieldAsInteger(line_width_field); + if (width == 0) { + width=line_width; + } + } + + OGRMultiLineString* multils=(OGRMultiLineString*)poGeometry; + for (int i=0;igetNumGeometries();i++) { + processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines); + } + break; + } + case wkbPolygon: { + SG_LOG( SG_GENERAL, SG_DEBUG, "Polygon feature" ); + processPolygon((OGRPolygon*)poGeometry, area_type_name); + break; + } + case wkbMultiPolygon: { + SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPolygon feature" ); + OGRMultiPolygon* multipoly=(OGRMultiPolygon*)poGeometry; + for (int i=0;igetNumGeometries();i++) { + processPolygon((OGRPolygon*)(multipoly->getGeometryRef(i)), area_type_name); + } + break; + } + default: + /* Ignore unhandled objects */ + break; + } + + OGRFeature::DestroyFeature( poFeature ); + } + } +} + +// Main Thread void processLayer(OGRLayer* poLayer, tgChopper& results ) { int feature_count=poLayer->GetFeatureCount(); @@ -218,9 +392,7 @@ void processLayer(OGRLayer* poLayer, tgChopper& results ) oTargetSRS.SetWellKnownGeogCS( "WGS84" ); - OGRCoordinateTransformation *poCT; - - poCT = OGRCreateCoordinateTransformation(oSourceSRS, &oTargetSRS); + OGRCoordinateTransformation *poCT = OGRCreateCoordinateTransformation(oSourceSRS, &oTargetSRS); /* setup attribute and spatial queries */ if (use_spatial_query) { @@ -249,146 +421,35 @@ void processLayer(OGRLayer* poLayer, tgChopper& results ) } } - // PSADRO TODO : Generate the work queue for this layer - // Then process the workqueue with threads + // Generate the work queue for this layer OGRFeature *poFeature; poLayer->SetNextByIndex(start_record); - int numFeatures = poLayer->GetFeatureCount(); - int cur_feature = 1; - for ( ; (poFeature = poLayer->GetNextFeature()) != NULL; OGRFeature::DestroyFeature( poFeature ) ) + while ( ( poFeature = poLayer->GetNextFeature()) != NULL ) { - OGRGeometry *poGeometry; - - poGeometry = poFeature->GetGeometryRef(); - - if (poGeometry==NULL) { - SG_LOG( SG_GENERAL, SG_INFO, "Found feature without geometry!" ); - if (!continue_on_errors) { - SG_LOG( SG_GENERAL, SG_ALERT, "Aborting!" ); - exit( 1 ); - } else { - continue; - } - } - - assert(poGeometry!=NULL); - - OGRwkbGeometryType geoType=wkbFlatten(poGeometry->getGeometryType()); - - if (geoType!=wkbPoint && geoType!=wkbMultiPoint && - geoType!=wkbLineString && geoType!=wkbMultiLineString && - geoType!=wkbPolygon && geoType!=wkbMultiPolygon) { - SG_LOG( SG_GENERAL, SG_INFO, "Unknown feature" ); - continue; - } - - string area_type_name=area_type; - if (area_type_field!=-1) { - area_type_name=poFeature->GetFieldAsString(area_type_field); - } - - if ( is_ocean_area(area_type_name) ) { - // interior of polygon is ocean, holes are islands - - SG_LOG( SG_GENERAL, SG_ALERT, "Ocean area ... SKIPPING!" ); - - // Ocean data now comes from GSHHS so we want to ignore - // all other ocean data - continue; - } else if ( is_void_area(area_type_name) ) { - // interior is ???? - - // skip for now - SG_LOG( SG_GENERAL, SG_ALERT, "Void area ... SKIPPING!" ); - - continue; - } else if ( is_null_area(area_type_name) ) { - // interior is ???? - - // skip for now - SG_LOG( SG_GENERAL, SG_ALERT, "Null area ... SKIPPING!" ); - - continue; - } - - poGeometry->transform( poCT ); - - switch (geoType) { - case wkbPoint: { - SG_LOG( SG_GENERAL, SG_DEBUG, "Point feature" ); - int width=point_width; - if (point_width_field!=-1) { - width=poFeature->GetFieldAsInteger(point_width_field); - if (width == 0) { - width=point_width; - } - } - processPoint((OGRPoint*)poGeometry, area_type_name, width, results); - break; - } - case wkbMultiPoint: { - SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPoint feature" ); - int width=point_width; - if (point_width_field!=-1) { - width=poFeature->GetFieldAsInteger(point_width_field); - if (width == 0) { - width=point_width; - } - } - OGRMultiPoint* multipt=(OGRMultiPoint*)poGeometry; - for (int i=0;igetNumGeometries();i++) { - processPoint((OGRPoint*)(multipt->getGeometryRef(i)), area_type_name, width, results); - } - break; - } - case wkbLineString: { - SG_LOG( SG_GENERAL, SG_DEBUG, "LineString feature" ); - int width=line_width; - if (line_width_field!=-1) { - width=poFeature->GetFieldAsInteger(line_width_field); - if (width == 0) { - width=line_width; - } - } - - processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines, results); - break; - } - case wkbMultiLineString: { - SG_LOG( SG_GENERAL, SG_DEBUG, "MultiLineString feature" ); - int width=line_width; - if (line_width_field!=-1) { - width=poFeature->GetFieldAsInteger(line_width_field); - if (width == 0) { - width=line_width; - } - } - - OGRMultiLineString* multils=(OGRMultiLineString*)poGeometry; - for (int i=0;igetNumGeometries();i++) { - processLineString((OGRLineString*)poGeometry, area_type_name, width, texture_lines, results); - } - break; - } - case wkbPolygon: { - SG_LOG( SG_GENERAL, SG_DEBUG, "Polygon feature" ); - processPolygon((OGRPolygon*)poGeometry, area_type_name, results); - break; - } - case wkbMultiPolygon: { - SG_LOG( SG_GENERAL, SG_DEBUG, "MultiPolygon feature" ); - OGRMultiPolygon* multipoly=(OGRMultiPolygon*)poGeometry; - for (int i=0;igetNumGeometries();i++) { - processPolygon((OGRPolygon*)(multipoly->getGeometryRef(i)), area_type_name, results); - } - break; - } - default: - /* Ignore unhandled objects */ - break; - } + global_workQueue.push( poFeature ); } + // Now process the workqueue with threads + std::vector decoders; + for (int i=0; istart(); + decoders.push_back( decoder ); + } + +#if 0 + while (!global_workQueue.empty()) { + sleep(1); + } +#endif + + // Then wait until they are finished + for (unsigned int i=0; ijoin(); + } + + results.Save(); + OCTDestroyCoordinateTransformation ( poCT ); } @@ -423,6 +484,10 @@ void usage(char* progname) { SG_LOG( SG_GENERAL, SG_ALERT, " spatial query extents" ); SG_LOG( SG_GENERAL, SG_ALERT, "--texture-lines" ); SG_LOG( SG_GENERAL, SG_ALERT, " Enable textured lines" ); + SG_LOG( SG_GENERAL, SG_ALERT, "--threads" ); + SG_LOG( SG_GENERAL, SG_ALERT, " Enable multithreading with user specified number of threads" ); + SG_LOG( SG_GENERAL, SG_ALERT, "--all-threads" ); + SG_LOG( SG_GENERAL, SG_ALERT, " Enable multithreading with all available cpu cores" ); SG_LOG( SG_GENERAL, SG_ALERT, "" ); SG_LOG( SG_GENERAL, SG_ALERT, "" ); SG_LOG( SG_GENERAL, SG_ALERT, " Directory to put the polygon files in" ); @@ -528,6 +593,17 @@ int main( int argc, char **argv ) { spat_max_y=atof(argv[5]); argv+=5; argc-=5; + } else if (!strcmp(argv[1],"--threads")) { + if (argc<3) { + usage(progname); + } + num_threads=atoi(argv[2]); + argv+=2; + argc-=2; + } else if (!strcmp(argv[1],"--all-threads")) { + num_threads=boost::thread::hardware_concurrency(); + argv+=1; + argc-=1; } else if (!strcmp(argv[1],"--help")) { usage(progname); } else {