Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One pool per node #19

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Database.Redis.Cluster
, Shard(..)
, TimeoutException(..)
, connect
, disconnect
, destroyNodeResources
, requestPipelined
, requestMasterNodes
, nodes
Expand All @@ -28,6 +28,7 @@ import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, fromException)
import Data.Pool(Pool, createPool, withResource, destroyAllResources)
import Control.Concurrent.Async(race)
import Control.Concurrent(threadDelay)
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
Expand Down Expand Up @@ -62,7 +63,7 @@ type IsReadOnly = Bool
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a pool is now per node, the map "HM.HashMap NodeID NodeConnection" is probably never going to update.
Can we find a way to update this mapping also? This will fail requests when we scale up Redis cluster where we won't be able to find the new NodeID in this map.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ishan-juspay is it okay if we trigger the refresh logic at the point where some NodeId is not found in the map? We can have a Pool of a pair of HashMap and ShardMap and whenever some NodeId is not found in the HashMap, it will throw and Exception and a new constructor will be called for the Pair.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this way, if some Resources are working with NodeId that they are able to find, then they will keep working without any problems.

Copy link

@ishan-juspay ishan-juspay Apr 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can have this HashMap as an MVar and update it whenever RefreshShardMap is called ? @aravindgopall


-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection'
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID

instance Show NodeConnection where
show (NodeConnection _ _ id1) = "nodeId: " <> show id1
Expand Down Expand Up @@ -128,8 +129,8 @@ instance Exception NoNodeException
data TimeoutException = TimeoutException String deriving (Show, Typeable)
instance Exception TimeoutException

connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do
connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> Time.NominalDiffTime -> Int -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap idleTime maxResources = do
shardMap <- readMVar shardMapVar
stateVar <- newMVar $ Pending []
pipelineVar <- newMVar $ Pipeline stateVar
Expand Down Expand Up @@ -161,15 +162,15 @@ connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap
) (mempty, False) info
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode (Node n _ host port) = do
ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt
ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources
ref <- IOR.newIORef Nothing
return (n, NodeConnection ctx ref n)
refreshShardMapVar :: ShardMap -> IO ()
refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap))

disconnect :: Connection -> IO ()
disconnect (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx
destroyNodeResources :: Connection -> IO ()
destroyNodeResources (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodePool _ _) = destroyAllResources nodePool

-- Add a request to the current pipeline for this connection. The pipeline will
-- be executed implicitly as soon as any result returned from this function is
Expand Down Expand Up @@ -453,34 +454,33 @@ allMasterNodes (Connection nodeConns _ _ _ _) (ShardMap shardMap) =
onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap)

requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode (NodeConnection ctx lastRecvRef _) requests = do
requestNode (NodeConnection pool lastRecvRef _) requests = withResource pool $ \ctx -> do
envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT"
eresp <- race requestNodeImpl (threadDelay envTimeout)
eresp <- race (requestNodeImpl ctx) (threadDelay envTimeout)
case eresp of
Left e -> return e
Right _ -> putStrLn "timeout happened" *> throwIO (TimeoutException "Request Timeout")

where
requestNodeImpl :: IO [Reply]
requestNodeImpl = do
mapM_ (sendNode . renderRequest) requests
requestNodeImpl :: CC.ConnectionContext -> IO [Reply]
requestNodeImpl ctx = do
mapM_ (sendNode ctx . renderRequest) requests
_ <- CC.flush ctx
replicateM (length requests) recvNode
sendNode :: B.ByteString -> IO ()
sendNode = CC.send ctx
recvNode :: IO Reply
recvNode = do
replicateM (length requests) $ recvNode ctx
sendNode :: CC.ConnectionContext -> B.ByteString -> IO ()
sendNode = CC.send
recvNode :: CC.ConnectionContext -> IO Reply
recvNode ctx = do
maybeLastRecv <- IOR.readIORef lastRecvRef
scanResult <- case maybeLastRecv of
Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv
Nothing -> Scanner.scanWith (CC.recv ctx) reply B.empty

case scanResult of
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r

{-# INLINE nodes #-}
nodes :: ShardMap -> [Node]
Expand Down
65 changes: 39 additions & 26 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Database.Redis.Connection where

import Control.Exception
Expand Down Expand Up @@ -33,6 +32,7 @@ import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race)
import qualified Database.Redis.Types as T
--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline

import Database.Redis.Commands
Expand All @@ -54,7 +54,7 @@ import Database.Redis.Commands
-- 'connect' function to create one.
data Connection
= NonClusteredConnection (Pool PP.Connection)
| ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection)
| ClusteredConnection (MVar ShardMap) Cluster.Connection

-- |Information for connnecting to a Redis server.
--
Expand Down Expand Up @@ -173,7 +173,7 @@ checkedConnect connInfo = do
-- |Destroy all idle resources in the pool.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
disconnect (ClusteredConnection _ pool) = destroyAllResources pool
disconnect (ClusteredConnection _ conn) = Cluster.destroyNodeResources conn

-- | Memory bracket around 'connect' and 'disconnect'.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
Expand All @@ -191,8 +191,8 @@ withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection pool) redis =
withResource pool $ \conn -> runRedisInternal conn redis
runRedis (ClusteredConnection _ pool) redis =
withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis
runRedis (ClusteredConnection _ conn) redis =
runRedisClusteredInternal conn (refreshShardMap conn) redis

newtype ClusterConnectError = ClusterConnectError Reply
deriving (Eq, Show, Typeable)
Expand Down Expand Up @@ -224,9 +224,10 @@ connectCluster bootstrapConnInfo = do
Right infos -> do
let
isConnectionReadOnly = connectReadOnly bootstrapConnInfo
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn
pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
return $ ClusteredConnection shardMapVar pool
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
-- pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 3 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
connection <- clusterConnect isConnectionReadOnly clusterConnection
return $ ClusteredConnection shardMapVar connection
where
withAuth host port timeout = do
conn <- PP.connect host port timeout
Expand All @@ -249,13 +250,24 @@ connectCluster bootstrapConnInfo = do
clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection
clusterConnect readOnlyConnection connection = do
clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection
nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap)
nodesConns <- sequence $ (ctxToConn . snd) <$> (HM.toList nodeMap)
when readOnlyConnection $
mapM_ (\conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
mapM_ (\maybeConn -> case maybeConn of
Just conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
Nothing -> return $ Right (T.Status "Connection does not exist")
) nodesConns
return clusterConn
where
ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection)
ctxToConn (Cluster.NodeConnection pool _ nid) = do
maybeConn <- try $ withResource pool PP.fromCtx

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to remove the connection from pool if there is an exception? rather than catching with try and returning Nothing. Verify this once.

case maybeConn of
Right ppConn -> return $ Just ppConn
Left (_ :: SomeException) -> do
putStrLn ("SomeException Occured in NodeID " ++ show nid)
return Nothing

shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where
Expand All @@ -282,20 +294,21 @@ refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap
refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
refreshShardMapWithNodeConn nodeConnsList = do
selectedIdx <- randomRIO (0, (length nodeConnsList) - 1)
let (Cluster.NodeConnection ctx _ _) = nodeConnsList !! selectedIdx
pipelineConn <- PP.fromCtx ctx
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"
raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
case raceResult of
Left () -> do
print $ "TimeoutForConnection " <> show ctx
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout"
Right eiShardMapResp ->
case eiShardMapResp of
Right shardMap -> pure shardMap
Left (err :: SomeException) -> do
print $ "ShardMapRefreshError-" <> show err
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
let (Cluster.NodeConnection pool _ _) = nodeConnsList !! selectedIdx
withResource pool $ \ctx -> do
pipelineConn <- PP.fromCtx ctx
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"
raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
case raceResult of
Left () -> do
print $ "TimeoutForConnection " <> show ctx
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout"
Right eiShardMapResp ->
case eiShardMapResp of
Right shardMap -> pure shardMap
Left (err :: SomeException) -> do
print $ "ShardMapRefreshError-" <> show err
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")

refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap
refreshShardMapWithConn pipelineConn _ = do
Expand Down