-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* github.com/gaukas/[email protected] => github.com/refraction-networking/water/internal/wazerofs * github.com/gaukas/[email protected] => github.com/refraction-networking/water/internal/io Signed-off-by: Gaukas Wang <[email protected]>
- Loading branch information
Showing
27 changed files
with
888 additions
and
514 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,10 @@ | ||
github.com/blang/vfs v1.0.0 h1:AUZUgulCDzbaNjTRWEP45X7m/J10brAptZpSRKRZBZc= | ||
github.com/blang/vfs v1.0.0/go.mod h1:jjuNUc/IKcRNNWC9NUCvz4fR9PZLPIKxEygtPs/4tSI= | ||
github.com/gaukas/wazerofs v0.1.0 h1:wIkW1bAxSnpaaVkQ5LOb1tm1BXdVap3eKjJpVWIqt2E= | ||
github.com/gaukas/wazerofs v0.1.0/go.mod h1:+JECB9Fwt0taPqSgHckG9lmT3tcoVK+9VJozTsq9UlI= | ||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= | ||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||
github.com/refraction-networking/wazero v1.7.3-w h1:Br3UuVPrKAD3pUSIlpT1+iBIYMbs8h2wS4d0ziU9Yoc= | ||
github.com/refraction-networking/wazero v1.7.3-w/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= | ||
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= | ||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= | ||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
BSD 3-Clause License | ||
|
||
Copyright (c) 2018, the respective contributors, as shown by the AUTHORS file. | ||
All rights reserved. | ||
|
||
Redistribution and use in source and binary forms, with or without | ||
modification, are permitted provided that the following conditions are met: | ||
|
||
* Redistributions of source code must retain the above copyright notice, this | ||
list of conditions and the following disclaimer. | ||
|
||
* Redistributions in binary form must reproduce the above copyright notice, | ||
this list of conditions and the following disclaimer in the documentation | ||
and/or other materials provided with the distribution. | ||
|
||
* Neither the name of the copyright holder nor the names of its | ||
contributors may be used to endorse or promote products derived from | ||
this software without specific prior written permission. | ||
|
||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | ||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | ||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# io: Easy to use I/O object and function implementations | ||
|
||
Copied from `github.com/gaukas/[email protected]`. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
## package `io/conn` | ||
|
||
This package provides abstractions for connections build on top of other types. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,324 @@ | ||
package conn | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"io" | ||
"net" | ||
"os" | ||
"runtime" | ||
"sync" | ||
"sync/atomic" | ||
"syscall" | ||
"time" | ||
) | ||
|
||
type ChannelConn struct { | ||
chanRX <-chan []byte // read from this channel, owned by the writing-side to this channel | ||
chanTX chan<- []byte // write to this channel, owned by this struct | ||
|
||
chanClose chan struct{} // notify close event to unblock blocking read/write operations | ||
closed atomic.Bool // true if the channel is closed | ||
|
||
readBuf []byte // protected by readBufMutex, accessed only from readLocked and readLockedFromBuffer | ||
readBufMutex sync.Mutex | ||
|
||
pendingWrite atomic.Bool // indicates if there is an outstanding writer blocking, protecting TX channel | ||
|
||
nonblocking atomic.Bool | ||
} | ||
|
||
func NewChannelConn(rx <-chan []byte, tx chan<- []byte) *ChannelConn { | ||
return &ChannelConn{ | ||
chanRX: rx, | ||
chanTX: tx, | ||
chanClose: make(chan struct{}), | ||
} | ||
} | ||
|
||
// ChannelConn implements [Conn]. | ||
var _ Conn = (*ChannelConn)(nil) | ||
|
||
// Read reads data from the channel. Implements [net.Conn]. | ||
func (c *ChannelConn) Read(b []byte) (n int, err error) { | ||
if c.closed.Load() { | ||
return 0, io.ErrClosedPipe | ||
} | ||
|
||
c.readBufMutex.Lock() | ||
defer c.readBufMutex.Unlock() | ||
|
||
return c.readLocked(b) | ||
} | ||
|
||
// read blocks until some data is available, or the channel is closed. | ||
func (c *ChannelConn) readLocked(b []byte) (int, error) { | ||
if len(c.readBuf) != 0 { // need to resume reading from the buffer | ||
return c.readLockedFromBuffer(b) | ||
} | ||
|
||
if c.nonblocking.Load() { | ||
for { | ||
select { | ||
case <-c.chanClose: | ||
return 0, io.ErrClosedPipe | ||
case c.readBuf = <-c.chanRX: | ||
if len(c.readBuf) != 0 { // buffer is empty, read from the channel OK | ||
return c.readLockedFromBuffer(b) | ||
} else { // empty read from channel | ||
if c.readBuf == nil { // closed channel | ||
return 0, io.EOF | ||
} else { // channel open, but empty read: other end testing if write will block | ||
continue | ||
} | ||
} | ||
default: | ||
return 0, syscall.EAGAIN | ||
} | ||
} | ||
} else { | ||
for { | ||
select { | ||
case <-c.chanClose: | ||
return 0, io.ErrClosedPipe | ||
case c.readBuf = <-c.chanRX: | ||
if len(c.readBuf) != 0 { // buffer is empty, read from the channel OK | ||
return c.readLockedFromBuffer(b) | ||
} else { // empty read from channel | ||
if c.readBuf == nil { // closed channel | ||
return 0, io.EOF | ||
} else { // channel open, but empty read: other end testing if write will block | ||
continue | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// readLockedFromBuffer reads from the buffer. Assumes the buffer is non-empty. | ||
func (c *ChannelConn) readLockedFromBuffer(b []byte) (n int, err error) { | ||
n = copy(b, c.readBuf) | ||
c.readBuf = c.readBuf[n:] | ||
return | ||
} | ||
|
||
// Write writes data to the channel. Implements [net.Conn]. | ||
func (c *ChannelConn) Write(b []byte) (n int, err error) { | ||
if c.nonblocking.Load() { | ||
if c.pendingWrite.CompareAndSwap(false, true) { | ||
defer c.pendingWrite.Store(false) | ||
n, err = c.writeFlagAcquired(b) | ||
} else { | ||
return 0, syscall.EAGAIN | ||
} | ||
} else { | ||
// retry until acquired the pending write flag | ||
for !c.pendingWrite.CompareAndSwap(false, true) { | ||
runtime.Gosched() | ||
} | ||
defer c.pendingWrite.Store(false) | ||
n, err = c.writeFlagAcquired(b) | ||
} | ||
|
||
return | ||
} | ||
|
||
// writeFlagAcquired writes data to the channel. Caller must | ||
// acquire the pending write flag before calling this function. | ||
func (c *ChannelConn) writeFlagAcquired(b []byte) (n int, err error) { | ||
if c.closed.Load() { // check if the channel is closed only after acquiring the pending write flag to prevent racing condition | ||
return 0, io.ErrClosedPipe | ||
} | ||
|
||
expectedLen := len(b) | ||
|
||
bCopy := make([]byte, expectedLen) | ||
if copy(bCopy, b) != expectedLen { | ||
return 0, io.ErrUnexpectedEOF | ||
} | ||
|
||
if c.nonblocking.Load() { | ||
select { | ||
case <-c.chanClose: | ||
return 0, io.ErrClosedPipe | ||
case c.chanTX <- bCopy: | ||
return expectedLen, nil | ||
default: | ||
return 0, syscall.EAGAIN | ||
} | ||
} else { | ||
select { | ||
case <-c.chanClose: | ||
return 0, io.ErrClosedPipe | ||
case c.chanTX <- bCopy: | ||
return expectedLen, nil | ||
} | ||
} | ||
} | ||
|
||
func (c *ChannelConn) Close() error { | ||
if c.closed.CompareAndSwap(false, true) { | ||
close(c.chanClose) | ||
|
||
// acquire the pending write flag before closing the TX channel | ||
for !c.pendingWrite.CompareAndSwap(false, true) { | ||
runtime.Gosched() | ||
} | ||
close(c.chanTX) | ||
c.pendingWrite.Store(false) | ||
|
||
return nil | ||
} | ||
|
||
return io.ErrClosedPipe // double close | ||
} | ||
|
||
type channelAddr struct{} | ||
|
||
func (channelAddr) Network() string { return "channel" } | ||
func (channelAddr) String() string { return "channel" } | ||
|
||
// ChannelConn does not implement [NetworkConn]. | ||
var _ NetworkConn = (*ChannelConn)(nil) | ||
|
||
// LocalAddr returns the local network address. Implements [net.Conn]. | ||
func (*ChannelConn) LocalAddr() net.Addr { return channelAddr{} } | ||
|
||
// RemoteAddr returns the remote network address. Implements [net.Conn]. | ||
func (*ChannelConn) RemoteAddr() net.Addr { return channelAddr{} } | ||
|
||
// ChannelConn does not implement [DeadlineConn]. However, fake implementation | ||
// is provided such that it can be used as [net.Conn] in some cases when | ||
// deadlines are not used. | ||
// | ||
// TODO: properly implement [DeadlineConn]. | ||
var _ DeadlineConn = (*ChannelConn)(nil) | ||
|
||
// SetDeadline is not supported by ChannelConn. It will always return | ||
// [os.ErrNoDeadline]. | ||
// | ||
// TODO: properly implement the support for deadlines. | ||
func (*ChannelConn) SetDeadline(time.Time) error { | ||
return os.ErrNoDeadline | ||
} | ||
|
||
// SetReadDeadline is not supported by ChannelConn. It will always return | ||
// [os.ErrNoDeadline]. | ||
// | ||
// TODO: properly implement the support for read deadline. | ||
func (*ChannelConn) SetReadDeadline(time.Time) error { | ||
return os.ErrNoDeadline | ||
} | ||
|
||
// SetWriteDeadline is not supported by ChannelConn. It will always return | ||
// [os.ErrNoDeadline]. | ||
// | ||
// TODO: properly implement the support for write deadline. | ||
func (*ChannelConn) SetWriteDeadline(time.Time) error { | ||
return os.ErrNoDeadline | ||
} | ||
|
||
// ChannelConn implements [NonblockingConn]. | ||
var _ NonblockingConn = (*ChannelConn)(nil) | ||
|
||
// IsNonblock returns true if the connection is in non-blocking mode. | ||
func (c *ChannelConn) IsNonblock() bool { | ||
return c.nonblocking.Load() | ||
} | ||
|
||
// SetNonblock updates the non-blocking mode of the connection if applicable. | ||
func (c *ChannelConn) SetNonblock(nonblocking bool) (ok bool) { | ||
c.nonblocking.Store(nonblocking) | ||
return true | ||
} | ||
|
||
// ChannelConn implements [PollConn]. | ||
var _ PollConn = (*ChannelConn)(nil) | ||
|
||
func (c *ChannelConn) PollR(ctx context.Context) (bool, error) { | ||
if !c.nonblocking.Load() { | ||
return false, errors.New("polling is not supported in blocking mode") | ||
} | ||
|
||
if c.closed.Load() { | ||
return false, io.EOF | ||
} | ||
|
||
for !c.readBufMutex.TryLock() && ctx.Err() == nil { | ||
runtime.Gosched() | ||
} | ||
|
||
if ctx.Err() != nil { | ||
return false, ctx.Err() | ||
} | ||
|
||
defer c.readBufMutex.Unlock() | ||
|
||
if len(c.readBuf) != 0 { | ||
return true, nil | ||
} | ||
|
||
// We cannot check cap(c.chanRX) vs. len(c.chanRX) here because it is | ||
// possible that messages in the buffer being empty probes sent by the | ||
// other end to check if the write will block. Instead the universal | ||
// reading strategy below is used. | ||
|
||
for { | ||
select { | ||
case <-c.chanClose: | ||
return false, io.EOF | ||
case c.readBuf = <-c.chanRX: | ||
if len(c.readBuf) != 0 { | ||
return true, nil | ||
} else { | ||
if c.readBuf == nil { | ||
return false, io.EOF | ||
} else { | ||
continue | ||
} | ||
} | ||
case <-ctx.Done(): | ||
return false, ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
func (c *ChannelConn) PollW(ctx context.Context) (bool, error) { | ||
if !c.nonblocking.Load() { | ||
return false, errors.New("polling is not supported in blocking mode") | ||
} | ||
|
||
// aquire the pending write flag before writing to the TX channel | ||
for !c.pendingWrite.CompareAndSwap(false, true) && ctx.Err() == nil { | ||
runtime.Gosched() | ||
} | ||
|
||
if ctx.Err() != nil { | ||
return false, ctx.Err() | ||
} | ||
|
||
defer c.pendingWrite.Store(false) | ||
|
||
if c.closed.Load() { | ||
return false, io.EOF | ||
} | ||
|
||
// Buffered channel: | ||
if cap(c.chanTX) > 0 { | ||
for ctx.Err() == nil && len(c.chanTX) >= cap(c.chanTX) { | ||
runtime.Gosched() | ||
} | ||
return len(c.chanTX) < cap(c.chanTX), ctx.Err() | ||
} | ||
|
||
// Unbuffered channel: | ||
select { | ||
case <-c.chanClose: | ||
return false, io.EOF | ||
case c.chanTX <- []byte{}: | ||
return true, nil | ||
case <-ctx.Done(): | ||
return false, ctx.Err() | ||
} | ||
} |
Oops, something went wrong.