-
Notifications
You must be signed in to change notification settings - Fork 80
Make repo locks exception safe #718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ module Registry.App.Effect.Registry where | |
| import Registry.App.Prelude | ||
|
|
||
| import Codec.JSON.DecodeError as CJ.DecodeError | ||
| import Control.Parallel as Parallel | ||
| import Data.Array as Array | ||
| import Data.Array.NonEmpty as NonEmptyArray | ||
| import Data.Codec.JSON as CJ | ||
|
|
@@ -17,26 +18,27 @@ import Data.Map as Map | |
| import Data.Set as Set | ||
| import Data.String as String | ||
| import Data.Time.Duration as Duration | ||
| import Effect.Aff (Milliseconds(..)) | ||
| import Effect.Aff as Aff | ||
| import Effect.Aff.AVar (AVar) | ||
| import Effect.Aff.AVar as AVar | ||
| import Effect.Ref as Ref | ||
| import JSON as JSON | ||
| import Node.FS.Aff as FS.Aff | ||
| import Node.Path as Path | ||
| import Registry.Foreign.FSExtra as FS.Extra | ||
| import Registry.App.CLI.Git (GitResult) | ||
| import Registry.App.CLI.Git as Git | ||
| import Registry.App.Effect.Cache (class MemoryEncodable, Cache, CacheRef, MemoryEncoding(..)) | ||
| import Registry.App.Effect.Cache as Cache | ||
| import Registry.App.Effect.GitHub (GITHUB) | ||
| import Registry.App.Effect.GitHub as GitHub | ||
| import Registry.App.Effect.Log (LOG) | ||
| import Registry.App.Effect.Log (LOG, Log) | ||
| import Registry.App.Effect.Log as Log | ||
| import Registry.App.Legacy.PackageSet (PscTag(..)) | ||
| import Registry.App.Legacy.PackageSet as Legacy.PackageSet | ||
| import Registry.App.Legacy.Types (legacyPackageSetCodec) | ||
| import Registry.Constants as Constants | ||
| import Registry.Foreign.FSExtra as FS.Extra | ||
| import Registry.Foreign.FastGlob as FastGlob | ||
| import Registry.Foreign.Octokit (Address) | ||
| import Registry.Foreign.Octokit as Octokit | ||
|
|
@@ -54,6 +56,7 @@ import Run as Run | |
| import Run.Except (EXCEPT) | ||
| import Run.Except as Except | ||
| import Safe.Coerce (coerce) | ||
| import Type.Proxy (Proxy(..)) | ||
|
|
||
| data RegistryCache (c :: Type -> Type -> Type) a | ||
| = AllManifests (c ManifestIndex a) | ||
|
|
@@ -175,16 +178,17 @@ data Process | |
|
|
||
| derive instance Eq Process | ||
|
|
||
| instance Show Process where | ||
| show Scheduler = "Scheduler" | ||
| show JobExecutor = "JobExecutor" | ||
| show API = "API" | ||
| show ScriptLegacyImporter = "ScriptLegacyImporter" | ||
| show ScriptPackageDeleter = "ScriptPackageDeleter" | ||
| show ScriptSolver = "ScriptSolver" | ||
| show ScriptVerifyIntegrity = "ScriptVerifyIntegrity" | ||
| show ScriptCompilerVersions = "ScriptCompilerVersions" | ||
| show ScriptArchiveSeeder = "ScriptArchiveSeeder" | ||
| printProcess :: Process -> String | ||
| printProcess = case _ of | ||
| Scheduler -> "Scheduler" | ||
| JobExecutor -> "JobExecutor" | ||
| API -> "API" | ||
| ScriptLegacyImporter -> "ScriptLegacyImporter" | ||
| ScriptPackageDeleter -> "ScriptPackageDeleter" | ||
| ScriptSolver -> "ScriptSolver" | ||
| ScriptVerifyIntegrity -> "ScriptVerifyIntegrity" | ||
| ScriptCompilerVersions -> "ScriptCompilerVersions" | ||
| ScriptArchiveSeeder -> "ScriptArchiveSeeder" | ||
|
|
||
| -- | A lock for a single repository, tracking both the mutex and the owner. | ||
| type RepoLock = { lock :: AVar Unit, owner :: Ref (Maybe Process) } | ||
|
|
@@ -211,34 +215,82 @@ getOrCreateLock locksRef key = do | |
|
|
||
| -- | Acquire a repository lock, run an action, and release the lock. | ||
| -- | The lock prevents concurrent access to the same repository. | ||
| -- | Defaults to a 60-second timeout. | ||
| withRepoLock | ||
| :: forall r a | ||
| . Process | ||
| -> RepoLocks | ||
| -> RepoKey | ||
| -> Run (LOG + AFF + EFFECT + r) a | ||
| -> Run (LOG + AFF + EFFECT + r) a | ||
| withRepoLock process locks key action = do | ||
| -> Run (LOG + AFF + EFFECT + EXCEPT String + ()) a | ||
| -> Run (LOG + AFF + EFFECT + EXCEPT String + r) a | ||
| withRepoLock = withRepoLockTimeout (Milliseconds 60_000.0) | ||
|
|
||
| -- | Acquire a repository lock, run an action, and release the lock. | ||
| -- | The lock prevents concurrent access to the same repository. | ||
| withRepoLockTimeout | ||
| :: forall r a | ||
| . Milliseconds | ||
| -> Process | ||
| -> RepoLocks | ||
| -> RepoKey | ||
| -> Run (LOG + AFF + EFFECT + EXCEPT String + ()) a | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a closed row restricted just to the actions we intend to 'lower' to |
||
| -> Run (LOG + AFF + EFFECT + EXCEPT String + r) a | ||
| withRepoLockTimeout timeout process locks key action = do | ||
| repoLock <- Run.liftAff $ getOrCreateLock locks key | ||
| Run.liftAff $ AVar.take repoLock.lock | ||
| Run.liftEffect $ Ref.write (Just process) repoLock.owner | ||
| result <- action | ||
| Run.liftEffect $ Ref.write Nothing repoLock.owner | ||
| Run.liftAff $ AVar.put unit repoLock.lock | ||
| pure result | ||
|
|
||
| -- | Clear any locks owned by a specific process. | ||
| -- | Used to clean up orphaned locks when a process crashes and restarts. | ||
| clearOwnLocks :: forall r. Process -> RepoLocks -> Run (LOG + AFF + EFFECT + r) Unit | ||
| clearOwnLocks process locksRef = do | ||
| locks <- Run.liftEffect $ Ref.read locksRef | ||
| for_ (Map.toUnfoldable locks :: Array _) \(Tuple _ repoLock) -> do | ||
| owner <- Run.liftEffect $ Ref.read repoLock.owner | ||
| when (owner == Just process) do | ||
| Log.warn $ "Clearing orphaned lock for " <> show process | ||
| Run.liftEffect $ Ref.write Nothing repoLock.owner | ||
| -- Put the unit back to release the lock | ||
| Run.liftAff $ AVar.put unit repoLock.lock | ||
|
|
||
| -- It isn't possible to run exception-safe Aff code like `bracket` within | ||
| -- the extensible effects system. For the actions we need to support | ||
| -- behind a lock we only need to support the LOG effect, so we lower to | ||
| -- Aff, run the lock-guarded code safely, and aggregate the logs to be | ||
| -- flushed afterwards. | ||
| { logs, outcome } <- Run.liftAff do | ||
| logsRef <- liftEffect $ Ref.new [] | ||
| outcome <- withRepoLockAff repoLock timeout (runWithLogs logsRef action) | ||
| logs <- liftEffect $ Ref.read logsRef | ||
| pure { logs, outcome } | ||
|
Comment on lines
+246
to
+250
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc: @natefaubion I couldn't think of a better way to do this, but if you have ideas I'm all ears |
||
|
|
||
| -- We replay the collected logs | ||
| for_ logs \log -> | ||
| Run.lift Log._log log | ||
|
|
||
| case outcome of | ||
| Nothing -> do | ||
| Log.warn $ "Repo lock timed out for " <> printProcess process | ||
| Except.throw "Repo lock timed out." | ||
| Just (Left err) -> | ||
| Except.throw $ "Repo action failed: " <> Aff.message err | ||
| Just (Right value) -> | ||
| pure value | ||
| where | ||
| runWithLogs :: Ref (Array (Log Unit)) -> Run (LOG + AFF + EFFECT + EXCEPT String ()) a -> Aff (Either Aff.Error a) | ||
| runWithLogs ref = Aff.attempt <<< Run.runCont step pure | ||
| where | ||
| step = | ||
| Run.case_ | ||
| # Run.on Log._log handleLog | ||
| # Run.on (Proxy @"aff") (\k -> k >>= identity) | ||
| # Run.on (Proxy @"effect") (\k -> liftEffect k >>= identity) | ||
| # Run.on Except._except (\k -> Aff.throwError (Aff.error (coerce k))) | ||
|
|
||
| handleLog (Log.Log level message next) = do | ||
| liftEffect $ Ref.modify_ (\logs -> Array.snoc logs (Log.Log level message unit)) ref | ||
| next | ||
|
|
||
| -- | Acquire a lock, run the action, and release the lock, guarded by a bracket to clean the | ||
| -- | locks on exception. Action is cancelled after a configurable timeout | ||
| withRepoLockAff :: RepoLock -> Milliseconds -> Aff (Either Aff.Error a) -> Aff (Maybe (Either Aff.Error a)) | ||
| withRepoLockAff repoLock lockTimeout aff = | ||
| Aff.bracket acquire release \_ -> do | ||
| let race = Parallel.parallel (Just <$> aff) <|> Parallel.parallel (Aff.delay lockTimeout $> Nothing) | ||
| Parallel.sequential race | ||
| where | ||
| acquire = do | ||
| AVar.take repoLock.lock | ||
| liftEffect $ Ref.write (Just process) repoLock.owner | ||
|
|
||
| release _ = do | ||
| liftEffect $ Ref.write Nothing repoLock.owner | ||
| AVar.put unit repoLock.lock | ||
|
|
||
| -- | Validate that a repository is in a valid state. | ||
| -- | If the repo is corrupted (e.g., from an interrupted clone), delete it. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.