forked from informatikr/hedis
-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
One pool per node #19
Draft
shashitnak
wants to merge
6
commits into
juspay:master
Choose a base branch
from
shashitnak:one-pool-per-node
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 2 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
039a83d
changing NodeConnection to Pool of NodeConnection and taking Cluster …
shashitnak bc79b73
moving withResource call to the beginning of requestNode
shashitnak 8545c0e
moving NodeConnection HashMap in MVar and adding refreshCluster function
shashitnak 0ee0bf0
refreshShardMap is now updating both MVar's and removing refreshCluster
shashitnak 47613cf
dropping a NodeConnection from HashMap if it throws an exception in c…
shashitnak 4362e64
only adding new nodes to hashmap and not changing the old ones
shashitnak File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ | |
{-# LANGUAGE RecordWildCards #-} | ||
{-# LANGUAGE OverloadedStrings #-} | ||
{-# LANGUAGE ScopedTypeVariables #-} | ||
|
||
module Database.Redis.Connection where | ||
|
||
import Control.Exception | ||
|
@@ -33,6 +32,7 @@ import qualified Database.Redis.Cluster as Cluster | |
import qualified Database.Redis.ConnectionContext as CC | ||
import Control.Concurrent (threadDelay) | ||
import Control.Concurrent.Async (race) | ||
import qualified Database.Redis.Types as T | ||
--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline | ||
|
||
import Database.Redis.Commands | ||
|
@@ -54,7 +54,7 @@ import Database.Redis.Commands | |
-- 'connect' function to create one. | ||
data Connection | ||
= NonClusteredConnection (Pool PP.Connection) | ||
| ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection) | ||
| ClusteredConnection (MVar ShardMap) Cluster.Connection | ||
|
||
-- |Information for connnecting to a Redis server. | ||
-- | ||
|
@@ -173,7 +173,7 @@ checkedConnect connInfo = do | |
-- |Destroy all idle resources in the pool. | ||
disconnect :: Connection -> IO () | ||
disconnect (NonClusteredConnection pool) = destroyAllResources pool | ||
disconnect (ClusteredConnection _ pool) = destroyAllResources pool | ||
disconnect (ClusteredConnection _ conn) = Cluster.destroyNodeResources conn | ||
|
||
-- | Memory bracket around 'connect' and 'disconnect'. | ||
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c | ||
|
@@ -191,8 +191,8 @@ withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect | |
runRedis :: Connection -> Redis a -> IO a | ||
runRedis (NonClusteredConnection pool) redis = | ||
withResource pool $ \conn -> runRedisInternal conn redis | ||
runRedis (ClusteredConnection _ pool) redis = | ||
withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis | ||
runRedis (ClusteredConnection _ conn) redis = | ||
runRedisClusteredInternal conn (refreshShardMap conn) redis | ||
|
||
newtype ClusterConnectError = ClusterConnectError Reply | ||
deriving (Eq, Show, Typeable) | ||
|
@@ -224,9 +224,10 @@ connectCluster bootstrapConnInfo = do | |
Right infos -> do | ||
let | ||
isConnectionReadOnly = connectReadOnly bootstrapConnInfo | ||
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn | ||
pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) | ||
return $ ClusteredConnection shardMapVar pool | ||
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) | ||
-- pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 3 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) | ||
connection <- clusterConnect isConnectionReadOnly clusterConnection | ||
return $ ClusteredConnection shardMapVar connection | ||
where | ||
withAuth host port timeout = do | ||
conn <- PP.connect host port timeout | ||
|
@@ -249,13 +250,24 @@ connectCluster bootstrapConnInfo = do | |
clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection | ||
clusterConnect readOnlyConnection connection = do | ||
clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection | ||
nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap) | ||
nodesConns <- sequence $ (ctxToConn . snd) <$> (HM.toList nodeMap) | ||
when readOnlyConnection $ | ||
mapM_ (\conn -> do | ||
PP.beginReceiving conn | ||
runRedisInternal conn readOnly | ||
mapM_ (\maybeConn -> case maybeConn of | ||
Just conn -> do | ||
PP.beginReceiving conn | ||
runRedisInternal conn readOnly | ||
Nothing -> return $ Right (T.Status "Connection does not exist") | ||
) nodesConns | ||
return clusterConn | ||
where | ||
ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection) | ||
ctxToConn (Cluster.NodeConnection pool _ nid) = do | ||
maybeConn <- try $ withResource pool PP.fromCtx | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to remove the connection from pool if there is an exception? rather than catching with |
||
case maybeConn of | ||
Right ppConn -> return $ Just ppConn | ||
Left (_ :: SomeException) -> do | ||
putStrLn ("SomeException Occured in NodeID " ++ show nid) | ||
return Nothing | ||
|
||
shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap | ||
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where | ||
|
@@ -282,20 +294,21 @@ refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap | |
refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error") | ||
refreshShardMapWithNodeConn nodeConnsList = do | ||
selectedIdx <- randomRIO (0, (length nodeConnsList) - 1) | ||
let (Cluster.NodeConnection ctx _ _) = nodeConnsList !! selectedIdx | ||
pipelineConn <- PP.fromCtx ctx | ||
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT" | ||
raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms | ||
case raceResult of | ||
Left () -> do | ||
print $ "TimeoutForConnection " <> show ctx | ||
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout" | ||
Right eiShardMapResp -> | ||
case eiShardMapResp of | ||
Right shardMap -> pure shardMap | ||
Left (err :: SomeException) -> do | ||
print $ "ShardMapRefreshError-" <> show err | ||
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error") | ||
let (Cluster.NodeConnection pool _ _) = nodeConnsList !! selectedIdx | ||
withResource pool $ \ctx -> do | ||
pipelineConn <- PP.fromCtx ctx | ||
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT" | ||
raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms | ||
case raceResult of | ||
Left () -> do | ||
print $ "TimeoutForConnection " <> show ctx | ||
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout" | ||
Right eiShardMapResp -> | ||
case eiShardMapResp of | ||
Right shardMap -> pure shardMap | ||
Left (err :: SomeException) -> do | ||
print $ "ShardMapRefreshError-" <> show err | ||
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error") | ||
|
||
refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap | ||
refreshShardMapWithConn pipelineConn _ = do | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since a pool is now per node, the map "HM.HashMap NodeID NodeConnection" is probably never going to update.
Can we find a way to update this mapping also? This will fail requests when we scale up Redis cluster where we won't be able to find the new NodeID in this map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ishan-juspay is it okay if we trigger the refresh logic at the point where some NodeId is not found in the map? We can have a Pool of a pair of HashMap and ShardMap and whenever some NodeId is not found in the HashMap, it will throw and Exception and a new constructor will be called for the Pair.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this way, if some Resources are working with NodeId that they are able to find, then they will keep working without any problems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can have this HashMap as an MVar and update it whenever RefreshShardMap is called ? @aravindgopall