Files
Reticulum-Go/pkg/buffer/buffer.go
Sudo-Ivan 1d3a969742
Some checks failed
Bearer / scan (push) Successful in 9s
Go Build Multi-Platform / build (amd64, linux) (push) Successful in 42s
Go Build Multi-Platform / build (amd64, darwin) (push) Successful in 44s
Go Build Multi-Platform / build (arm, freebsd) (push) Successful in 41s
Go Build Multi-Platform / build (arm, windows) (push) Successful in 39s
Go Build Multi-Platform / build (arm64, windows) (push) Successful in 1m8s
Go Build Multi-Platform / build (wasm, js) (push) Successful in 1m6s
TinyGo Build / tinygo-build (tinygo-wasm, tinygo-wasm, reticulum-go.wasm, wasm) (pull_request) Failing after 1m2s
TinyGo Build / tinygo-build (tinygo-build, tinygo-default, reticulum-go-tinygo, ) (pull_request) Failing after 1m4s
Go Revive Lint / lint (push) Successful in 1m4s
Go Test Multi-Platform / Test (ubuntu-latest, arm64) (push) Successful in 1m24s
Run Gosec / tests (push) Successful in 1m29s
Go Test Multi-Platform / Test (ubuntu-latest, amd64) (push) Successful in 2m31s
Go Build Multi-Platform / build (amd64, freebsd) (push) Successful in 9m28s
Go Build Multi-Platform / build (arm, linux) (push) Successful in 9m28s
Go Build Multi-Platform / build (amd64, windows) (push) Successful in 9m30s
Go Build Multi-Platform / build (arm64, darwin) (push) Successful in 9m27s
Go Build Multi-Platform / build (arm64, linux) (push) Successful in 9m26s
Go Build Multi-Platform / build (arm64, freebsd) (push) Successful in 9m29s
Go Build Multi-Platform / Create Release (push) Has been skipped
chore: add SPDX license identifier and copyright notice
2025-12-31 20:44:58 -06:00

267 lines
5.9 KiB
Go

// SPDX-License-Identifier: 0BSD
// Copyright (c) 2024-2026 Sudo-Ivan / Quad4.io
package buffer
import (
"bufio"
"bytes"
"compress/bzip2"
"encoding/binary"
"io"
"sync"
"git.quad4.io/Networks/Reticulum-Go/pkg/channel"
)
// Wire-format parameters shared by the stream reader and writer.
const (
	// StreamIDMax is the largest stream ID the header can carry (16383);
	// the two top header bits are reserved for flags.
	StreamIDMax = 0x3fff
	// MaxChunkLen is the most input a single write considers at once and the
	// cap applied to decompressed output.
	MaxChunkLen = 16 * 1024
	// MaxDataLen is MDU - 2 - 6 (2 for stream header, 6 for channel envelope).
	MaxDataLen = 457
	// CompressTries bounds the compression attempt loop in the writer.
	CompressTries = 4

	// Stream header flags.

	// StreamHeaderEOF marks the final message of a stream.
	StreamHeaderEOF = 0x8000
	// StreamHeaderCompressed marks a payload that must be decompressed.
	StreamHeaderCompressed = 0x4000

	// StreamDataMessageType is the message type reported by GetType.
	StreamDataMessageType = 0x01

	// StreamHeaderSize is the size in bytes of the packed stream header.
	StreamHeaderSize = 2

	// CompressThreshold is the input length above which the writer attempts
	// compression.
	CompressThreshold = 32
)

// StreamDataMessage is one unit of stream data: a 2-byte big-endian header
// (flags plus stream ID) followed by the payload bytes.
type StreamDataMessage struct {
	// StreamID identifies the stream; only the low 14 bits are packed.
	StreamID uint16
	// Data is the payload exactly as it appears after the header.
	Data []byte
	// EOF is carried in the header's StreamHeaderEOF bit.
	EOF bool
	// Compressed is carried in the header's StreamHeaderCompressed bit.
	Compressed bool
}

// Pack serialises the message as header followed by Data. The stream ID is
// masked to StreamIDMax before the flag bits are merged in. The returned
// error is always nil.
func (m *StreamDataMessage) Pack() ([]byte, error) {
	header := m.StreamID & StreamIDMax
	if m.EOF {
		header |= StreamHeaderEOF
	}
	if m.Compressed {
		header |= StreamHeaderCompressed
	}

	var prefix [StreamHeaderSize]byte
	binary.BigEndian.PutUint16(prefix[:], header)

	var out bytes.Buffer
	out.Grow(StreamHeaderSize + len(m.Data))
	out.Write(prefix[:])
	out.Write(m.Data)
	return out.Bytes(), nil
}

// GetType returns the message type, StreamDataMessageType.
func (m *StreamDataMessage) GetType() uint16 {
	return StreamDataMessageType
}

// Unpack parses a packed message into m. It returns io.ErrShortBuffer when
// data is shorter than the header. Data is set to a sub-slice of the input,
// not a copy, and is not decompressed here.
func (m *StreamDataMessage) Unpack(data []byte) error {
	if len(data) < StreamHeaderSize {
		return io.ErrShortBuffer
	}

	prefix, payload := data[:StreamHeaderSize], data[StreamHeaderSize:]
	header := binary.BigEndian.Uint16(prefix)

	m.StreamID = header & StreamIDMax
	m.EOF = header&StreamHeaderEOF != 0
	m.Compressed = header&StreamHeaderCompressed != 0
	m.Data = payload
	return nil
}
// RawChannelReader collects the payloads of stream data messages received on
// a channel for one stream ID and exposes them through Read.
type RawChannelReader struct {
	// streamID selects which incoming stream messages are accepted.
	streamID int
	// channel is the channel the message handler is registered on.
	channel *channel.Channel
	// buffer holds received bytes that have not been read yet.
	buffer *bytes.Buffer
	// eof is set once a message carrying the EOF flag has been handled.
	eof bool
	// callbacks maps IDs handed out by AddReadyCallback to ready callbacks.
	callbacks map[int]func(int)
	// nextCallbackID is the ID the next AddReadyCallback call will return.
	nextCallbackID int
	// messageHandlerID is the value returned by the channel when the
	// handler was registered.
	messageHandlerID int
	// mutex guards buffer, eof, callbacks and nextCallbackID.
	mutex sync.RWMutex
}
// NewRawChannelReader builds a reader for streamID with an empty buffer and
// no callbacks, registers its HandleMessage method as a message handler on
// ch, and returns it.
func NewRawChannelReader(streamID int, ch *channel.Channel) *RawChannelReader {
	r := new(RawChannelReader)
	r.streamID = streamID
	r.channel = ch
	r.buffer = &bytes.Buffer{}
	r.callbacks = map[int]func(int){}

	// Register last so the handler never sees a half-initialised reader.
	r.messageHandlerID = ch.AddMessageHandler(r.HandleMessage)
	return r
}
// AddReadyCallback registers cb to be called with the buffered byte count
// whenever a message for this stream is handled. It returns an ID that can
// be passed to RemoveReadyCallback.
func (r *RawChannelReader) AddReadyCallback(cb func(int)) int {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	assigned := r.nextCallbackID
	r.callbacks[assigned] = cb
	r.nextCallbackID = assigned + 1
	return assigned
}
// RemoveReadyCallback unregisters the callback with the given ID. Unknown
// IDs are ignored.
func (r *RawChannelReader) RemoveReadyCallback(id int) {
	r.mutex.Lock()
	delete(r.callbacks, id)
	r.mutex.Unlock()
}
// Read copies buffered stream data into p.
//
// It returns io.EOF only when the buffer is empty and a message with the EOF
// flag has been handled. While the stream is still open an empty buffer
// yields 0, nil: the call does not block waiting for data.
func (r *RawChannelReader) Read(p []byte) (n int, err error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	drained := r.buffer.Len() == 0
	if drained && r.eof {
		return 0, io.EOF
	}

	n, err = r.buffer.Read(p)
	// bytes.Buffer reports io.EOF when it is merely empty; hide that from
	// the caller unless the stream itself has ended.
	if !r.eof && err == io.EOF {
		err = nil
	}
	return n, err
}
// HandleMessage is the channel message handler for this reader.
//
// It returns false for anything that is not a *StreamDataMessage addressed to
// this reader's stream ID. Otherwise it appends the payload to the buffer
// (decompressing it first when the Compressed flag is set), records EOF if
// flagged, invokes every ready callback with the number of buffered bytes and
// returns true.
//
// Callbacks are invoked after the reader's lock has been released, so a
// callback may safely call Read, AddReadyCallback or RemoveReadyCallback.
// Invoking them with the lock held would self-deadlock on the first such call.
func (r *RawChannelReader) HandleMessage(msg channel.MessageBase) bool { // #nosec G115
	streamMsg, ok := msg.(*StreamDataMessage)
	if !ok || streamMsg.StreamID != uint16(r.streamID) { // #nosec G115
		return false
	}

	r.mutex.Lock()
	payload := streamMsg.Data
	if streamMsg.Compressed {
		// decompressData yields nil on failure; in that case nothing is
		// appended for this message.
		payload = decompressData(payload)
	}
	r.buffer.Write(payload)
	if streamMsg.EOF {
		r.eof = true
	}

	// Snapshot what the callbacks need while still holding the lock.
	buffered := r.buffer.Len()
	listeners := make([]func(int), 0, len(r.callbacks))
	for _, cb := range r.callbacks {
		listeners = append(listeners, cb)
	}
	r.mutex.Unlock()

	// Notify callbacks outside the critical section.
	for _, cb := range listeners {
		cb(buffered)
	}
	return true
}
// RawChannelWriter sends written bytes over a channel as stream data
// messages tagged with a fixed stream ID.
type RawChannelWriter struct {
	// streamID is placed in the header of every message sent.
	streamID int
	// channel is the channel messages are sent on.
	channel *channel.Channel
	// eof is set by Close and copied into the EOF flag of sent messages.
	eof bool
}
// NewRawChannelWriter returns a writer that sends on ch using streamID. The
// writer starts with its EOF flag cleared.
func NewRawChannelWriter(streamID int, ch *channel.Channel) *RawChannelWriter {
	w := new(RawChannelWriter)
	w.streamID = streamID
	w.channel = ch
	return w
}
// Write sends p over the channel as one or more StreamDataMessages and
// returns the number of bytes of p that were handed to the channel.
//
// Each message carries either a compressed segment of the pending input
// (when compressData yields a non-empty result that is shorter than both the
// segment and MaxDataLen) or up to MaxDataLen raw bytes. The loop continues
// until all of p has been sent, so a nil error always means n == len(p), as
// io.Writer requires. If the channel rejects a message, Write stops and
// returns the count sent so far together with that error.
//
// At least one message is sent even when p is empty; Close relies on this to
// emit the EOF marker. When the writer's EOF flag is set it is attached only
// to the last message of the call.
func (w *RawChannelWriter) Write(p []byte) (n int, err error) {
	for {
		// Consider at most MaxChunkLen pending bytes per message.
		window := p[n:]
		if len(window) > MaxChunkLen {
			window = window[:MaxChunkLen]
		}

		// Default: a raw payload that fits in a single message.
		payload := window
		if len(payload) > MaxDataLen {
			payload = payload[:MaxDataLen]
		}
		consumed := len(payload)
		compressed := false

		if len(window) > CompressThreshold {
			// Try the whole window first, then progressively smaller
			// prefixes (1/2, 1/3, ...) until one compresses small enough.
			for try := 1; try < CompressTries; try++ {
				segLen := len(window) / try
				packed := compressData(window[:segLen])
				// An empty result means compression failed or is
				// unavailable; never send it in place of real data.
				if len(packed) > 0 && len(packed) < MaxDataLen && len(packed) < segLen {
					payload, consumed, compressed = packed, segLen, true
					break
				}
			}
		}

		msg := &StreamDataMessage{
			StreamID:   uint16(w.streamID), // #nosec G115
			Data:       payload,
			EOF:        w.eof && n+consumed >= len(p),
			Compressed: compressed,
		}
		if sendErr := w.channel.Send(msg); sendErr != nil {
			return n, sendErr
		}

		n += consumed
		if n >= len(p) {
			return n, nil
		}
	}
}
// Close marks the writer as finished and sends a message with no data so the
// EOF flag reaches the peer. It returns the error from that send, if any.
func (w *RawChannelWriter) Close() error {
	w.eof = true
	if _, err := w.Write(nil); err != nil {
		return err
	}
	return nil
}
// Buffer wraps a bufio.ReadWriter and adds a Close method that flushes
// pending output.
type Buffer struct {
	// ReadWriter is the buffered reader/writer pair all calls delegate to.
	ReadWriter *bufio.ReadWriter
}

// Write hands p to the buffered writer. Data may remain buffered until Close
// (or a flush of the underlying writer) is called.
func (b *Buffer) Write(p []byte) (n int, err error) {
	n, err = b.ReadWriter.Writer.Write(p)
	return n, err
}

// Read reads from the buffered reader into p.
func (b *Buffer) Read(p []byte) (n int, err error) {
	n, err = b.ReadWriter.Reader.Read(p)
	return n, err
}

// Close flushes buffered output to the underlying writer. It does not close
// the underlying reader or writer.
func (b *Buffer) Close() error {
	return b.ReadWriter.Flush()
}
// CreateReader creates a RawChannelReader for streamID on ch, registers
// readyCallback on it when one is supplied, and returns the reader wrapped
// in a bufio.Reader.
func CreateReader(streamID int, ch *channel.Channel, readyCallback func(int)) *bufio.Reader {
	source := NewRawChannelReader(streamID, ch)
	if wantsNotify := readyCallback != nil; wantsNotify {
		source.AddReadyCallback(readyCallback)
	}
	return bufio.NewReader(source)
}
// CreateWriter creates a RawChannelWriter for streamID on ch and returns it
// wrapped in a bufio.Writer; callers must flush the returned writer for
// buffered data to be sent.
func CreateWriter(streamID int, ch *channel.Channel) *bufio.Writer {
	return bufio.NewWriter(NewRawChannelWriter(streamID, ch))
}
// CreateBidirectionalBuffer pairs a reader on receiveStreamID with a writer
// on sendStreamID, both over ch, and returns them as one bufio.ReadWriter.
// readyCallback, when non-nil, is registered on the reader.
func CreateBidirectionalBuffer(receiveStreamID, sendStreamID int, ch *channel.Channel, readyCallback func(int)) *bufio.ReadWriter {
	// Arguments are evaluated left to right, so the reader is set up first.
	return bufio.NewReadWriter(
		CreateReader(receiveStreamID, ch, readyCallback),
		CreateWriter(sendStreamID, ch),
	)
}
// compressData is the hook for producing a bzip2-compressed copy of data.
//
// The Go standard library's compress/bzip2 package implements decompression
// only, so no compressed form can be produced here and the function always
// returns nil. Callers must treat an empty result as "compression
// unavailable" and send the data uncompressed.
//
// Running the input through bzip2.NewReader, as this function used to do,
// decompresses it instead: ordinary input failed to parse (yielding nil),
// and input that happened to be a valid bzip2 stream came back decompressed,
// which a caller would then mislabel as compressed.
func compressData(data []byte) []byte {
	// TODO: wire in a real bzip2 encoder if one becomes available to the
	// project; the receiving side expects bzip2 streams.
	return nil
}
// decompressData decodes data as a bzip2 stream and returns the decompressed
// bytes. At most MaxChunkLen bytes of output are read; anything the stream
// would produce beyond that is not read. If decoding fails, nil is returned
// and any partial output is discarded.
func decompressData(data []byte) []byte {
	// Limit the amount of data read to prevent decompression bombs
	bounded := &io.LimitedReader{
		R: bzip2.NewReader(bytes.NewReader(data)),
		N: MaxChunkLen,
	} // #nosec G110

	var out bytes.Buffer
	// bearer:disable go_gosec_filesystem_decompression_bomb
	if _, err := io.Copy(&out, bounded); err != nil {
		// Handle error, e.g., log it or return an error
		return nil
	}
	return out.Bytes()
}