refactor: buffer, channel, rate, and resolver packages to introduce constants for magic numbers
This commit is contained in:
@@ -16,6 +16,19 @@ const (
|
||||
MaxChunkLen = 16 * 1024
|
||||
MaxDataLen = 457 // MDU - 2 - 6 (2 for stream header, 6 for channel envelope)
|
||||
CompressTries = 4
|
||||
|
||||
// Stream header flags
|
||||
StreamHeaderEOF = 0x8000
|
||||
StreamHeaderCompressed = 0x4000
|
||||
|
||||
// Message type
|
||||
StreamDataMessageType = 0x01
|
||||
|
||||
// Header size
|
||||
StreamHeaderSize = 2
|
||||
|
||||
// Compression threshold
|
||||
CompressThreshold = 32
|
||||
)
|
||||
|
||||
type StreamDataMessage struct {
|
||||
@@ -28,10 +41,10 @@ type StreamDataMessage struct {
|
||||
func (m *StreamDataMessage) Pack() ([]byte, error) {
|
||||
headerVal := uint16(m.StreamID & StreamIDMax)
|
||||
if m.EOF {
|
||||
headerVal |= 0x8000
|
||||
headerVal |= StreamHeaderEOF
|
||||
}
|
||||
if m.Compressed {
|
||||
headerVal |= 0x4000
|
||||
headerVal |= StreamHeaderCompressed
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
@@ -43,19 +56,19 @@ func (m *StreamDataMessage) Pack() ([]byte, error) {
|
||||
}
|
||||
|
||||
func (m *StreamDataMessage) GetType() uint16 {
|
||||
return 0x01 // Assign appropriate message type constant
|
||||
return StreamDataMessageType
|
||||
}
|
||||
|
||||
func (m *StreamDataMessage) Unpack(data []byte) error {
|
||||
if len(data) < 2 {
|
||||
if len(data) < StreamHeaderSize {
|
||||
return io.ErrShortBuffer
|
||||
}
|
||||
|
||||
header := binary.BigEndian.Uint16(data[:2])
|
||||
header := binary.BigEndian.Uint16(data[:StreamHeaderSize])
|
||||
m.StreamID = header & StreamIDMax
|
||||
m.EOF = (header & 0x8000) != 0
|
||||
m.Compressed = (header & 0x4000) != 0
|
||||
m.Data = data[2:]
|
||||
m.EOF = (header & StreamHeaderEOF) != 0
|
||||
m.Compressed = (header & StreamHeaderCompressed) != 0
|
||||
m.Data = data[StreamHeaderSize:]
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -163,7 +176,7 @@ func (w *RawChannelWriter) Write(p []byte) (n int, err error) {
|
||||
EOF: w.eof,
|
||||
}
|
||||
|
||||
if len(p) > 32 {
|
||||
if len(p) > CompressThreshold {
|
||||
for try := 1; try < CompressTries; try++ {
|
||||
chunkLen := len(p) / try
|
||||
compressed := compressData(p[:chunkLen])
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.quad4.io/Networks/Reticulum-Go/pkg/common"
|
||||
"git.quad4.io/Networks/Reticulum-Go/pkg/debug"
|
||||
"git.quad4.io/Networks/Reticulum-Go/pkg/transport"
|
||||
)
|
||||
@@ -33,6 +34,19 @@ const (
|
||||
SeqModulus uint16 = SeqMax
|
||||
|
||||
FastRateThreshold = 10
|
||||
|
||||
// Timeout calculation constants
|
||||
RTTMinThreshold = 0.025
|
||||
TimeoutBaseMultiplier = 1.5
|
||||
TimeoutRingMultiplier = 2.5
|
||||
TimeoutRingOffset = 2
|
||||
|
||||
// Packet header constants
|
||||
ChannelHeaderSize = 6
|
||||
ChannelHeaderBits = 8
|
||||
|
||||
// Default retry count
|
||||
DefaultMaxTries = 3
|
||||
)
|
||||
|
||||
// MessageState represents the state of a message
|
||||
@@ -95,7 +109,7 @@ func NewChannel(link transport.LinkInterface) *Channel {
|
||||
windowMax: WindowMaxSlow,
|
||||
windowMin: WindowMinSlow,
|
||||
window: WindowInitial,
|
||||
maxTries: 3,
|
||||
maxTries: DefaultMaxTries,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,7 +126,7 @@ func (c *Channel) Send(msg MessageBase) error {
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
c.nextSequence = (c.nextSequence + 1) % SeqModulus
|
||||
c.nextSequence = (c.nextSequence + common.ONE) % SeqModulus
|
||||
c.txRing = append(c.txRing, env)
|
||||
c.mutex.Unlock()
|
||||
|
||||
@@ -175,11 +189,11 @@ func (c *Channel) handleDelivered(packet interface{}) {
|
||||
|
||||
func (c *Channel) getPacketTimeout(tries int) time.Duration {
|
||||
rtt := c.link.GetRTT()
|
||||
if rtt < 0.025 {
|
||||
rtt = 0.025
|
||||
if rtt < RTTMinThreshold {
|
||||
rtt = RTTMinThreshold
|
||||
}
|
||||
|
||||
timeout := math.Pow(1.5, float64(tries-1)) * rtt * 2.5 * float64(len(c.txRing)+2)
|
||||
timeout := math.Pow(TimeoutBaseMultiplier, float64(tries-common.ONE)) * rtt * TimeoutRingMultiplier * float64(len(c.txRing)+TimeoutRingOffset)
|
||||
return time.Duration(timeout * float64(time.Second))
|
||||
}
|
||||
|
||||
@@ -207,10 +221,10 @@ func (c *Channel) updateRateThresholds() {
|
||||
rtt := c.link.RTT()
|
||||
|
||||
if rtt > RTTFast {
|
||||
c.fastRateRounds = 0
|
||||
c.fastRateRounds = common.ZERO
|
||||
|
||||
if rtt > RTTMedium {
|
||||
c.medRateRounds = 0
|
||||
c.medRateRounds = common.ZERO
|
||||
} else {
|
||||
c.medRateRounds++
|
||||
if c.windowMax < WindowMaxMedium && c.medRateRounds == FastRateThreshold {
|
||||
@@ -228,19 +242,19 @@ func (c *Channel) updateRateThresholds() {
|
||||
}
|
||||
|
||||
func (c *Channel) HandleInbound(data []byte) error {
|
||||
if len(data) < 6 {
|
||||
if len(data) < ChannelHeaderSize {
|
||||
return errors.New("channel packet too short")
|
||||
}
|
||||
|
||||
msgType := uint16(data[0])<<8 | uint16(data[1])
|
||||
sequence := uint16(data[2])<<8 | uint16(data[3])
|
||||
length := uint16(data[4])<<8 | uint16(data[5])
|
||||
msgType := uint16(data[0])<<ChannelHeaderBits | uint16(data[1])
|
||||
sequence := uint16(data[2])<<ChannelHeaderBits | uint16(data[3])
|
||||
length := uint16(data[4])<<ChannelHeaderBits | uint16(data[5])
|
||||
|
||||
if len(data) < 6+int(length) {
|
||||
if len(data) < ChannelHeaderSize+int(length) {
|
||||
return errors.New("channel packet incomplete")
|
||||
}
|
||||
|
||||
msgData := data[6 : 6+length]
|
||||
msgData := data[ChannelHeaderSize : ChannelHeaderSize+length]
|
||||
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
@@ -3,6 +3,8 @@ package rate
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.quad4.io/Networks/Reticulum-Go/pkg/common"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -15,6 +17,13 @@ const (
|
||||
DefaultBurstPenalty = 300 // Default seconds penalty after burst
|
||||
DefaultMaxHeldAnnounces = 256 // Default max announces in hold queue
|
||||
DefaultHeldReleaseInterval = 30 // Default seconds between releasing held announces
|
||||
|
||||
// Allowance thresholds
|
||||
AllowanceMinThreshold = 1.0
|
||||
AllowanceDecrement = 1.0
|
||||
|
||||
// History check threshold
|
||||
HistoryGraceThreshold = 1
|
||||
)
|
||||
|
||||
type Limiter struct {
|
||||
@@ -47,11 +56,11 @@ func (l *Limiter) Allow() bool {
|
||||
l.allowance = l.rate
|
||||
}
|
||||
|
||||
if l.allowance < 1.0 {
|
||||
if l.allowance < AllowanceMinThreshold {
|
||||
return false
|
||||
}
|
||||
|
||||
l.allowance -= 1.0
|
||||
l.allowance -= AllowanceDecrement
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -100,7 +109,7 @@ func (arc *AnnounceRateControl) AllowAnnounce(destHash string) bool {
|
||||
// Check rate
|
||||
lastAnnounce := history[len(history)-1]
|
||||
waitTime := arc.rateTarget
|
||||
if len(history) > arc.rateGrace {
|
||||
if len(history) > arc.rateGrace+HistoryGraceThreshold {
|
||||
waitTime += arc.ratePenalty
|
||||
}
|
||||
|
||||
@@ -155,7 +164,7 @@ func (ic *IngressControl) ProcessAnnounce(announceHash string, announceData []by
|
||||
|
||||
// Reset counter if enough time has passed
|
||||
if elapsed > ic.burstHold+ic.burstPenalty {
|
||||
ic.announceCount = 0
|
||||
ic.announceCount = common.ZERO
|
||||
ic.lastBurst = now
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,17 @@ import (
|
||||
"git.quad4.io/Networks/Reticulum-Go/pkg/identity"
|
||||
)
|
||||
|
||||
const (
|
||||
// Hash length conversion (bits to bytes)
|
||||
BitsPerByte = 8
|
||||
|
||||
// Known destination data index
|
||||
KnownDestIdentityIndex = 2
|
||||
|
||||
// Minimum name parts for hierarchical resolution
|
||||
MinNameParts = 2
|
||||
)
|
||||
|
||||
type Resolver struct {
|
||||
cache map[string]*identity.Identity
|
||||
cacheLock sync.RWMutex
|
||||
@@ -36,12 +47,12 @@ func (r *Resolver) ResolveIdentity(fullName string) (*identity.Identity, error)
|
||||
// Hash the full name to create a deterministic identity
|
||||
h := sha256.New()
|
||||
h.Write([]byte(fullName))
|
||||
nameHash := h.Sum(nil)[:identity.NAME_HASH_LENGTH/8]
|
||||
nameHash := h.Sum(nil)[:identity.NAME_HASH_LENGTH/BitsPerByte]
|
||||
hashStr := hex.EncodeToString(nameHash)
|
||||
|
||||
// Check if this identity is known
|
||||
if knownData, exists := identity.GetKnownDestination(hashStr); exists {
|
||||
if id, ok := knownData[2].(*identity.Identity); ok {
|
||||
if id, ok := knownData[KnownDestIdentityIndex].(*identity.Identity); ok {
|
||||
r.cacheLock.Lock()
|
||||
r.cache[fullName] = id
|
||||
r.cacheLock.Unlock()
|
||||
@@ -51,7 +62,7 @@ func (r *Resolver) ResolveIdentity(fullName string) (*identity.Identity, error)
|
||||
|
||||
// Split name into parts for hierarchical resolution
|
||||
parts := strings.Split(fullName, ".")
|
||||
if len(parts) < 2 {
|
||||
if len(parts) < MinNameParts {
|
||||
return nil, errors.New("invalid identity name format")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user