Skip to content

Commit

Permalink
feat: ✨ liq-src: kyber-pmm (#624)
Browse files Browse the repository at this point in the history
* feat: ✨ liq-src: kyber-pmm

Signed-off-by: thanhpp <[email protected]>

* refactor: ♻️ generate codes

Signed-off-by: thanhpp <[email protected]>

* refactor: ♻️ refactoring

Signed-off-by: thanhpp <[email protected]>

* fix: 🩹 add tokens & reserves

Signed-off-by: thanhpp <[email protected]>

* fix: 🩹 kyber-pmm: check quote tokens

Signed-off-by: thanhpp <[email protected]>

* fix: ✅ fix kyber-pmm test

Signed-off-by: thanhpp <[email protected]>

---------

Signed-off-by: thanhpp <[email protected]>
  • Loading branch information
thanhpp authored Feb 7, 2025
1 parent 7f651d9 commit 06bcf66
Show file tree
Hide file tree
Showing 12 changed files with 1,433 additions and 0 deletions.
21 changes: 21 additions & 0 deletions pkg/liquidity-source/kyber-pmm/client/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import "errors"

const (
ErrFirmQuoteInternalErrorText = "internal_error"
ErrFirmQuoteBlacklistText = "blacklist"
ErrFirmQuoteInsufficientLiquidityText = "insufficient_liquidity"
ErrFirmQuoteMarketConditionText = "market_condition"
)

var (
ErrListTokensFailed = errors.New("listTokens failed")
ErrListPairsFailed = errors.New("listPairs failed")
ErrListPriceLevelsFailed = errors.New("listPriceLevels failed")
ErrFirmQuoteFailed = errors.New("firm quote failed")
ErrFirmQuoteInternalError = errors.New(ErrFirmQuoteInternalErrorText)
ErrFirmQuoteBlacklist = errors.New(ErrFirmQuoteBlacklistText)
ErrFirmQuoteInsufficientLiquidity = errors.New(ErrFirmQuoteInsufficientLiquidityText)
ErrFirmQuoteMarketCondition = errors.New(ErrFirmQuoteMarketConditionText)
)
126 changes: 126 additions & 0 deletions pkg/liquidity-source/kyber-pmm/client/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package client

import (
"context"

"github.com/KyberNetwork/logger"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"

kyberpmm "github.com/KyberNetwork/kyberswap-dex-lib/pkg/liquidity-source/kyber-pmm"
)

const (
listTokensEndpoint = "/kyberswap/v1/tokens"
listPairsEndpoint = "/kyberswap/v1/pairs"
listPricesEndpoint = "/kyberswap/v1/prices"
firmEndpoint = "/kyberswap/v1/firm"
)

type httpClient struct {
client *resty.Client
config *kyberpmm.HTTPConfig
}

func NewHTTPClient(config *kyberpmm.HTTPConfig) *httpClient {
client := resty.New().
SetBaseURL(config.BaseURL).
SetTimeout(config.Timeout.Duration).
SetRetryCount(config.RetryCount)

return &httpClient{
client: client,
config: config,
}
}

func (c *httpClient) ListTokens(ctx context.Context) (map[string]kyberpmm.TokenItem, error) {
req := c.client.R().
SetContext(ctx)

var result kyberpmm.ListTokensResult
resp, err := req.SetResult(&result).Get(listTokensEndpoint)
if err != nil {
return nil, err
}

if !resp.IsSuccess() {
return nil, errors.WithMessagef(ErrListTokensFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

return result.Tokens, nil
}

func (c *httpClient) ListPairs(ctx context.Context) (map[string]kyberpmm.PairItem, error) {
req := c.client.R().
SetContext(ctx)

var result kyberpmm.ListPairsResult
resp, err := req.SetResult(&result).Get(listPairsEndpoint)
if err != nil {
return nil, err
}

if !resp.IsSuccess() {
return nil, errors.WithMessagef(ErrListPairsFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

return result.Pairs, nil
}

func (c *httpClient) ListPriceLevels(ctx context.Context) (kyberpmm.ListPriceLevelsResult, error) {
req := c.client.R().
SetContext(ctx)

var result kyberpmm.ListPriceLevelsResult
resp, err := req.SetResult(&result).Get(listPricesEndpoint)
if err != nil {
return result, err
}

if !resp.IsSuccess() {
return result, errors.WithMessagef(ErrListPriceLevelsFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

return result, nil
}

func (c *httpClient) Firm(ctx context.Context, params kyberpmm.FirmRequestParams) (kyberpmm.FirmResult, error) {
req := c.client.R().
SetContext(ctx).
SetBody(params)

var result kyberpmm.FirmResult
resp, err := req.SetResult(&result).Post(firmEndpoint)
if err != nil {
return kyberpmm.FirmResult{}, err
}

if !resp.IsSuccess() {
return kyberpmm.FirmResult{}, errors.WithMessagef(ErrFirmQuoteFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

if result.Error != "" {
parsedErr := parseFirmQuoteError(result.Error)
logger.Errorf("firm quote failed with error: %v", result.Error)

return kyberpmm.FirmResult{}, parsedErr
}

return result, nil
}

func parseFirmQuoteError(errorMessage string) error {
switch errorMessage {
case ErrFirmQuoteInternalErrorText:
return ErrFirmQuoteInternalError
case ErrFirmQuoteBlacklistText:
return ErrFirmQuoteBlacklist
case ErrFirmQuoteInsufficientLiquidityText:
return ErrFirmQuoteInsufficientLiquidity
case ErrFirmQuoteMarketConditionText:
return ErrFirmQuoteMarketCondition
default:
return ErrFirmQuoteInternalError
}
}
167 changes: 167 additions & 0 deletions pkg/liquidity-source/kyber-pmm/client/memory_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package client

import (
"context"
"errors"

"github.com/KyberNetwork/logger"
"github.com/dgraph-io/ristretto"

kyberpmm "github.com/KyberNetwork/kyberswap-dex-lib/pkg/liquidity-source/kyber-pmm"
)

const (
defaultNumCounts = 5000
defaultMaxCost = 500
defaultBufferItems = 64

defaultSingleItemCost = 1

cacheKeyTokens = "tokens"
cacheKeyPairs = "pairs"
cacheKeyPriceLevels = "price-levels"
)

type memoryCacheClient struct {
config *kyberpmm.MemoryCacheConfig
cache *ristretto.Cache
fallbackClient kyberpmm.IClient
}

func NewMemoryCacheClient(
config *kyberpmm.MemoryCacheConfig,
fallbackClient kyberpmm.IClient,
) *memoryCacheClient {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: defaultNumCounts,
MaxCost: defaultMaxCost,
BufferItems: defaultBufferItems,
})
if err != nil {
logger.Errorf("failed to init memory cache, err %v", err.Error())
}

return &memoryCacheClient{
config: config,
cache: cache,
fallbackClient: fallbackClient,
}
}

func (c *memoryCacheClient) ListTokens(ctx context.Context) (map[string]kyberpmm.TokenItem, error) {
cachedTokens, err := c.listTokensFromCache()
if err == nil {
return cachedTokens, nil
}

// Cache missed. Using fallbackClient
tokens, err := c.fallbackClient.ListTokens(ctx)
if err != nil {
return nil, err
}

if err = c.saveTokensToCache(tokens); err != nil {
logger.
WithFields(logger.Fields{"error": err}).
Warn("memory cache failed")
}

return tokens, err
}

// listTokensFromCache only returns if tokens are able to fetch from cache
func (c *memoryCacheClient) listTokensFromCache() (map[string]kyberpmm.TokenItem, error) {
cachedTokens, found := c.cache.Get(cacheKeyTokens)
if !found {
return nil, errors.New("no tokens data in cache")
}

return cachedTokens.(map[string]kyberpmm.TokenItem), nil
}

func (c *memoryCacheClient) saveTokensToCache(tokens map[string]kyberpmm.TokenItem) error {
c.cache.SetWithTTL(cacheKeyTokens, tokens, defaultSingleItemCost, c.config.TTL.Tokens.Duration)
c.cache.Wait()

return nil
}

func (c *memoryCacheClient) ListPairs(ctx context.Context) (map[string]kyberpmm.PairItem, error) {
cachedPairs, err := c.listPairsFromCache()
if err == nil {
return cachedPairs, nil
}

// Cache missed. Using fallbackClient
pairs, err := c.fallbackClient.ListPairs(ctx)
if err != nil {
return nil, err
}

if err = c.savePairsToCache(pairs); err != nil {
logger.
WithFields(logger.Fields{"error": err}).
Warn("memory cache failed")
}

return pairs, err
}

// listPairsFromCache only returns if pairs are able to fetch from cache
func (c *memoryCacheClient) listPairsFromCache() (map[string]kyberpmm.PairItem, error) {
cachedPairs, found := c.cache.Get(cacheKeyPairs)
if !found {
return nil, errors.New("no pairs data in cache")
}

return cachedPairs.(map[string]kyberpmm.PairItem), nil
}

func (c *memoryCacheClient) savePairsToCache(tokens map[string]kyberpmm.PairItem) error {
c.cache.SetWithTTL(cacheKeyPairs, tokens, defaultSingleItemCost, c.config.TTL.Pairs.Duration)
c.cache.Wait()

return nil
}

func (c *memoryCacheClient) ListPriceLevels(ctx context.Context) (kyberpmm.ListPriceLevelsResult, error) {
cachedPriceLevels, err := c.listPriceLevelsFromCache()
if err == nil {
return cachedPriceLevels, nil
}

// Cache missed. Using fallbackClient
priceLevels, err := c.fallbackClient.ListPriceLevels(ctx)
if err != nil {
return kyberpmm.ListPriceLevelsResult{}, err
}

if err = c.savePriceLevelsToCache(priceLevels); err != nil {
logger.
WithFields(logger.Fields{"error": err}).
Warn("memory cache failed")
}

return priceLevels, err
}

// listPriceLevelsFromCache only returns if price levels are able to fetch from cache
func (c *memoryCacheClient) listPriceLevelsFromCache() (kyberpmm.ListPriceLevelsResult, error) {
cachedPriceLevels, found := c.cache.Get(cacheKeyPriceLevels)
if !found {
return kyberpmm.ListPriceLevelsResult{}, errors.New("no price levels data in cache")
}

return cachedPriceLevels.(kyberpmm.ListPriceLevelsResult), nil
}

func (c *memoryCacheClient) savePriceLevelsToCache(priceLevelsAndInventory kyberpmm.ListPriceLevelsResult) error {
c.cache.SetWithTTL(cacheKeyPriceLevels, priceLevelsAndInventory, defaultSingleItemCost, c.config.TTL.PriceLevels.Duration)
c.cache.Wait()

return nil
}

func (c *memoryCacheClient) Firm(ctx context.Context, params kyberpmm.FirmRequestParams) (kyberpmm.FirmResult, error) {
return c.fallbackClient.Firm(ctx, params)
}
26 changes: 26 additions & 0 deletions pkg/liquidity-source/kyber-pmm/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kyberpmm

import (
"github.com/KyberNetwork/blockchain-toolkit/time/durationjson"
)

type Config struct {
DexID string `json:"dexID,omitempty"`
RFQContractAddress string `mapstructure:"rfq_contract_address" json:"rfq_contract_address,omitempty"`
HTTP HTTPConfig `mapstructure:"http" json:"http,omitempty"`
MemoryCache MemoryCacheConfig `mapstructure:"memory_cache" json:"memory_cache,omitempty"`
}

type HTTPConfig struct {
BaseURL string `mapstructure:"base_url" json:"base_url,omitempty"`
Timeout durationjson.Duration `mapstructure:"timeout" json:"timeout,omitempty"`
RetryCount int `mapstructure:"retry_count" json:"retry_count,omitempty"`
}

type MemoryCacheConfig struct {
TTL struct {
Tokens durationjson.Duration `mapstructure:"tokens" json:"tokens,omitempty"`
Pairs durationjson.Duration `mapstructure:"pairs" json:"pairs,omitempty"`
PriceLevels durationjson.Duration `mapstructure:"price_levels" json:"price_levels,omitempty"`
} `mapstructure:"ttl"`
}
14 changes: 14 additions & 0 deletions pkg/liquidity-source/kyber-pmm/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kyberpmm

type SwapDirection uint8

const (
DexTypeKyberPMM = "kyber-pmm"

PoolIDPrefix = "kyber_pmm"
PoolIDSeparator = "_"
)

var (
DefaultGas = Gas{Swap: 100000}
)
13 changes: 13 additions & 0 deletions pkg/liquidity-source/kyber-pmm/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kyberpmm

import "errors"

var (
ErrTokenNotFound = errors.New("token not found")
ErrNoPriceLevelsForPool = errors.New("no price levels for pool")
ErrEmptyPriceLevels = errors.New("empty price levels")
ErrInsufficientLiquidity = errors.New("insufficient liquidity")
ErrInvalidFirmQuoteParams = errors.New("invalid firm quote params")
ErrNoSwapLimit = errors.New("swap limit is required for PMM pools")
ErrNotEnoughInventoryIn = errors.New("not enough inventory in")
)
10 changes: 10 additions & 0 deletions pkg/liquidity-source/kyber-pmm/iface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package kyberpmm

import "context"

type IClient interface {
ListTokens(ctx context.Context) (map[string]TokenItem, error)
ListPairs(ctx context.Context) (map[string]PairItem, error)
ListPriceLevels(ctx context.Context) (ListPriceLevelsResult, error)
Firm(ctx context.Context, params FirmRequestParams) (FirmResult, error)
}
Loading

0 comments on commit 06bcf66

Please sign in to comment.