buffer, channel, more transport constants

This commit is contained in:
Sudo-Ivan
2024-12-31 17:02:51 -06:00
parent 8df4039b18
commit 54c401e2a5
3 changed files with 530 additions and 2 deletions

240
pkg/buffer/buffer.go Normal file
View File

@@ -0,0 +1,240 @@
package buffer
import (
"bufio"
"bytes"
"compress/bzip2"
"encoding/binary"
"io"
"sync"
"github.com/Sudo-Ivan/reticulum-go/pkg/channel"
)
const (
StreamIDMax = 0x3fff // 16383
MaxChunkLen = 16 * 1024
MaxDataLen = 457 // MDU - 2 - 6 (2 for stream header, 6 for channel envelope)
CompressTries = 4
)
type StreamDataMessage struct {
StreamID uint16
Data []byte
EOF bool
Compressed bool
}
func (m *StreamDataMessage) Pack() ([]byte, error) {
headerVal := uint16(m.StreamID & StreamIDMax)
if m.EOF {
headerVal |= 0x8000
}
if m.Compressed {
headerVal |= 0x4000
}
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, headerVal)
buf.Write(m.Data)
return buf.Bytes(), nil
}
func (m *StreamDataMessage) GetType() uint16 {
return 0x01 // Assign appropriate message type constant
}
func (m *StreamDataMessage) Unpack(data []byte) error {
if len(data) < 2 {
return io.ErrShortBuffer
}
header := binary.BigEndian.Uint16(data[:2])
m.StreamID = header & StreamIDMax
m.EOF = (header & 0x8000) != 0
m.Compressed = (header & 0x4000) != 0
m.Data = data[2:]
return nil
}
type RawChannelReader struct {
streamID int
channel *channel.Channel
buffer *bytes.Buffer
eof bool
callbacks []func(int)
mutex sync.RWMutex
}
func NewRawChannelReader(streamID int, ch *channel.Channel) *RawChannelReader {
reader := &RawChannelReader{
streamID: streamID,
channel: ch,
buffer: bytes.NewBuffer(nil),
callbacks: make([]func(int), 0),
}
ch.AddMessageHandler(reader.HandleMessage)
return reader
}
func (r *RawChannelReader) AddReadyCallback(cb func(int)) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.callbacks = append(r.callbacks, cb)
}
func (r *RawChannelReader) RemoveReadyCallback(cb func(int)) {
r.mutex.Lock()
defer r.mutex.Unlock()
for i, fn := range r.callbacks {
if &fn == &cb {
r.callbacks = append(r.callbacks[:i], r.callbacks[i+1:]...)
break
}
}
}
func (r *RawChannelReader) Read(p []byte) (n int, err error) {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.buffer.Len() == 0 && r.eof {
return 0, io.EOF
}
n, err = r.buffer.Read(p)
if err == io.EOF && !r.eof {
err = nil
}
return
}
func (r *RawChannelReader) HandleMessage(msg channel.MessageBase) bool {
if streamMsg, ok := msg.(*StreamDataMessage); ok && streamMsg.StreamID == uint16(r.streamID) {
r.mutex.Lock()
defer r.mutex.Unlock()
if streamMsg.Compressed {
decompressed := decompressData(streamMsg.Data)
r.buffer.Write(decompressed)
} else {
r.buffer.Write(streamMsg.Data)
}
if streamMsg.EOF {
r.eof = true
}
// Notify callbacks
for _, cb := range r.callbacks {
cb(r.buffer.Len())
}
return true
}
return false
}
type RawChannelWriter struct {
streamID int
channel *channel.Channel
eof bool
}
func NewRawChannelWriter(streamID int, ch *channel.Channel) *RawChannelWriter {
return &RawChannelWriter{
streamID: streamID,
channel: ch,
}
}
func (w *RawChannelWriter) Write(p []byte) (n int, err error) {
if len(p) > MaxChunkLen {
p = p[:MaxChunkLen]
}
msg := &StreamDataMessage{
StreamID: uint16(w.streamID),
Data: p,
EOF: w.eof,
}
if len(p) > 32 {
for try := 1; try < CompressTries; try++ {
chunkLen := len(p) / try
compressed := compressData(p[:chunkLen])
if len(compressed) < MaxDataLen && len(compressed) < chunkLen {
msg.Data = compressed
msg.Compressed = true
break
}
}
}
if err := w.channel.Send(msg); err != nil {
return 0, err
}
return len(p), nil
}
func (w *RawChannelWriter) Close() error {
w.eof = true
_, err := w.Write(nil)
return err
}
type Buffer struct {
ReadWriter *bufio.ReadWriter
}
func (b *Buffer) Write(p []byte) (n int, err error) {
return b.ReadWriter.Write(p)
}
func (b *Buffer) Read(p []byte) (n int, err error) {
return b.ReadWriter.Read(p)
}
func (b *Buffer) Close() error {
if err := b.ReadWriter.Writer.Flush(); err != nil {
return err
}
return nil
}
func CreateReader(streamID int, ch *channel.Channel, readyCallback func(int)) *bufio.Reader {
raw := NewRawChannelReader(streamID, ch)
if readyCallback != nil {
raw.AddReadyCallback(readyCallback)
}
return bufio.NewReader(raw)
}
func CreateWriter(streamID int, ch *channel.Channel) *bufio.Writer {
raw := NewRawChannelWriter(streamID, ch)
return bufio.NewWriter(raw)
}
func CreateBidirectionalBuffer(receiveStreamID, sendStreamID int, ch *channel.Channel, readyCallback func(int)) *bufio.ReadWriter {
reader := CreateReader(receiveStreamID, ch, readyCallback)
writer := CreateWriter(sendStreamID, ch)
return bufio.NewReadWriter(reader, writer)
}
func compressData(data []byte) []byte {
var compressed bytes.Buffer
w := bytes.NewBuffer(data)
r := bzip2.NewReader(w)
io.Copy(&compressed, r)
return compressed.Bytes()
}
func decompressData(data []byte) []byte {
reader := bzip2.NewReader(bytes.NewReader(data))
var decompressed bytes.Buffer
io.Copy(&decompressed, reader)
return decompressed.Bytes()
}

218
pkg/channel/channel.go Normal file
View File

@@ -0,0 +1,218 @@
package channel
import (
"errors"
"math"
"sync"
"time"
"github.com/Sudo-Ivan/reticulum-go/pkg/transport"
)
const (
// Window sizes and thresholds
WindowInitial = 2
WindowMin = 2
WindowMinSlow = 2
WindowMinMedium = 5
WindowMinFast = 16
WindowMaxSlow = 5
WindowMaxMedium = 12
WindowMaxFast = 48
WindowMax = WindowMaxFast
WindowFlexibility = 4
// RTT thresholds
RTTFast = 0.18
RTTMedium = 0.75
RTTSlow = 1.45
// Sequence numbers
SeqMax uint16 = 0xFFFF
SeqModulus uint16 = SeqMax
FastRateThreshold = 10
)
// MessageState represents the state of a message
type MessageState int
const (
MsgStateNew MessageState = iota
MsgStateSent
MsgStateDelivered
MsgStateFailed
)
// MessageBase defines the interface for messages that can be sent over a channel
type MessageBase interface {
Pack() ([]byte, error)
Unpack([]byte) error
GetType() uint16
}
// Channel manages reliable message delivery over a transport link
type Channel struct {
link transport.LinkInterface
mutex sync.RWMutex
txRing []*Envelope
rxRing []*Envelope
window int
windowMax int
windowMin int
windowFlex int
nextSequence uint16
nextRxSequence uint16
maxTries int
fastRateRounds int
medRateRounds int
messageHandlers []func(MessageBase) bool
}
// Envelope wraps a message with metadata for transmission
type Envelope struct {
Sequence uint16
Message MessageBase
Raw []byte
Packet interface{}
Tries int
Timestamp time.Time
}
// NewChannel creates a new Channel instance
func NewChannel(link transport.LinkInterface) *Channel {
return &Channel{
link: link,
messageHandlers: make([]func(MessageBase) bool, 0),
mutex: sync.RWMutex{},
windowMax: WindowMaxSlow,
windowMin: WindowMinSlow,
window: WindowInitial,
maxTries: 3,
}
}
// Send transmits a message over the channel
func (c *Channel) Send(msg MessageBase) error {
if c.link.GetStatus() != transport.STATUS_ACTIVE {
return errors.New("link not ready")
}
env := &Envelope{
Sequence: c.nextSequence,
Message: msg,
Timestamp: time.Now(),
}
c.mutex.Lock()
c.nextSequence = (c.nextSequence + 1) % SeqModulus
c.txRing = append(c.txRing, env)
c.mutex.Unlock()
data, err := msg.Pack()
if err != nil {
return err
}
env.Raw = data
packet := c.link.Send(data)
env.Packet = packet
env.Tries++
timeout := c.getPacketTimeout(env.Tries)
c.link.SetPacketTimeout(packet, c.handleTimeout, timeout)
c.link.SetPacketDelivered(packet, c.handleDelivered)
return nil
}
// handleTimeout handles packet timeout events
func (c *Channel) handleTimeout(packet interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
for _, env := range c.txRing {
if env.Packet == packet {
if env.Tries >= c.maxTries {
// Remove from ring and notify failure
return
}
env.Tries++
c.link.Resend(packet)
timeout := c.getPacketTimeout(env.Tries)
c.link.SetPacketTimeout(packet, c.handleTimeout, timeout)
break
}
}
}
// handleDelivered handles packet delivery confirmations
func (c *Channel) handleDelivered(packet interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
for i, env := range c.txRing {
if env.Packet == packet {
c.txRing = append(c.txRing[:i], c.txRing[i+1:]...)
break
}
}
}
func (c *Channel) getPacketTimeout(tries int) time.Duration {
rtt := c.link.GetRTT()
if rtt < 0.025 {
rtt = 0.025
}
timeout := math.Pow(1.5, float64(tries-1)) * rtt * 2.5 * float64(len(c.txRing)+2)
return time.Duration(timeout * float64(time.Second))
}
func (c *Channel) AddMessageHandler(handler func(MessageBase) bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.messageHandlers = append(c.messageHandlers, handler)
}
func (c *Channel) RemoveMessageHandler(handler func(MessageBase) bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
for i, h := range c.messageHandlers {
if &h == &handler {
c.messageHandlers = append(c.messageHandlers[:i], c.messageHandlers[i+1:]...)
break
}
}
}
func (c *Channel) updateRateThresholds() {
rtt := c.link.RTT()
if rtt > RTTFast {
c.fastRateRounds = 0
if rtt > RTTMedium {
c.medRateRounds = 0
} else {
c.medRateRounds++
if c.windowMax < WindowMaxMedium && c.medRateRounds == FastRateThreshold {
c.windowMax = WindowMaxMedium
c.windowMin = WindowMinMedium
}
}
} else {
c.fastRateRounds++
if c.windowMax < WindowMaxFast && c.fastRateRounds == FastRateThreshold {
c.windowMax = WindowMaxFast
c.windowMin = WindowMinFast
}
}
}
func (c *Channel) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()
// Cleanup resources
return nil
}

View File

@@ -50,6 +50,12 @@ const (
SINGLE = 0x00
GROUP = 0x01
PLAIN = 0x02
// Link status constants
STATUS_NEW = 0
STATUS_ACTIVE = 1
STATUS_CLOSED = 2
STATUS_FAILED = 3
)
type PathInfo struct {
@@ -151,6 +157,7 @@ type Link struct {
physicalStats bool
staleTime time.Duration
staleGrace time.Duration
status int
}
type Destination struct {
@@ -218,9 +225,11 @@ func (l *Link) Teardown() {
}
}
func (l *Link) Send(data []byte) error {
func (l *Link) Send(data []byte) interface{} {
l.mutex.Lock()
l.lastOutbound = time.Now()
l.lastData = time.Now()
l.mutex.Unlock()
packet := &LinkPacket{
Destination: l.destination,
@@ -232,7 +241,12 @@ func (l *Link) Send(data []byte) error {
l.rtt = l.InactiveFor()
}
return packet.send()
err := packet.send()
if err != nil {
return nil
}
return packet
}
type AnnounceHandler interface {
@@ -795,3 +809,59 @@ func (t *Transport) Start() error {
defer t.mutex.Unlock()
return nil
}
// LinkInterface defines the methods required by Channel
type LinkInterface interface {
GetStatus() int
GetRTT() float64
RTT() float64
Send(data []byte) interface{}
Resend(packet interface{}) error
SetPacketTimeout(packet interface{}, callback func(interface{}), timeout time.Duration)
SetPacketDelivered(packet interface{}, callback func(interface{}))
}
func (l *Link) GetRTT() float64 {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.rtt.Seconds()
}
func (l *Link) RTT() float64 {
return l.GetRTT()
}
func (l *Link) Resend(packet interface{}) error {
if p, ok := packet.(*LinkPacket); ok {
p.Timestamp = time.Now()
return p.send()
}
return errors.New("invalid packet type")
}
func (l *Link) SetPacketTimeout(packet interface{}, callback func(interface{}), timeout time.Duration) {
if p, ok := packet.(*LinkPacket); ok {
// Start timeout timer
time.AfterFunc(timeout, func() {
callback(p)
})
}
}
func (l *Link) SetPacketDelivered(packet interface{}, callback func(interface{})) {
if p, ok := packet.(*LinkPacket); ok {
// Update RTT
l.mutex.Lock()
l.rtt = time.Since(p.Timestamp)
l.mutex.Unlock()
// Call delivery callback
callback(p)
}
}
func (l *Link) GetStatus() int {
l.mutex.RLock()
defer l.mutex.RUnlock()
return l.status
}