Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions app-e2e/src/Test/E2E/Endpoint/Startup.purs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ spec = do

Spec.describe "scheduleTransfers" do
Spec.it "enqueues transfer jobs when package location changes" do
-- type-equality metadata says old-owner, but tags point to purescript
-- type-equality metadata says purescript, but tags point to new-owner
jobs <- Client.getJobs
let
isTypeEqualityTransferJob :: Job -> Boolean
Expand All @@ -101,8 +101,8 @@ spec = do
Transfer { newLocation } ->
case newLocation of
GitHub { owner } ->
when (owner /= "purescript") do
Assert.fail $ "Expected owner 'purescript' but got '" <> owner <> "'"
when (owner /= "new-owner") do
Assert.fail $ "Expected owner 'new-owner' but got '" <> owner <> "'"
_ -> Assert.fail "Expected GitHub location"
_ -> Assert.fail "Expected Transfer payload"
Just _ -> Assert.fail "Expected TransferJob but got different job type"
Expand Down
2 changes: 1 addition & 1 deletion app/fixtures/registry/metadata/type-equality.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"location": {
"githubOwner": "old-owner",
"githubOwner": "purescript",
"githubRepo": "purescript-type-equality"
},
"published": {
Expand Down
120 changes: 86 additions & 34 deletions app/src/App/Effect/Registry.purs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Registry.App.Effect.Registry where
import Registry.App.Prelude

import Codec.JSON.DecodeError as CJ.DecodeError
import Control.Parallel as Parallel
import Data.Array as Array
import Data.Array.NonEmpty as NonEmptyArray
import Data.Codec.JSON as CJ
Expand All @@ -17,26 +18,27 @@ import Data.Map as Map
import Data.Set as Set
import Data.String as String
import Data.Time.Duration as Duration
import Effect.Aff (Milliseconds(..))
import Effect.Aff as Aff
import Effect.Aff.AVar (AVar)
import Effect.Aff.AVar as AVar
import Effect.Ref as Ref
import JSON as JSON
import Node.FS.Aff as FS.Aff
import Node.Path as Path
import Registry.Foreign.FSExtra as FS.Extra
import Registry.App.CLI.Git (GitResult)
import Registry.App.CLI.Git as Git
import Registry.App.Effect.Cache (class MemoryEncodable, Cache, CacheRef, MemoryEncoding(..))
import Registry.App.Effect.Cache as Cache
import Registry.App.Effect.GitHub (GITHUB)
import Registry.App.Effect.GitHub as GitHub
import Registry.App.Effect.Log (LOG)
import Registry.App.Effect.Log (LOG, Log)
import Registry.App.Effect.Log as Log
import Registry.App.Legacy.PackageSet (PscTag(..))
import Registry.App.Legacy.PackageSet as Legacy.PackageSet
import Registry.App.Legacy.Types (legacyPackageSetCodec)
import Registry.Constants as Constants
import Registry.Foreign.FSExtra as FS.Extra
import Registry.Foreign.FastGlob as FastGlob
import Registry.Foreign.Octokit (Address)
import Registry.Foreign.Octokit as Octokit
Expand All @@ -54,6 +56,7 @@ import Run as Run
import Run.Except (EXCEPT)
import Run.Except as Except
import Safe.Coerce (coerce)
import Type.Proxy (Proxy(..))

data RegistryCache (c :: Type -> Type -> Type) a
= AllManifests (c ManifestIndex a)
Expand Down Expand Up @@ -175,16 +178,17 @@ data Process

derive instance Eq Process

instance Show Process where
show Scheduler = "Scheduler"
show JobExecutor = "JobExecutor"
show API = "API"
show ScriptLegacyImporter = "ScriptLegacyImporter"
show ScriptPackageDeleter = "ScriptPackageDeleter"
show ScriptSolver = "ScriptSolver"
show ScriptVerifyIntegrity = "ScriptVerifyIntegrity"
show ScriptCompilerVersions = "ScriptCompilerVersions"
show ScriptArchiveSeeder = "ScriptArchiveSeeder"
printProcess :: Process -> String
printProcess = case _ of
Scheduler -> "Scheduler"
JobExecutor -> "JobExecutor"
API -> "API"
ScriptLegacyImporter -> "ScriptLegacyImporter"
ScriptPackageDeleter -> "ScriptPackageDeleter"
ScriptSolver -> "ScriptSolver"
ScriptVerifyIntegrity -> "ScriptVerifyIntegrity"
ScriptCompilerVersions -> "ScriptCompilerVersions"
ScriptArchiveSeeder -> "ScriptArchiveSeeder"

-- | A lock for a single repository, tracking both the mutex and the owner.
type RepoLock = { lock :: AVar Unit, owner :: Ref (Maybe Process) }
Expand All @@ -211,34 +215,82 @@ getOrCreateLock locksRef key = do

-- | Acquire a repository lock, run an action, and release the lock.
-- | The lock prevents concurrent access to the same repository.
-- | Defaults to a 60-second timeout.
withRepoLock
:: forall r a
. Process
-> RepoLocks
-> RepoKey
-> Run (LOG + AFF + EFFECT + r) a
-> Run (LOG + AFF + EFFECT + r) a
withRepoLock process locks key action = do
-> Run (LOG + AFF + EFFECT + EXCEPT String + ()) a
-> Run (LOG + AFF + EFFECT + EXCEPT String + r) a
withRepoLock = withRepoLockTimeout (Milliseconds 60_000.0)

-- | Acquire a repository lock, run an action, and release the lock.
-- | The lock prevents concurrent access to the same repository.
withRepoLockTimeout
:: forall r a
. Milliseconds
-> Process
-> RepoLocks
-> RepoKey
-> Run (LOG + AFF + EFFECT + EXCEPT String + ()) a
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a closed row restricted just to the actions we intend to 'lower' to Aff. If we wanted to expand locking outside this module we'd have to add explicit handling for anything else we want to run to Aff.

-> Run (LOG + AFF + EFFECT + EXCEPT String + r) a
withRepoLockTimeout timeout process locks key action = do
repoLock <- Run.liftAff $ getOrCreateLock locks key
Run.liftAff $ AVar.take repoLock.lock
Run.liftEffect $ Ref.write (Just process) repoLock.owner
result <- action
Run.liftEffect $ Ref.write Nothing repoLock.owner
Run.liftAff $ AVar.put unit repoLock.lock
pure result

-- | Clear any locks owned by a specific process.
-- | Used to clean up orphaned locks when a process crashes and restarts.
clearOwnLocks :: forall r. Process -> RepoLocks -> Run (LOG + AFF + EFFECT + r) Unit
clearOwnLocks process locksRef = do
locks <- Run.liftEffect $ Ref.read locksRef
for_ (Map.toUnfoldable locks :: Array _) \(Tuple _ repoLock) -> do
owner <- Run.liftEffect $ Ref.read repoLock.owner
when (owner == Just process) do
Log.warn $ "Clearing orphaned lock for " <> show process
Run.liftEffect $ Ref.write Nothing repoLock.owner
-- Put the unit back to release the lock
Run.liftAff $ AVar.put unit repoLock.lock

-- It isn't possible to run exception-safe Aff code like `bracket` within
-- the extensible effects system. For the actions we need to support
-- behind a lock we only need to support the LOG effect, so we lower to
-- Aff, run the lock-guarded code safely, and aggregate the logs to be
-- flushed afterwards.
{ logs, outcome } <- Run.liftAff do
logsRef <- liftEffect $ Ref.new []
outcome <- withRepoLockAff repoLock timeout (runWithLogs logsRef action)
logs <- liftEffect $ Ref.read logsRef
pure { logs, outcome }
Comment on lines +246 to +250
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @natefaubion I couldn't think of a better way to do this, but if you have ideas I'm all ears


-- We replay the collected logs
for_ logs \log ->
Run.lift Log._log log

case outcome of
Nothing -> do
Log.warn $ "Repo lock timed out for " <> printProcess process
Except.throw "Repo lock timed out."
Just (Left err) ->
Except.throw $ "Repo action failed: " <> Aff.message err
Just (Right value) ->
pure value
where
runWithLogs :: Ref (Array (Log Unit)) -> Run (LOG + AFF + EFFECT + EXCEPT String ()) a -> Aff (Either Aff.Error a)
runWithLogs ref = Aff.attempt <<< Run.runCont step pure
where
step =
Run.case_
# Run.on Log._log handleLog
# Run.on (Proxy @"aff") (\k -> k >>= identity)
# Run.on (Proxy @"effect") (\k -> liftEffect k >>= identity)
# Run.on Except._except (\k -> Aff.throwError (Aff.error (coerce k)))

handleLog (Log.Log level message next) = do
liftEffect $ Ref.modify_ (\logs -> Array.snoc logs (Log.Log level message unit)) ref
next

-- | Acquire a lock, run the action, and release the lock, guarded by a bracket to clean the
-- | locks on exception. Action is cancelled after a configurable timeout
withRepoLockAff :: RepoLock -> Milliseconds -> Aff (Either Aff.Error a) -> Aff (Maybe (Either Aff.Error a))
withRepoLockAff repoLock lockTimeout aff =
Aff.bracket acquire release \_ -> do
let race = Parallel.parallel (Just <$> aff) <|> Parallel.parallel (Aff.delay lockTimeout $> Nothing)
Parallel.sequential race
where
acquire = do
AVar.take repoLock.lock
liftEffect $ Ref.write (Just process) repoLock.owner

release _ = do
liftEffect $ Ref.write Nothing repoLock.owner
AVar.put unit repoLock.lock

-- | Validate that a repository is in a valid state.
-- | If the repo is corrupted (e.g., from an interrupted clone), delete it.
Expand Down
14 changes: 8 additions & 6 deletions app/src/App/Server/Scheduler.purs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,14 @@ enqueuePublishJob allMetadata name (Metadata metadata) version ref = do
-- Find the highest published version of each dependency within its range
let
depVersions :: Map PackageName Version
depVersions = Map.mapMaybeWithKey (\depName range ->
case Map.lookup depName allMetadata of
Just (Metadata depMeta) ->
Array.last $ Array.filter (Range.includes range) $ Array.sort $ Array.fromFoldable $ Map.keys depMeta.published
Nothing -> Nothing
) manifest.dependencies
depVersions = Map.mapMaybeWithKey
( \depName range ->
case Map.lookup depName allMetadata of
Just (Metadata depMeta) ->
Array.last $ Array.filter (Range.includes range) $ Array.sort $ Array.fromFoldable $ Map.keys depMeta.published
Nothing -> Nothing
)
manifest.dependencies

case compatibleCompilers allMetadata depVersions of
Just compilerSet -> pure $ NonEmptySet.min compilerSet
Expand Down
7 changes: 5 additions & 2 deletions app/test/App/API.purs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ spec = do
Nothing -> Except.throw $ "Expected " <> formatPackageVersion name version <> " to be in metadata."
Just published -> do
let many' = NonEmptyArray.toArray published.compilers
let expected = map Utils.unsafeVersion [ "0.15.10", "0.15.11" ]
-- Only 0.15.10 is expected because prelude only has 0.15.10 in metadata,
-- so the solver cannot find a solution for 0.15.11
let expected = map Utils.unsafeVersion [ "0.15.10" ]
unless (many' == expected) do
Except.throw $ "Expected " <> formatPackageVersion name version <> " to have a compiler matrix of " <> Utils.unsafeStringify (map Version.print expected) <> " but got " <> Utils.unsafeStringify (map Version.print many')

Expand Down Expand Up @@ -191,7 +193,8 @@ spec = do
Nothing -> Except.throw $ "Expected " <> formatPackageVersion transitive.name transitive.version <> " to be in metadata."
Just published -> do
let many' = NonEmptyArray.toArray published.compilers
let expected = map Utils.unsafeVersion [ "0.15.10", "0.15.11" ]
-- Only 0.15.10 is expected because prelude only has 0.15.10 in metadata
let expected = map Utils.unsafeVersion [ "0.15.10" ]
unless (many' == expected) do
Except.throw $ "Expected " <> formatPackageVersion transitive.name transitive.version <> " to have a compiler matrix of " <> Utils.unsafeStringify (map Version.print expected) <> " but got " <> Utils.unsafeStringify (map Version.print many')

Expand Down
Loading