diff --git a/Nasal/mp_broadcast.nas b/Nasal/mp_broadcast.nas index 62d4a7411..c7a8a53a3 100644 --- a/Nasal/mp_broadcast.nas +++ b/Nasal/mp_broadcast.nas @@ -21,7 +21,7 @@ # EventChannel.register(event_hash, handler) # Register a handler for the event identified by the hash event_hash. # event_hash - hash value for the event : a unique 4 character string -# handler - a handler function for the event : func (msg) +# handler - a handler function for the event : func (sender, msg) # # EventChannel.deregister(event_hash) # Deregister the handler for the event identified by the hash event_hash. @@ -62,7 +62,7 @@ EventChannel.send = func (event_hash, EventChannel._process = func (n, msg) { var event_hash = Binary.readHash(msg); if (contains(me.events, event_hash)) { - me.events[event_hash](substr(msg, Binary.sizeOf["Hash"])); + me.events[event_hash](n, substr(msg, Binary.sizeOf["Hash"])); } } @@ -226,11 +226,64 @@ BroadcastChannel._loop_ = func (id) { } ###################################################################### +############################################################################### +# Lamport clock. Useful for creating a total order for events or messages. +# The users' callsigns are used to break ties. +# +# LamportClock.new() +# Creates a new lamport clock for this user. +# +# LamportClock.merge(sender, sender_timestamp) +# Merges the timestamp from the sender with the local clock. +# sender : base node of the senders property tree +# sender_timestamp : the timestamp received from the sender. +# Returns 1 if the local clock was advanced; 0 otherwise. +# +# LamportClock.advance() +# Advances the local clock one tick. +# +# LamportClock.timestamp() +# Returns an encoded 4 character long timestamp from the local clock. +# +var LamportClock = { + # LamportClock.new() + # Creates a new lamport clock for this user. + new : func { + var obj = { + parents : [LamportClock], + callsign : getprop("/sim/multiplay/callsign"), + time : 0 + }; + return obj; + }, + merge : func (sender, sender_timestamp) { + var sender_time = Binary.decodeInt28(sender_timestamp); + if (sender_time > me.time) { + me.time = sender_time; + return 1; + } elsif ((sender_time == me.time) and + (cmp(sender.getNode("callsign").getValue(), me.callsign) > 0)) { + return 1; + } else { + # The received timestamp is old and should be ignored. + return 0; + } + }, + advance : func { + me.time += 1; + }, + timestamp : func { + return Binary.encodeInt28(me.time); + } +}; + + ############################################################################### # Some routines for encoding/decoding values into/from a string. # NOTE: MP is picky about what it sends in a string propery. # Encode 7 bits as a printable 8 bit character. var Binary = {}; +Binary.TWOTO27 = 134217728; Binary.TWOTO28 = 268435456; Binary.TWOTO31 = 2147483648; Binary.TWOTO32 = 4294967296; @@ -275,6 +328,31 @@ Binary.decodeByte = func (str) { return int(v); } ############################################################ +# NOTE: This encodes a 28 bit integer. +Binary.sizeOf["int28"] = 4; +Binary.encodeInt28 = func (int) { + var bf = bits.buf(4); + if (int < 0) int += Binary.TWOTO32; + var r = int; + for (var i = 0; i < 4; i += 1) { + var c = math.mod(r, 128); + bf[3-i] = c + `A`; + r = (r - c)/128; + } + return bf; +} +############################################################ +Binary.decodeInt28 = func (str) { + var v = 0; + var b = 1; + for (var i = 0; i < 4; i += 1) { + v += (str[3-i] - `A`) * b; + b *= 128; + } + if (v / Binary.TWOTO27 >= 1) v -= Binary.TWOTO28; + return int(v); +} +############################################################ # NOTE: This can neither handle huge values nor really tiny. Binary.sizeOf["double"] = 2*Binary.sizeOf["int"]; Binary.encodeDouble = func (d) { @@ -318,6 +396,8 @@ Binary.stringHash = func (str) { Binary.readHash = func (str) { return substr(str, 0, Binary.sizeOf["Hash"]); } +############################################################ +Binary.sizeOf["LamportTS"] = 4; ###################################################################### ############################################################################### diff --git a/Nasal/scenery.nas b/Nasal/scenery.nas index a436fa305..1b5f90963 100644 --- a/Nasal/scenery.nas +++ b/Nasal/scenery.nas @@ -23,19 +23,41 @@ var sharedDoor = { obj.parents = [sharedDoor] ~ obj.parents; obj.event_hash = mp_broadcast.Binary.stringHash (isa(node, props.Node) ? node.getPath() : node); - events.register(obj.event_hash, func (msg) { obj._process(msg) }); + obj.clock = mp_broadcast.LamportClock.new(); + obj.loopid = 0; + events.register(obj.event_hash, + func (sender, msg) { obj._process(sender, msg) }); return obj; }, toggle: func { - events.send(me.event_hash, mp_broadcast.Binary.encodeByte(me.target)); + # Send current time, current position and target position. + me.clock.advance(); me.move(me.target); + me._loop(me.loopid += 1); }, destroy : func { + me.loopid += 1; events.deregister(me.event_hash); }, - _process : func (msg) { - me.target = mp_broadcast.Binary.decodeByte(msg); - me.move(me.target); + _process : func (sender, msg) { + if (me.clock.merge(sender, msg)) { + me.setpos(mp_broadcast.Binary.decodeDouble + (substr(msg, mp_broadcast.Binary.sizeOf["LamportTS"]))); + me.target = mp_broadcast.Binary.decodeByte + (substr(msg, + mp_broadcast.Binary.sizeOf["LamportTS"] + + mp_broadcast.Binary.sizeOf["double"])); + me.move(me.target); + } + }, + _loop : func (id) { + id == me.loopid or return; + # Send current time, current position and target position. + events.send(me.event_hash, + me.clock.timestamp() ~ + mp_broadcast.Binary.encodeDouble(me.positionN.getValue()) ~ + mp_broadcast.Binary.encodeByte(!me.target)); + settimer(func { me._loop(id); }, 17, 1); } };