diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 2d9c18c..9a50fcf 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -9,52 +9,93 @@ on: jobs: benchmark: - runs-on: ubuntu-latest + strategy: + matrix: + go-version: ['1.24'] + os: [ubuntu-latest] + + runs-on: ${{ matrix.os }} + steps: - uses: actions/checkout@v6 - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: ${{ matrix.go-version }} - name: Install dependencies run: | sudo apt-get update sudo apt-get install -y redis-tools netcat-openbsd - - name: Run benchmark + - name: Build server run: | - # Start server in background cd example/memory_kv - # Start server with logging - go run server.go > server.log 2>&1 & + go build -o server + + - name: Start server + run: | + cd example/memory_kv + ./server -addr "127.0.0.1:6380" > server.log 2>&1 & SERVER_PID=$! - - # Wait for server to be ready (longer timeout for CI environment) + echo $SERVER_PID > server.pid + for i in {1..30}; do if nc -z 127.0.0.1 6380; then echo "Server is ready" break fi - sleep 2 + sleep 1 echo "Waiting for server to start... ($i/30)" done - - # Verify server is ready + if ! nc -z 127.0.0.1 6380; then - echo "Server failed to start after 60 seconds" - echo "Server logs:" - cat server.log || true + echo "Server failed to start" + cat server.log exit 1 fi - - # Run benchmark tests - echo "Running benchmark tests..." - redis-benchmark -h 127.0.0.1 -p 6380 -n 5000000 -t set,get -c 512 -P 1024 -q - - # Stop server - kill $SERVER_PID - - # Output results - echo "Benchmark completed" + + - name: Warm up + run: | + redis-benchmark -h 127.0.0.1 -p 6380 -n 10000 -t set,get -c 16 -q + + - name: Run SET benchmark + run: | + echo "=== SET Benchmark ===" + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t set -c 16 + + - name: Run GET benchmark + run: | + echo "=== GET Benchmark ===" + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t get -c 16 + + - name: Run mixed SET/GET benchmark + run: | + echo "=== Mixed SET/GET Benchmark ===" + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t set,get -c 16 + + - name: Run high concurrency benchmark + run: | + echo "=== High Concurrency Benchmark ===" + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t set,get -c 64 -P 16 + + - name: Run pipeline benchmark + run: | + echo "=== Pipeline Benchmark ===" + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t set,get -c 16 -P 32 + + - name: Stop server + if: always() + run: | + cd example/memory_kv + if [ -f server.pid ]; then + kill $(cat server.pid) || true + rm server.pid + fi + + - name: Show server logs + if: failure() + run: | + cat example/memory_kv/server.log || true + diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ed3ade8..d86d1a8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,39 +5,30 @@ on: branches: [ main ] pull_request: branches: [ main ] - + jobs: - # deploy: - # runs-on: ubuntu-20.04 - # concurrency: - # group: ${{ github.workflow }}-${{ github.ref }} - # steps: - # - uses: actions/checkout@v2 - # with: - # submodules: true # Fetch Hugo themes (true OR recursive) - # fetch-depth: 0 # Fetch all history for .GitInfo and .Lastmod - - # - run: mv README.md doc/index.md - - # - name: Deploy Pages - # uses: peaceiris/actions-gh-pages@v3 - # with: - # github_token: ${{ secrets.GITHUB_TOKEN }} - # publish_dir: ./doc - # enable_jekyll: true - build-example-kv: - runs-on: ubuntu-latest + strategy: + matrix: + go-version: ['1.21', '1.22', '1.23', '1.24'] + os: [ubuntu-latest] + + runs-on: ${{ matrix.os }} + steps: - uses: actions/checkout@v6 - name: Set up Go uses: actions/setup-go@v5 with: - go-version: 1.24.0 + go-version: ${{ matrix.go-version }} + + - name: Download dependencies + run: go mod download - name: Build run: | cd example/memory_kv/ - go build + go build -o server + diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..95d0a62 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,138 @@ +name: CI Tests + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + build-and-test: + strategy: + matrix: + go-version: ['1.21', '1.22', '1.23', '1.24'] + os: [ubuntu-latest] + + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go-version }} + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y redis-tools netcat-openbsd + + - name: Download dependencies + run: go mod download + + - name: Verify dependencies + run: go mod verify + + - name: Build all packages + run: go build ./... + + - name: Build example + run: | + cd example/memory_kv + go build -o server + + - name: Run unit tests + run: go test -v -race -coverprofile=coverage.out ./... + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + file: ./coverage.out + fail_ci_if_error: false + + benchmark-test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.24' + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y redis-tools netcat-openbsd + + - name: Build example server + run: | + cd example/memory_kv + go build -o server + + - name: Start server + run: | + cd example/memory_kv + ./server -addr "127.0.0.1:6380" > server.log 2>&1 & + SERVER_PID=$! + echo $SERVER_PID > server.pid + + # Wait for server to be ready + for i in {1..30}; do + if nc -z 127.0.0.1 6380; then + echo "Server is ready" + break + fi + sleep 1 + echo "Waiting for server to start... ($i/30)" + done + + # Verify server is ready + if ! nc -z 127.0.0.1 6380; then + echo "Server failed to start" + cat server.log + exit 1 + fi + + - name: Warm up server + run: | + echo "Warming up server..." + redis-benchmark -h 127.0.0.1 -p 6380 -n 10000 -t set,get -c 16 -q + + - name: Run SET benchmark + run: | + echo "Running SET benchmark..." + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t set -c 16 -q + + - name: Run GET benchmark + run: | + echo "Running GET benchmark..." + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t get -c 16 -q + + - name: Run mixed SET/GET benchmark + run: | + echo "Running mixed SET/GET benchmark..." + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t set,get -c 16 -q + + - name: Run concurrent benchmark + run: | + echo "Running concurrent benchmark..." + redis-benchmark -h 127.0.0.1 -p 6380 -n 100000 -t set,get -c 64 -q + + - name: Stop server + if: always() + run: | + cd example/memory_kv + if [ -f server.pid ]; then + kill $(cat server.pid) || true + rm server.pid + fi + + - name: Show server logs + if: failure() + run: | + cat example/memory_kv/server.log || true diff --git a/.gitignore b/.gitignore index 04d1287..b315aaa 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ vendor #vscode .vscode example/memory_kv/memory_kv +example/memory_kv/server diff --git a/go.mod b/go.mod index c37f8a4..2a4802e 100644 --- a/go.mod +++ b/go.mod @@ -3,18 +3,21 @@ module github.com/IceFireDB/redhub go 1.24.0 require ( - github.com/panjf2000/gnet v1.6.7 + github.com/panjf2000/gnet/v2 v2.9.7 github.com/stretchr/testify v1.11.1 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/panjf2000/ants/v2 v2.9.0 // indirect + github.com/kr/pretty v0.1.0 // indirect + github.com/panjf2000/ants/v2 v2.11.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect - golang.org/x/sys v0.26.0 // indirect + go.uber.org/zap v1.27.1 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 01425c0..5f77694 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,3 @@ -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -9,79 +6,30 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= -github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0tEo= -github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= -github.com/panjf2000/gnet v1.6.7 h1:zv1k6kw80sG5ZQrLpbbFDheNCm50zm3z2e3ck5GwMOM= -github.com/panjf2000/gnet v1.6.7/go.mod h1:KcOU7QsCaCBjeD5kyshBIamG3d9kAQtlob4Y0v0E+sc= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/panjf2000/ants/v2 v2.11.4 h1:UJQbtN1jIcI5CYNocTj0fuAUYvsLjPoYi0YuhqV/Y48= +github.com/panjf2000/ants/v2 v2.11.4/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek= +github.com/panjf2000/gnet/v2 v2.9.7 h1:6zW7Jl3oAfXwSuh1PxHLndoL2MQRWx0AJR6aaQjxUgA= +github.com/panjf2000/gnet/v2 v2.9.7/go.mod h1:WQTxDWYuQ/hz3eccH0FN32IVuvZ19HewEWx0l62fx7E= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211204120058-94396e421777/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/resp/comparse_test.go b/pkg/resp/comparse_test.go new file mode 100644 index 0000000..a2cecec --- /dev/null +++ b/pkg/resp/comparse_test.go @@ -0,0 +1,39 @@ +package resp + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWriteArray(t *testing.T) { + w := &Writer{} + w.WriteArray(3) + assert.Equal(t, []byte("*3\r\n"), w.b) +} + +func TestWriteBulk(t *testing.T) { + w := &Writer{} + w.WriteBulk([]byte("hello")) + assert.Equal(t, []byte("$5\r\nhello\r\n"), w.b) +} + +func TestWriteMultipleBulk(t *testing.T) { + w := &Writer{} + w.WriteArray(2) + w.WriteBulk([]byte("key")) + w.WriteBulk([]byte("value")) + assert.Equal(t, []byte("*2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"), w.b) +} + +func TestWriteBulkEmpty(t *testing.T) { + w := &Writer{} + w.WriteBulk([]byte{}) + assert.Equal(t, []byte("$0\r\n\r\n"), w.b) +} + +func TestWriteBulkSpecialChars(t *testing.T) { + w := &Writer{} + w.WriteBulk([]byte("hello\r\nworld")) + assert.Equal(t, []byte("$12\r\nhello\r\nworld\r\n"), w.b) +} diff --git a/pkg/resp/resp_test.go b/pkg/resp/resp_test.go new file mode 100644 index 0000000..32ee433 --- /dev/null +++ b/pkg/resp/resp_test.go @@ -0,0 +1,496 @@ +package resp + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAppendUint(t *testing.T) { + tests := []struct { + name string + input uint64 + expected []byte + }{ + {"zero", 0, []byte(":0\r\n")}, + {"small", 123, []byte(":123\r\n")}, + {"large", 9223372036854775808, []byte(":9223372036854775808\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendUint(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendInt(t *testing.T) { + tests := []struct { + name string + input int64 + expected []byte + }{ + {"zero", 0, []byte(":0\r\n")}, + {"positive", 123, []byte(":123\r\n")}, + {"negative", -456, []byte(":-456\r\n")}, + {"min", -9223372036854775808, []byte(":-9223372036854775808\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendInt(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendArray(t *testing.T) { + tests := []struct { + name string + input int + expected []byte + }{ + {"zero", 0, []byte("*0\r\n")}, + {"small", 1, []byte("*1\r\n")}, + {"large", 1000, []byte("*1000\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendArray(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendBulk(t *testing.T) { + tests := []struct { + name string + input []byte + expected []byte + }{ + {"empty", []byte{}, []byte("$0\r\n\r\n")}, + {"simple", []byte("hello"), []byte("$5\r\nhello\r\n")}, + {"binary", []byte{0x00, 0x01, 0x02}, []byte("$3\r\n\x00\x01\x02\r\n")}, + {"with newline", []byte("hello\nworld"), []byte("$11\r\nhello\nworld\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendBulk(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendBulkString(t *testing.T) { + tests := []struct { + name string + input string + expected []byte + }{ + {"empty", "", []byte("$0\r\n\r\n")}, + {"simple", "hello", []byte("$5\r\nhello\r\n")}, + {"unicode", "你好", []byte("$6\r\n你好\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendBulkString(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendString(t *testing.T) { + tests := []struct { + name string + input string + expected []byte + }{ + {"ok", "OK", []byte("+OK\r\n")}, + {"pong", "PONG", []byte("+PONG\r\n")}, + {"message", "hello world", []byte("+hello world\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendString(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendError(t *testing.T) { + tests := []struct { + name string + input string + expected []byte + }{ + {"simple", "some error", []byte("-some error\r\n")}, + {"protocol error", "Protocol error: invalid", []byte("-Protocol error: invalid\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendError(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendOK(t *testing.T) { + result := AppendOK(nil) + assert.Equal(t, []byte("+OK\r\n"), result) +} + +func TestAppendNull(t *testing.T) { + result := AppendNull(nil) + assert.Equal(t, []byte("$-1\r\n"), result) +} + +func TestAppendBulkFloat(t *testing.T) { + tests := []struct { + name string + input float64 + expected []byte + }{ + {"zero", 0.0, []byte("$1\r\n0\r\n")}, + {"positive", 3.14, []byte("$4\r\n3.14\r\n")}, + {"negative", -2.5, []byte("$4\r\n-2.5\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendBulkFloat(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendBulkInt(t *testing.T) { + tests := []struct { + name string + input int64 + expected []byte + }{ + {"zero", 0, []byte("$1\r\n0\r\n")}, + {"positive", 123, []byte("$3\r\n123\r\n")}, + {"negative", -456, []byte("$4\r\n-456\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendBulkInt(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendBulkUint(t *testing.T) { + tests := []struct { + name string + input uint64 + expected []byte + }{ + {"zero", 0, []byte("$1\r\n0\r\n")}, + {"small", 123, []byte("$3\r\n123\r\n")}, + {"large", 18446744073709551615, []byte("$20\r\n18446744073709551615\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendBulkUint(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestPrefixERRIfNeeded(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + {"already prefixed", "ERR something", "ERR something"}, + {"uppercase first", "WRONGTYPE Operation", "WRONGTYPE Operation"}, + {"lowercase first", "invalid command", "ERR invalid command"}, + {"mixed case", "someError", "ERR someError"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := prefixERRIfNeeded(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendAnyNil(t *testing.T) { + result := AppendAny(nil, nil) + assert.Equal(t, []byte("$-1\r\n"), result) +} + +func TestAppendAnyError(t *testing.T) { + tests := []struct { + name string + input error + expected string + }{ + {"standard error", errors.New("test error"), "-ERR test error\r\n"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendAny(nil, tt.input) + assert.Equal(t, tt.expected, string(result)) + }) + } +} + +func TestAppendAnyString(t *testing.T) { + result := AppendAny(nil, "hello") + assert.Equal(t, []byte("$5\r\nhello\r\n"), result) +} + +func TestAppendAnyBool(t *testing.T) { + tests := []struct { + name string + input bool + expected []byte + }{ + {"true", true, []byte("$1\r\n1\r\n")}, + {"false", false, []byte("$1\r\n0\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendAny(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendAnyInt(t *testing.T) { + tests := []struct { + name string + input interface{} + expected []byte + }{ + {"int", int(123), []byte("$3\r\n123\r\n")}, + {"int8", int8(12), []byte("$2\r\n12\r\n")}, + {"int16", int16(3456), []byte("$4\r\n3456\r\n")}, + {"int32", int32(789012), []byte("$6\r\n789012\r\n")}, + {"int64", int64(1234567890), []byte("$10\r\n1234567890\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendAny(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendAnyUint(t *testing.T) { + tests := []struct { + name string + input interface{} + expected []byte + }{ + {"uint", uint(123), []byte("$3\r\n123\r\n")}, + {"uint8", uint8(255), []byte("$3\r\n255\r\n")}, + {"uint16", uint16(65535), []byte("$5\r\n65535\r\n")}, + {"uint32", uint32(4294967295), []byte("$10\r\n4294967295\r\n")}, + {"uint64", uint64(18446744073709551615), []byte("$20\r\n18446744073709551615\r\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendAny(nil, tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAppendAnyFloat(t *testing.T) { + tests := []struct { + name string + input interface{} + prefix string + }{ + {"float32", float32(3.14), "$17\r\n"}, + {"float64", float64(6.28), "$4\r\n"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AppendAny(nil, tt.input) + assert.Contains(t, string(result), tt.prefix) + }) + } +} + +func TestAppendAnySlice(t *testing.T) { + result := AppendAny(nil, []string{"a", "b", "c"}) + expected := []byte("*3\r\n$1\r\na\r\n$1\r\nb\r\n$1\r\nc\r\n") + assert.Equal(t, expected, result) +} + +func TestAppendAnyMap(t *testing.T) { + result := AppendAny(nil, map[string]interface{}{"key": "value"}) + expected := []byte("*2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n") + assert.Equal(t, expected, result) +} + +func TestAppendAnyMapMultiple(t *testing.T) { + result := AppendAny(nil, map[string]interface{}{"a": 1, "b": 2, "c": 3}) + assert.Equal(t, byte('*'), result[0]) +} + +func TestAppendAnySimpleString(t *testing.T) { + result := AppendAny(nil, SimpleString("PONG")) + assert.Equal(t, []byte("+PONG\r\n"), result) +} + +func TestAppendAnySimpleInt(t *testing.T) { + result := AppendAny(nil, SimpleInt(42)) + assert.Equal(t, []byte(":42\r\n"), result) +} + +func TestAppendAnyMarshaler(t *testing.T) { + type custom struct { + value string + } + c := custom{value: "CUSTOM"} + result := AppendAny(nil, MarshalerFunc(func() []byte { + return []byte("+" + c.value + "\r\n") + })) + assert.Equal(t, []byte("+CUSTOM\r\n"), result) +} + +type MarshalerFunc func() []byte + +func (f MarshalerFunc) MarshalRESP() []byte { + return f() +} + +func TestAppendTile38(t *testing.T) { + result := AppendTile38(nil, []byte("SET key value")) + assert.Equal(t, []byte("$13 SET key value\r\n"), result) +} + +func TestReadNextRESP_Integer(t *testing.T) { + tests := []struct { + name string + input []byte + expected RESP + consumed int + }{ + {"zero", []byte(":0\r\n"), RESP{Type: Integer, Data: []byte("0"), Raw: []byte(":0\r\n")}, 4}, + {"positive", []byte(":123\r\n"), RESP{Type: Integer, Data: []byte("123"), Raw: []byte(":123\r\n")}, 6}, + {"negative", []byte(":-456\r\n"), RESP{Type: Integer, Data: []byte("-456"), Raw: []byte(":-456\r\n")}, 7}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n, resp := ReadNextRESP(tt.input) + assert.Equal(t, tt.consumed, n) + assert.Equal(t, tt.expected.Type, resp.Type) + assert.Equal(t, tt.expected.Data, resp.Data) + }) + } +} + +func TestReadNextRESP_String(t *testing.T) { + input := []byte("+OK\r\n") + n, resp := ReadNextRESP(input) + assert.Equal(t, 5, n) + assert.Equal(t, Type('+'), resp.Type) + assert.Equal(t, []byte("OK"), resp.Data) +} + +func TestReadNextRESP_Bulk(t *testing.T) { + tests := []struct { + name string + input []byte + expected RESP + consumed int + }{ + {"simple", []byte("$5\r\nhello\r\n"), RESP{Type: Bulk, Data: []byte("hello"), Raw: []byte("$5\r\nhello\r\n")}, 11}, + {"null", []byte("$-1\r\n"), RESP{Type: Bulk, Data: nil, Raw: []byte("$-1\r\n")}, 5}, + {"empty", []byte("$0\r\n\r\n"), RESP{Type: Bulk, Data: []byte{}, Raw: []byte("$0\r\n\r\n")}, 6}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n, resp := ReadNextRESP(tt.input) + assert.Equal(t, tt.consumed, n) + assert.Equal(t, tt.expected.Type, resp.Type) + assert.Equal(t, tt.expected.Data, resp.Data) + }) + } +} + +func TestReadNextRESP_Array(t *testing.T) { + input := []byte("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n") + n, resp := ReadNextRESP(input) + assert.Equal(t, len(input), n) + assert.Equal(t, Type('*'), resp.Type) + assert.Equal(t, 2, resp.Count) +} + +func TestReadNextRESP_Error(t *testing.T) { + input := []byte("-Error message\r\n") + n, resp := ReadNextRESP(input) + assert.Equal(t, 16, n) + assert.Equal(t, Type('-'), resp.Type) + assert.Equal(t, []byte("Error message"), resp.Data) +} + +func TestReadNextRESP_Invalid(t *testing.T) { + tests := []struct { + name string + input []byte + }{ + {"empty", []byte{}}, + {"unknown type", []byte("?test\r\n")}, + {"missing cr", []byte("+test\n")}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n, resp := ReadNextRESP(tt.input) + assert.Equal(t, 0, n) + assert.Equal(t, RESP{}, resp) + }) + } +} + +func TestForEach(t *testing.T) { + input := []byte("*3\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$3\r\nbaz\r\n") + _, resp := ReadNextRESP(input) + + var results []string + resp.ForEach(func(r RESP) bool { + results = append(results, string(r.Data)) + return true + }) + + assert.Equal(t, []string{"foo", "bar", "baz"}, results) +} + +func TestForEachBreak(t *testing.T) { + input := []byte("*3\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$3\r\nbaz\r\n") + _, resp := ReadNextRESP(input) + + count := 0 + resp.ForEach(func(r RESP) bool { + count++ + return count < 2 + }) + + assert.Equal(t, 2, count) +} diff --git a/redhub.go b/redhub.go index 0a478d5..81c6d92 100644 --- a/redhub.go +++ b/redhub.go @@ -6,7 +6,7 @@ import ( "time" "github.com/IceFireDB/redhub/pkg/resp" - "github.com/panjf2000/gnet" + "github.com/panjf2000/gnet/v2" ) // Action represents the type of action to be taken after an event @@ -36,15 +36,16 @@ type Options struct { ReusePort bool Ticker bool TCPKeepAlive time.Duration + TCPKeepCount int + TCPKeepInterval time.Duration TCPNoDelay gnet.TCPSocketOpt SocketRecvBuffer int SocketSendBuffer int - Codec gnet.ICodec + EdgeTriggeredIO bool } // RedHub represents the main server structure type RedHub struct { - *gnet.EventServer onOpened func(c *Conn) (out []byte, action Action) onClosed func(c *Conn, err error) (action Action) handler func(cmd resp.Command, out []byte) ([]byte, Action) @@ -73,8 +74,17 @@ func NewRedHub( } } -// OnOpened is called when a new connection is opened -func (rs *RedHub) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { +// OnBoot fires when the engine is ready for accepting connections +func (rs *RedHub) OnBoot(eng gnet.Engine) (action gnet.Action) { + return gnet.None +} + +// OnShutdown fires when the engine is being shut down +func (rs *RedHub) OnShutdown(eng gnet.Engine) { +} + +// OnOpen fires when a new connection is opened +func (rs *RedHub) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { rs.connSync.Lock() rs.redHubBufMap[c] = new(connBuffer) rs.connSync.Unlock() @@ -82,28 +92,36 @@ func (rs *RedHub) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { return out, gnet.Action(act) } -// OnClosed is called when a connection is closed -func (rs *RedHub) OnClosed(c gnet.Conn, err error) (action gnet.Action) { +// OnClose fires when a connection is closed +func (rs *RedHub) OnClose(c gnet.Conn, err error) (action gnet.Action) { rs.connSync.Lock() delete(rs.redHubBufMap, c) rs.connSync.Unlock() return gnet.Action(rs.onClosed(&Conn{Conn: c}, err)) } -// React handles incoming data from connections -func (rs *RedHub) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { +// OnTraffic fires when a socket receives data from the remote +func (rs *RedHub) OnTraffic(c gnet.Conn) (action gnet.Action) { + var out []byte rs.connSync.RLock() cb, ok := rs.redHubBufMap[c] rs.connSync.RUnlock() if !ok { - return resp.AppendError(out, "ERR Client is closed"), gnet.None + c.AsyncWrite(resp.AppendError(nil, "ERR Client is closed"), nil) + return gnet.None + } + + buf, _ := c.Next(-1) + if len(buf) == 0 { + return gnet.None } - cb.buf.Write(frame) + cb.buf.Write(buf) cmds, lastbyte, err := resp.ReadCommands(cb.buf.Bytes()) if err != nil { - return resp.AppendError(out, "ERR "+err.Error()), gnet.None + c.AsyncWrite(resp.AppendError(nil, "ERR "+err.Error()), nil) + return gnet.None } cb.command = append(cb.command, cmds...) @@ -115,35 +133,70 @@ func (rs *RedHub) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Acti cb.command = cb.command[1:] var status Action - out, status = rs.handler(cmd, out) + result, status := rs.handler(cmd, out) + if len(result) > 0 { + c.AsyncWrite(result, nil) + } if status == Close { - return out, gnet.Close + return gnet.Close } } } else { cb.buf.Write(lastbyte) } - return out, gnet.None + return gnet.None +} + +// OnTick fires immediately after the engine starts +func (rs *RedHub) OnTick() (delay time.Duration, action gnet.Action) { + return 0, gnet.None } // ListenAndServe starts the RedHub server func ListenAndServe(addr string, options Options, rh *RedHub) error { - serveOptions := gnet.Options{ - Multicore: options.Multicore, - LockOSThread: options.LockOSThread, - ReadBufferCap: options.ReadBufferCap, - LB: options.LB, - NumEventLoop: options.NumEventLoop, - ReusePort: options.ReusePort, - Ticker: options.Ticker, - TCPKeepAlive: options.TCPKeepAlive, - TCPNoDelay: options.TCPNoDelay, - SocketRecvBuffer: options.SocketRecvBuffer, - SocketSendBuffer: options.SocketSendBuffer, - Codec: options.Codec, - } - - return gnet.Serve(rh, addr, gnet.WithOptions(serveOptions)) + var opts []gnet.Option + + if options.Multicore { + opts = append(opts, gnet.WithMulticore(true)) + } + if options.LockOSThread { + opts = append(opts, gnet.WithLockOSThread(true)) + } + if options.ReadBufferCap > 0 { + opts = append(opts, gnet.WithReadBufferCap(options.ReadBufferCap)) + } + if options.NumEventLoop > 0 { + opts = append(opts, gnet.WithNumEventLoop(options.NumEventLoop)) + } else if options.LB != gnet.RoundRobin { + opts = append(opts, gnet.WithLoadBalancing(options.LB)) + } + if options.ReusePort { + opts = append(opts, gnet.WithReusePort(true)) + } + if options.Ticker { + opts = append(opts, gnet.WithTicker(true)) + } + if options.TCPKeepAlive > 0 { + opts = append(opts, gnet.WithTCPKeepAlive(options.TCPKeepAlive)) + } + if options.TCPKeepCount > 0 { + opts = append(opts, gnet.WithTCPKeepCount(options.TCPKeepCount)) + } + if options.TCPKeepInterval > 0 { + opts = append(opts, gnet.WithTCPKeepInterval(options.TCPKeepInterval)) + } + opts = append(opts, gnet.WithTCPNoDelay(options.TCPNoDelay)) + if options.SocketRecvBuffer > 0 { + opts = append(opts, gnet.WithSocketRecvBuffer(options.SocketRecvBuffer)) + } + if options.SocketSendBuffer > 0 { + opts = append(opts, gnet.WithSocketSendBuffer(options.SocketSendBuffer)) + } + if options.EdgeTriggeredIO { + opts = append(opts, gnet.WithEdgeTriggeredIO(true)) + } + + return gnet.Run(rh, addr, opts...) } diff --git a/redhub_test.go b/redhub_test.go index dd3241f..96bc351 100644 --- a/redhub_test.go +++ b/redhub_test.go @@ -3,9 +3,10 @@ package redhub import ( "net" "testing" + "time" "github.com/IceFireDB/redhub/pkg/resp" - "github.com/panjf2000/gnet" + "github.com/panjf2000/gnet/v2" "github.com/stretchr/testify/assert" ) @@ -14,11 +15,13 @@ type mockConn struct { id string closed bool written []byte + buf []byte + ctx interface{} } -func (m *mockConn) Write(buf []byte) error { +func (m *mockConn) Write(buf []byte) (n int, err error) { m.written = append(m.written, buf...) - return nil + return len(buf), nil } func (m *mockConn) Close() error { @@ -26,9 +29,30 @@ func (m *mockConn) Close() error { return nil } -func (m *mockConn) Context() interface{} { return nil } -func (m *mockConn) SetContext(interface{}) {} -func (m *mockConn) RemoteAddr() net.Addr { +func (m *mockConn) Next(n int) (buf []byte, err error) { + if len(m.buf) == 0 { + return nil, nil + } + if n == -1 || n > len(m.buf) { + buf = make([]byte, len(m.buf)) + copy(buf, m.buf) + m.buf = nil + return buf, nil + } + buf = make([]byte, n) + copy(buf, m.buf[:n]) + m.buf = m.buf[n:] + return buf, nil +} + +func (m *mockConn) AsyncWrite(buf []byte, callback gnet.AsyncCallback) error { + m.written = append(m.written, buf...) + return nil +} + +func (m *mockConn) Context() interface{} { return m.ctx } +func (m *mockConn) SetContext(v interface{}) { m.ctx = v } +func (m *mockConn) RemoteAddr() net.Addr { return &net.TCPAddr{ IP: net.ParseIP("127.0.0.1"), Port: 6379, @@ -47,14 +71,14 @@ func TestNewRedHub(t *testing.T) { assert.NotNil(t, rh.connSync) } -func TestOnOpened(t *testing.T) { - onOpened := func(c *Conn) ([]byte, Action) { - return []byte("WELCOME"), None +func TestOnOpen(t *testing.T) { + onOpened := func(c *Conn) ([]byte, Action) { + return []byte("WELCOME"), None } rh := NewRedHub(onOpened, nil, nil) mock := &mockConn{id: "test1"} - out, action := rh.OnOpened(mock) + out, action := rh.OnOpen(mock) assert.Equal(t, "WELCOME", string(out)) assert.Equal(t, gnet.None, action) @@ -64,9 +88,32 @@ func TestOnOpened(t *testing.T) { assert.True(t, ok) } -func TestOnClosed(t *testing.T) { - onClosed := func(c *Conn, err error) Action { - return Close +func TestOnOpen_WithData(t *testing.T) { + onOpened := func(c *Conn) ([]byte, Action) { + return []byte("+PONG\r\n"), None + } + rh := NewRedHub(onOpened, nil, nil) + + mock := &mockConn{id: "test2"} + out, action := rh.OnOpen(mock) + assert.Equal(t, "+PONG\r\n", string(out)) + assert.Equal(t, gnet.None, action) +} + +func TestOnOpen_CloseAction(t *testing.T) { + onOpened := func(c *Conn) ([]byte, Action) { + return nil, Close + } + rh := NewRedHub(onOpened, nil, nil) + + mock := &mockConn{id: "test3"} + _, action := rh.OnOpen(mock) + assert.Equal(t, gnet.Close, action) +} + +func TestOnClose(t *testing.T) { + onClosed := func(c *Conn, err error) Action { + return Close } rh := NewRedHub(nil, onClosed, nil) @@ -75,7 +122,7 @@ func TestOnClosed(t *testing.T) { rh.redHubBufMap[mock] = &connBuffer{} rh.connSync.Unlock() - action := rh.OnClosed(mock, nil) + action := rh.OnClose(mock, nil) assert.Equal(t, gnet.Close, action) rh.connSync.RLock() @@ -84,46 +131,174 @@ func TestOnClosed(t *testing.T) { assert.False(t, ok) } -func TestReact_InvalidCommand(t *testing.T) { +func TestOnClose_WithError(t *testing.T) { + onClosed := func(c *Conn, err error) Action { + assert.NotNil(t, err) + return None + } + rh := NewRedHub(nil, onClosed, nil) + + mock := &mockConn{id: "test2"} + rh.connSync.Lock() + rh.redHubBufMap[mock] = &connBuffer{} + rh.connSync.Unlock() + + err := assert.AnError + action := rh.OnClose(mock, err) + assert.Equal(t, gnet.None, action) +} + +func TestOnTraffic_InvalidCommand(t *testing.T) { handler := func(cmd resp.Command, out []byte) ([]byte, Action) { return out, None } rh := NewRedHub(nil, nil, handler) - mock := &mockConn{id: "test1"} - out, action := rh.React([]byte("invalid command"), mock) - assert.Contains(t, string(out), "ERR") + mock := &mockConn{id: "test1", buf: []byte("invalid command")} + action := rh.OnTraffic(mock) assert.Equal(t, gnet.None, action) + assert.Contains(t, string(mock.written), "ERR") } -func TestReact_ValidCommand(t *testing.T) { +func TestOnTraffic_ValidCommand(t *testing.T) { handler := func(cmd resp.Command, out []byte) ([]byte, Action) { return append(out, []byte("OK")...), None } rh := NewRedHub(nil, nil, handler) - mock := &mockConn{id: "test1"} + mock := &mockConn{id: "test1", buf: []byte("*1\r\n$4\r\nPING\r\n")} rh.connSync.Lock() rh.redHubBufMap[mock] = &connBuffer{} rh.connSync.Unlock() - // Test a simple PING command - out, action := rh.React([]byte("*1\r\n$4\r\nPING\r\n"), mock) - assert.Equal(t, "OK", string(out)) + action := rh.OnTraffic(mock) + assert.Equal(t, "OK", string(mock.written)) assert.Equal(t, gnet.None, action) } -func TestReact_CloseAction(t *testing.T) { +func TestOnTraffic_CloseAction(t *testing.T) { handler := func(cmd resp.Command, out []byte) ([]byte, Action) { return out, Close } rh := NewRedHub(nil, nil, handler) + mock := &mockConn{id: "test1", buf: []byte("*1\r\n$4\r\nQUIT\r\n")} + rh.connSync.Lock() + rh.redHubBufMap[mock] = &connBuffer{} + rh.connSync.Unlock() + + action := rh.OnTraffic(mock) + assert.Equal(t, gnet.Close, action) +} + +func TestOnTraffic_MultipleCommands(t *testing.T) { + var callCount int + handler := func(cmd resp.Command, out []byte) ([]byte, Action) { + callCount++ + return resp.AppendString(out, "OK"), None + } + rh := NewRedHub(nil, nil, handler) + + mock := &mockConn{id: "test1", buf: []byte("*2\r\n$3\r\nSET\r\n$3\r\nkey\r\n*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n")} + rh.connSync.Lock() + rh.redHubBufMap[mock] = &connBuffer{} + rh.connSync.Unlock() + + action := rh.OnTraffic(mock) + assert.Equal(t, gnet.None, action) + assert.Equal(t, 2, callCount) +} + +func TestOnTraffic_EmptyBuffer(t *testing.T) { + handler := func(cmd resp.Command, out []byte) ([]byte, Action) { + return out, None + } + rh := NewRedHub(nil, nil, handler) + + mock := &mockConn{id: "test1", buf: []byte{}} + rh.connSync.Lock() + rh.redHubBufMap[mock] = &connBuffer{} + rh.connSync.Unlock() + + action := rh.OnTraffic(mock) + assert.Equal(t, gnet.None, action) + assert.Equal(t, 0, len(mock.written)) +} + +func TestOnBoot(t *testing.T) { + rh := NewRedHub(nil, nil, nil) + action := rh.OnBoot(gnet.Engine{}) + assert.Equal(t, gnet.None, action) +} + +func TestOnShutdown(t *testing.T) { + rh := NewRedHub(nil, nil, nil) + rh.OnShutdown(gnet.Engine{}) +} + +func TestOnTick(t *testing.T) { + rh := NewRedHub(nil, nil, nil) + delay, action := rh.OnTick() + assert.Equal(t, time.Duration(0), delay) + assert.Equal(t, gnet.None, action) +} + +func TestContextHandling(t *testing.T) { + onOpened := func(c *Conn) ([]byte, Action) { + c.SetContext("test-value") + return nil, None + } + onClosed := func(c *Conn, err error) Action { + ctx := c.Context() + assert.Equal(t, "test-value", ctx) + return None + } + rh := NewRedHub(onOpened, onClosed, nil) + mock := &mockConn{id: "test1"} + rh.OnOpen(mock) + rh.OnClose(mock, nil) +} + +func TestBulkDataHandling(t *testing.T) { + smallData := make([]byte, 10) + for i := range smallData { + smallData[i] = byte(i % 256) + } + + handler := func(cmd resp.Command, out []byte) ([]byte, Action) { + if len(cmd.Args) > 2 { + return resp.AppendBulk(out, cmd.Args[2]), None + } + return resp.AppendError(out, "ERR missing argument"), None + } + rh := NewRedHub(nil, nil, handler) + + buf := append([]byte("*3\r\n$3\r\nSET\r\n$4\r\nkey\r\n"), resp.AppendBulk(nil, smallData)...) + buf = buf[:len(buf)-2] + buf = append(buf, '\r', '\n') + + mock := &mockConn{id: "test1", buf: buf} + rh.connSync.Lock() + rh.redHubBufMap[mock] = &connBuffer{} + rh.connSync.Unlock() + + action := rh.OnTraffic(mock) + assert.Equal(t, gnet.None, action) + assert.True(t, len(mock.written) > 10) +} + +func TestShutdownAction(t *testing.T) { + handler := func(cmd resp.Command, out []byte) ([]byte, Action) { + return out, Close + } + rh := NewRedHub(nil, nil, handler) + + mock := &mockConn{id: "test1", buf: []byte("*1\r\n$4\r\nQUIT\r\n")} rh.connSync.Lock() rh.redHubBufMap[mock] = &connBuffer{} rh.connSync.Unlock() - _, action := rh.React([]byte("*1\r\n$4\r\nQUIT\r\n"), mock) + action := rh.OnTraffic(mock) assert.Equal(t, gnet.Close, action) }