1
0
Fork 0

Limit props/telnet subscription send rate

Prevent FDM-derived properties being sent at full speed (120Hz) which
overloads telnet connections. Instead track dirty properties and
send them at the protocol’s update rate (which is presumably what
the user expects)
This commit is contained in:
James Turner 2018-02-01 15:50:50 +00:00
parent e4e5cc9394
commit b96d4bf315
2 changed files with 71 additions and 26 deletions

View file

@ -39,6 +39,7 @@
#include <sstream>
#include <iostream>
#include <errno.h>
#include <algorithm>
#include <Main/globals.hxx>
#include <Viewer/viewmgr.hxx>
@ -52,8 +53,8 @@
#include <map>
#include <vector>
#include <string>
#include <set>
#include <boost/foreach.hpp>
using std::stringstream;
using std::ends;
@ -65,7 +66,7 @@ using std::endl;
* Props connection class.
* This class represents a connection to props client.
*/
class PropsChannel : public simgear::NetChat, public SGPropertyChangeListener
class FGProps::PropsChannel : public simgear::NetChat, public SGPropertyChangeListener
{
simgear::NetBuffer buffer;
@ -84,7 +85,7 @@ public:
/**
* Constructor.
*/
PropsChannel();
PropsChannel(FGProps* owner);
~PropsChannel();
/**
@ -93,15 +94,17 @@ public:
* @param s Character string to append to buffer
* @param n Number of characters to append.
*/
void collectIncomingData( const char* s, int n );
void collectIncomingData( const char* s, int n ) override;
/**
* Process a complete request from the props client.
*/
void foundTerminator();
void foundTerminator() override;
// callback for registered listeners (subscriptions)
void valueChanged(SGPropertyNode *node);
void valueChanged(SGPropertyNode *node) override;
void publishDirtySubscriptions();
private:
typedef string_list ParameterList;
@ -125,35 +128,43 @@ private:
}
std::vector<SGPropertyNode_ptr> _listeners;
std::set<SGPropertyNode_ptr> _dirtySubscriptions;
typedef void (PropsChannel::*TelnetCallback) (const ParameterList&);
std::map<std::string, TelnetCallback> callback_map;
// callback implementations:
void subscribe(const ParameterList &p);
void unsubscribe(const ParameterList &p);
FGProps* _owner = nullptr;
};
/**
*
*/
PropsChannel::PropsChannel()
: buffer(512),
FGProps::PropsChannel::PropsChannel(FGProps* owner)
: buffer(8192),
path("/"),
mode(PROMPT)
mode(PROMPT),
_owner(owner)
{
setTerminator( "\r\n" );
callback_map["subscribe"] = &PropsChannel::subscribe;
callback_map["unsubscribe"] = &PropsChannel::unsubscribe;
}
PropsChannel::~PropsChannel() {
FGProps::PropsChannel::~PropsChannel()
{
// clean up all registered listeners
BOOST_FOREACH(SGPropertyNode_ptr l, _listeners) {
l->removeChangeListener( this );
}
for (SGPropertyNode_ptr l : _listeners) {
l->removeChangeListener(this);
}
_owner->removeChannel(this);
}
void PropsChannel::subscribe(const ParameterList &param) {
void FGProps::PropsChannel::subscribe(const ParameterList &param) {
if (! check_args(param,1,"subscribe")) return;
std::string command = param[0];
@ -179,13 +190,15 @@ void PropsChannel::subscribe(const ParameterList &param) {
}
}
void PropsChannel::unsubscribe(const ParameterList &param) {
void FGProps::PropsChannel::unsubscribe(const ParameterList &param) {
if (!check_args(param,1,"unsubscribe")) return;
try {
SGPropertyNode *n = globals->get_props()->getNode( param[1].c_str() );
if (n)
n->removeChangeListener( this );
if (n) {
n->removeChangeListener( this );
_dirtySubscriptions.erase(n);
}
} catch (sg_exception&) {
error("Error:Listener could not be removed");
}
@ -193,18 +206,30 @@ void PropsChannel::unsubscribe(const ParameterList &param) {
//TODO: provide support for different types of subscriptions MODES ? (child added/removed, thesholds, min/max)
void PropsChannel::valueChanged(SGPropertyNode* ptr) {
//SG_LOG(SG_GENERAL, SG_ALERT, __FILE__<< "@"<<__LINE__ << ":" << __FUNCTION__ << std::endl);
std::stringstream response;
response << ptr->getPath(true) << "=" << ptr->getStringValue() << getTerminator(); //TODO: use hashes, echo several properties at once
push( response.str().c_str() );
}
void FGProps::PropsChannel::valueChanged(SGPropertyNode* ptr)
{
_dirtySubscriptions.insert(ptr);
}
void FGProps::PropsChannel::publishDirtySubscriptions()
{
if (_dirtySubscriptions.empty())
return; // nothing to send
std::stringstream response;
for (auto sub : _dirtySubscriptions) {
response << sub->getPath(true) << "=" << sub->getStringValue() << getTerminator();
}
push(response.str().c_str());
_dirtySubscriptions.clear();
}
/**
*
*/
void
PropsChannel::collectIncomingData( const char* s, int n )
FGProps::PropsChannel::collectIncomingData( const char* s, int n )
{
buffer.append( s, n );
}
@ -249,7 +274,7 @@ getValueTypeString( const SGPropertyNode *node )
*
*/
void
PropsChannel::foundTerminator()
FGProps::PropsChannel::foundTerminator()
{
const char* cmd = buffer.getData();
SG_LOG( SG_IO, SG_INFO, "processing command = \"" << cmd << "\"" );
@ -696,6 +721,11 @@ bool
FGProps::process()
{
poller.poll();
for (auto channel : _activeChannels) {
channel->publishDirtySubscriptions();
}
return true;
}
@ -709,7 +739,18 @@ FGProps::handleAccept()
int handle = accept( &addr );
SG_LOG( SG_IO, SG_INFO, "Props server accepted connection from "
<< addr.getHost() << ":" << addr.getPort() );
PropsChannel* channel = new PropsChannel();
PropsChannel* channel = new PropsChannel(this);
channel->setHandle( handle );
poller.addChannel( channel );
_activeChannels.push_back(channel);
}
void FGProps::removeChannel(FGProps::PropsChannel *channel)
{
auto it = std::find(_activeChannels.begin(), _activeChannels.end(), channel);
if (it == _activeChannels.end()) {
SG_LOG(SG_IO, SG_WARN, "FGProps::removeChannel: unknown channel");
} else {
_activeChannels.erase(it);
}
}

View file

@ -44,12 +44,15 @@ class FGProps : public FGProtocol,
public SGPropertyChangeListener // for subscriptions
{
private:
class PropsChannel;
/**
* Server port to listen on.
*/
int port;
simgear::NetChannelPoller poller;
std::vector<PropsChannel*> _activeChannels;
public:
/**
* Create a new TCP server.
@ -83,6 +86,7 @@ public:
*/
void handleAccept();
void removeChannel(PropsChannel* channel);
};
#endif // _FG_PROPS_HXX