Skip to content

Commit

Permalink
changing NodeConnection to Pool of NodeConnection and taking Cluster …
Browse files Browse the repository at this point in the history
…Connection out of the Pool
  • Loading branch information
shashi-kant-juspay committed Mar 15, 2023
1 parent 10e07f3 commit 3b588c0
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 57 deletions.
Binary file added src/Database/.DS_Store
Binary file not shown.
101 changes: 57 additions & 44 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Database.Redis.Cluster
, HashSlot
, Shard(..)
, connect
, disconnect
, destroyNodeResources
, requestPipelined
, requestMasterNodes
, nodes
Expand All @@ -26,6 +26,7 @@ import Data.Maybe(mapMaybe, fromMaybe)
import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Data.Pool(Pool, createPool, withResource, destroyAllResources)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try)
import Control.Concurrent.Async(race)
import Control.Concurrent(threadDelay)
Expand Down Expand Up @@ -61,7 +62,7 @@ type IsReadOnly = Bool
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly

-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection'
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID

instance Eq NodeConnection where
(NodeConnection _ _ id1) == (NodeConnection _ _ id2) = id1 == id2
Expand Down Expand Up @@ -121,48 +122,61 @@ instance Exception CrossSlotException
data NoNodeException = NoNodeException deriving (Show, Typeable)
instance Exception NoNodeException

connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> (NodeConnection -> IO ShardMap) -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do
connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> (NodeConnection -> IO ShardMap) -> Time.NominalDiffTime -> Int -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly _refreshShardMap idleTime maxResources = do
putStrLn "Inside Connect"
shardMap <- readMVar shardMapVar
-- shardMap <- modifyMVar shardMapVar (\(ShardMap intMap) -> case (IntMap.toList intMap) of
-- (first:rest) -> return (ShardMap (IntMap.fromList rest), ShardMap (IntMap.fromList [first]))
-- _ -> error "Invalid ShardMap")
stateVar <- newMVar $ Pending []
pipelineVar <- newMVar $ Pipeline stateVar
(eNodeConns, shouldRetry) <- nodeConnections shardMap
-- (eNodeConns, shouldRetry) <- nodeConnections shardMap
nodeConns <- nodeConnections shardMap
-- print $ map (\(k, _) -> k) $ HM.toList eNodeConns
-- return eNodeConns
-- whenever one of the node connection is not established,
-- will refresh the slots and retry node connections.
-- This would handle fail over, IP change use cases.
nodeConns <-
if shouldRetry
then if not (HM.null eNodeConns)
then do
newShardMap <- refreshShardMap (head $ HM.elems eNodeConns)
refreshShardMapVar newShardMap
simpleNodeConnections newShardMap
else
throwIO NoNodeException
else
return eNodeConns
return $ Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) isReadOnly where
simpleNodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
simpleNodeConnections shardMap = HM.fromList <$> mapM connectNode (nub $ nodes shardMap)
nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection, Bool)
-- nodeConns <-
-- if shouldRetry
-- then if not (HM.null eNodeConns)
-- then do
-- newShardMap <- refreshShardMap (head $ HM.elems eNodeConns)
-- refreshShardMapVar newShardMap
-- simpleNodeConnections newShardMap
-- else
-- throwIO NoNodeException
-- else
-- return eNodeConns
putStrLn "Created NodeConnection's"
let connection = Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) isReadOnly
putStrLn "Created Cluster Connection object."
return connection
where
-- simpleNodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
-- simpleNodeConnections shardMap = putStrLn "But so did I!" >> HM.fromList <$> (mapM connectNode (nub $ nodes shardMap) >>= mapM (\x -> print (fst x) >> return x))
nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
nodeConnections shardMap = do
putStrLn "Creating Connections..."
info <- mapM (try . connectNode) (nub $ nodes shardMap)
return $
foldl (\(acc, accB) x -> case x of
Right (v, nc) -> (HM.insert v nc acc, accB)
Left (_ :: SomeException) -> (acc, True)
) (mempty, False) info
let result = foldl (\acc x -> case x of
Right (v, nc) -> HM.insert v nc acc
Left (_ :: SomeException) -> acc
) mempty info
putStrLn "Created Connections."
return result
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode (Node n _ host port) = do
ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt
ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources
ref <- IOR.newIORef Nothing
return (n, NodeConnection ctx ref n)
refreshShardMapVar :: ShardMap -> IO ()
refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap))
-- refreshShardMapVar :: ShardMap -> IO ()
-- refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap))

disconnect :: Connection -> IO ()
disconnect (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx
destroyNodeResources :: Connection -> IO ()
destroyNodeResources (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodePool _ _) = destroyAllResources nodePool

-- Add a request to the current pipeline for this connection. The pipeline will
-- be executed implicitly as soon as any result returned from this function is
Expand Down Expand Up @@ -432,34 +446,33 @@ allMasterNodes (Connection nodeConns _ _ _ _) (ShardMap shardMap) =
onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap)

requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode (NodeConnection ctx lastRecvRef _) requests = do
requestNode (NodeConnection pool lastRecvRef nid) requests = do
envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT"
eresp <- race requestNodeImpl (threadDelay envTimeout)
case eresp of
Left e -> return e
Right _ -> putStrLn "timeout happened" *> throwIO NoNodeException

Left e -> return e
Right _ -> putStrLn ("timeout happened" ++ show nid) *> throwIO NoNodeException
where
requestNodeImpl :: IO [Reply]
requestNodeImpl = do
mapM_ (sendNode . renderRequest) requests
_ <- CC.flush ctx
_ <- withResource pool CC.flush
replicateM (length requests) recvNode
sendNode :: B.ByteString -> IO ()
sendNode = CC.send ctx
sendNode reqs = withResource pool (`CC.send` reqs)
recvNode :: IO Reply
recvNode = do
maybeLastRecv <- IOR.readIORef lastRecvRef
scanResult <- case maybeLastRecv of
Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv
Nothing -> Scanner.scanWith (CC.recv ctx) reply B.empty
Just lastRecv -> Scanner.scanWith (withResource pool CC.recv) reply lastRecv
Nothing -> Scanner.scanWith (withResource pool CC.recv) reply B.empty

case scanResult of
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r

{-# INLINE nodes #-}
nodes :: ShardMap -> [Node]
Expand Down
40 changes: 27 additions & 13 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.Redis.Connection where

import Control.Exception
Expand All @@ -25,6 +26,7 @@ import Database.Redis.Protocol(Reply(..))
import Database.Redis.Cluster(ShardMap(..), Node, Shard(..))
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
import qualified Database.Redis.Types as T
--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline
import Database.Redis.Commands
( ping
Expand All @@ -45,7 +47,7 @@ import Database.Redis.Commands
-- 'connect' function to create one.
data Connection
= NonClusteredConnection (Pool PP.Connection)
| ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection)
| ClusteredConnection (MVar ShardMap) Cluster.Connection

-- |Information for connnecting to a Redis server.
--
Expand Down Expand Up @@ -164,7 +166,7 @@ checkedConnect connInfo = do
-- |Destroy all idle resources in the pool.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
disconnect (ClusteredConnection _ pool) = destroyAllResources pool
disconnect (ClusteredConnection _ conn) = Cluster.destroyNodeResources conn

-- | Memory bracket around 'connect' and 'disconnect'.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
Expand All @@ -182,8 +184,8 @@ withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection pool) redis =
withResource pool $ \conn -> runRedisInternal conn redis
runRedis (ClusteredConnection _ pool) redis =
withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis
runRedis (ClusteredConnection _ conn) redis =
runRedisClusteredInternal conn (refreshShardMap conn) redis

newtype ClusterConnectError = ClusterConnectError Reply
deriving (Eq, Show, Typeable)
Expand Down Expand Up @@ -213,11 +215,14 @@ connectCluster bootstrapConnInfo = do
case commandInfos of
Left e -> throwIO $ ClusterConnectError e
Right infos -> do
putStrLn "Inside connectCluster"
let
isConnectionReadOnly = connectReadOnly bootstrapConnInfo
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn
pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
return $ ClusteredConnection shardMapVar pool
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
-- pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 3 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
connection <- clusterConnect isConnectionReadOnly clusterConnection
putStrLn "Created clustered connection."
return $ ClusteredConnection shardMapVar connection
where
withAuth host port timeout = do
conn <- PP.connect host port timeout
Expand All @@ -240,13 +245,22 @@ connectCluster bootstrapConnInfo = do
clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection
clusterConnect readOnlyConnection connection = do
clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection
nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap)
nodesConns <- sequence $ (ctxToConn . snd) <$> (HM.toList nodeMap)
when readOnlyConnection $
mapM_ (\conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
mapM_ (\maybeConn -> case maybeConn of
Just conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
Nothing -> return $ Right (T.Status "Connection does not exist")
) nodesConns
return clusterConn
where
ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection)
ctxToConn (Cluster.NodeConnection pool _ _) = do
maybeConn <- try $ withResource pool PP.fromCtx
return $ case maybeConn of
Right ppConn -> Just ppConn
Left (_ :: SomeException) -> Nothing

shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where
Expand All @@ -270,8 +284,8 @@ refreshShardMap (Cluster.Connection nodeConns _ _ _ _) =
refreshShardMapWithNodeConn (head $ HM.elems nodeConns)

refreshShardMapWithNodeConn :: Cluster.NodeConnection -> IO ShardMap
refreshShardMapWithNodeConn (Cluster.NodeConnection ctx _ _) = do
pipelineConn <- PP.fromCtx ctx
refreshShardMapWithNodeConn (Cluster.NodeConnection pool _ _) = do
pipelineConn <- withResource pool PP.fromCtx
refreshShardMapWithConn pipelineConn True

refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap
Expand Down

0 comments on commit 3b588c0

Please sign in to comment.