diff --git a/src/Database/Redis/Cluster.hs b/src/Database/Redis/Cluster.hs index e22bec04..4a1367d4 100644 --- a/src/Database/Redis/Cluster.hs +++ b/src/Database/Redis/Cluster.hs @@ -13,7 +13,7 @@ module Database.Redis.Cluster , HashSlot , Shard(..) , connect - , disconnect + , destroyNodeResources , requestPipelined , requestMasterNodes , nodes @@ -26,6 +26,7 @@ import Data.Maybe(mapMaybe, fromMaybe) import Data.List(nub, sortBy, find) import Data.Map(fromListWith, assocs) import Data.Function(on) +import Data.Pool(Pool, createPool, withResource, destroyAllResources) import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try) import Control.Concurrent.Async(race) import Control.Concurrent(threadDelay) @@ -61,7 +62,7 @@ type IsReadOnly = Bool data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly -- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection' -data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID +data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID instance Eq NodeConnection where (NodeConnection _ _ id1) == (NodeConnection _ _ id2) = id1 == id2 @@ -121,48 +122,56 @@ instance Exception CrossSlotException data NoNodeException = NoNodeException deriving (Show, Typeable) instance Exception NoNodeException -connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> (NodeConnection -> IO ShardMap) -> IO Connection -connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do +connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> (NodeConnection -> IO ShardMap) -> Time.NominalDiffTime -> Int -> IO Connection +connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly _refreshShardMap idleTime maxResources = do shardMap <- readMVar shardMapVar + -- shardMap <- modifyMVar shardMapVar (\(ShardMap intMap) -> case (IntMap.toList intMap) of + -- (first:rest) -> return (ShardMap (IntMap.fromList rest), ShardMap (IntMap.fromList [first])) + -- _ -> error "Invalid ShardMap") stateVar <- newMVar $ Pending [] pipelineVar <- newMVar $ Pipeline stateVar - (eNodeConns, shouldRetry) <- nodeConnections shardMap + -- (eNodeConns, shouldRetry) <- nodeConnections shardMap + nodeConns <- nodeConnections shardMap + -- print $ map (\(k, _) -> k) $ HM.toList eNodeConns + -- return eNodeConns -- whenever one of the node connection is not established, -- will refresh the slots and retry node connections. -- This would handle fail over, IP change use cases. - nodeConns <- - if shouldRetry - then if not (HM.null eNodeConns) - then do - newShardMap <- refreshShardMap (head $ HM.elems eNodeConns) - refreshShardMapVar newShardMap - simpleNodeConnections newShardMap - else - throwIO NoNodeException - else - return eNodeConns - return $ Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) isReadOnly where - simpleNodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection) - simpleNodeConnections shardMap = HM.fromList <$> mapM connectNode (nub $ nodes shardMap) - nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection, Bool) + -- nodeConns <- + -- if shouldRetry + -- then if not (HM.null eNodeConns) + -- then do + -- newShardMap <- refreshShardMap (head $ HM.elems eNodeConns) + -- refreshShardMapVar newShardMap + -- simpleNodeConnections newShardMap + -- else + -- throwIO NoNodeException + -- else + -- return eNodeConns + let connection = Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) isReadOnly + return connection + where + -- simpleNodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection) + -- simpleNodeConnections shardMap = putStrLn "But so did I!" >> HM.fromList <$> (mapM connectNode (nub $ nodes shardMap) >>= mapM (\x -> print (fst x) >> return x)) + nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection) nodeConnections shardMap = do info <- mapM (try . connectNode) (nub $ nodes shardMap) - return $ - foldl (\(acc, accB) x -> case x of - Right (v, nc) -> (HM.insert v nc acc, accB) - Left (_ :: SomeException) -> (acc, True) - ) (mempty, False) info + let result = foldl (\acc x -> case x of + Right (v, nc) -> HM.insert v nc acc + Left (_ :: SomeException) -> acc + ) mempty info + return result connectNode :: Node -> IO (NodeID, NodeConnection) connectNode (Node n _ host port) = do - ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt + ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources ref <- IOR.newIORef Nothing return (n, NodeConnection ctx ref n) - refreshShardMapVar :: ShardMap -> IO () - refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap)) + -- refreshShardMapVar :: ShardMap -> IO () + -- refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap)) -disconnect :: Connection -> IO () -disconnect (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where - disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx +destroyNodeResources :: Connection -> IO () +destroyNodeResources (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where + disconnectNode (NodeConnection nodePool _ _) = destroyAllResources nodePool -- Add a request to the current pipeline for this connection. The pipeline will -- be executed implicitly as soon as any result returned from this function is @@ -432,34 +441,33 @@ allMasterNodes (Connection nodeConns _ _ _ _) (ShardMap shardMap) = onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap) requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply] -requestNode (NodeConnection ctx lastRecvRef _) requests = do +requestNode (NodeConnection pool lastRecvRef nid) requests = do envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT" eresp <- race requestNodeImpl (threadDelay envTimeout) case eresp of - Left e -> return e - Right _ -> putStrLn "timeout happened" *> throwIO NoNodeException - + Left e -> return e + Right _ -> putStrLn ("timeout happened" ++ show nid) *> throwIO NoNodeException where requestNodeImpl :: IO [Reply] requestNodeImpl = do mapM_ (sendNode . renderRequest) requests - _ <- CC.flush ctx + _ <- withResource pool CC.flush replicateM (length requests) recvNode sendNode :: B.ByteString -> IO () - sendNode = CC.send ctx + sendNode reqs = withResource pool (`CC.send` reqs) recvNode :: IO Reply recvNode = do maybeLastRecv <- IOR.readIORef lastRecvRef scanResult <- case maybeLastRecv of - Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv - Nothing -> Scanner.scanWith (CC.recv ctx) reply B.empty + Just lastRecv -> Scanner.scanWith (withResource pool CC.recv) reply lastRecv + Nothing -> Scanner.scanWith (withResource pool CC.recv) reply B.empty case scanResult of - Scanner.Fail{} -> CC.errConnClosed - Scanner.More{} -> error "Hedis: parseWith returned Partial" - Scanner.Done rest' r -> do - IOR.writeIORef lastRecvRef (Just rest') - return r + Scanner.Fail{} -> CC.errConnClosed + Scanner.More{} -> error "Hedis: parseWith returned Partial" + Scanner.Done rest' r -> do + IOR.writeIORef lastRecvRef (Just rest') + return r {-# INLINE nodes #-} nodes :: ShardMap -> [Node] diff --git a/src/Database/Redis/Connection.hs b/src/Database/Redis/Connection.hs index bce72674..ab6b9803 100644 --- a/src/Database/Redis/Connection.hs +++ b/src/Database/Redis/Connection.hs @@ -1,6 +1,7 @@ {-# LANGUAGE TupleSections #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} module Database.Redis.Connection where import Control.Exception @@ -25,6 +26,7 @@ import Database.Redis.Protocol(Reply(..)) import Database.Redis.Cluster(ShardMap(..), Node, Shard(..)) import qualified Database.Redis.Cluster as Cluster import qualified Database.Redis.ConnectionContext as CC +import qualified Database.Redis.Types as T --import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline import Database.Redis.Commands ( ping @@ -45,7 +47,7 @@ import Database.Redis.Commands -- 'connect' function to create one. data Connection = NonClusteredConnection (Pool PP.Connection) - | ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection) + | ClusteredConnection (MVar ShardMap) Cluster.Connection -- |Information for connnecting to a Redis server. -- @@ -164,7 +166,7 @@ checkedConnect connInfo = do -- |Destroy all idle resources in the pool. disconnect :: Connection -> IO () disconnect (NonClusteredConnection pool) = destroyAllResources pool -disconnect (ClusteredConnection _ pool) = destroyAllResources pool +disconnect (ClusteredConnection _ conn) = Cluster.destroyNodeResources conn -- | Memory bracket around 'connect' and 'disconnect'. withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c @@ -182,8 +184,8 @@ withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect runRedis :: Connection -> Redis a -> IO a runRedis (NonClusteredConnection pool) redis = withResource pool $ \conn -> runRedisInternal conn redis -runRedis (ClusteredConnection _ pool) redis = - withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis +runRedis (ClusteredConnection _ conn) redis = + runRedisClusteredInternal conn (refreshShardMap conn) redis newtype ClusterConnectError = ClusterConnectError Reply deriving (Eq, Show, Typeable) @@ -215,9 +217,10 @@ connectCluster bootstrapConnInfo = do Right infos -> do let isConnectionReadOnly = connectReadOnly bootstrapConnInfo - clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn - pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) - return $ ClusteredConnection shardMapVar pool + clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) + -- pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 3 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) + connection <- clusterConnect isConnectionReadOnly clusterConnection + return $ ClusteredConnection shardMapVar connection where withAuth host port timeout = do conn <- PP.connect host port timeout @@ -240,13 +243,24 @@ connectCluster bootstrapConnInfo = do clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection clusterConnect readOnlyConnection connection = do clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection - nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap) + nodesConns <- sequence $ (ctxToConn . snd) <$> (HM.toList nodeMap) when readOnlyConnection $ - mapM_ (\conn -> do - PP.beginReceiving conn - runRedisInternal conn readOnly + mapM_ (\maybeConn -> case maybeConn of + Just conn -> do + PP.beginReceiving conn + runRedisInternal conn readOnly + Nothing -> return $ Right (T.Status "Connection does not exist") ) nodesConns return clusterConn + where + ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection) + ctxToConn (Cluster.NodeConnection pool _ nid) = do + maybeConn <- try $ withResource pool PP.fromCtx + case maybeConn of + Right ppConn -> return $ Just ppConn + Left (_ :: SomeException) -> do + putStrLn ("SomeException Occured in NodeID " ++ show nid) + return Nothing shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where @@ -270,8 +284,8 @@ refreshShardMap (Cluster.Connection nodeConns _ _ _ _) = refreshShardMapWithNodeConn (head $ HM.elems nodeConns) refreshShardMapWithNodeConn :: Cluster.NodeConnection -> IO ShardMap -refreshShardMapWithNodeConn (Cluster.NodeConnection ctx _ _) = do - pipelineConn <- PP.fromCtx ctx +refreshShardMapWithNodeConn (Cluster.NodeConnection pool _ _) = do + pipelineConn <- withResource pool PP.fromCtx refreshShardMapWithConn pipelineConn True refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap