If you have ~sparcur.simple~, this is the simplest way to get a remote-only connection to Pennsieve.
from sparcur.config import auth
from sparcur.simple.utils import backend_pennsieve
project_id = auth.get('remote-organization')
PennsieveRemote = backend_pennsieve(project_id)
root = PennsieveRemote(project_id)
datasets = list(root.children)
Otherwise you have to do what ~backend_pennsieve~ does for you:
from sparcur.paths import PennsieveCache, Path
from sparcur.config import auth
from sparcur.backends import PennsieveRemote
project_id = auth.get('remote-organization')
PennsieveRemote = PennsieveRemote._new(Path, PennsieveCache)
PennsieveRemote.init(project_id)
root = PennsieveRemote(PennsieveRemote.root)
datasets = list(root.children)
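Either way, ~datasets~ now holds the remote dataset objects for the project. A minimal sketch of using it, assuming the remote objects expose ~id~ and ~name~ attributes:

# minimal sketch; assumes the connection code above has already run
for dataset in datasets:
    print(dataset.id, dataset.name)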
You can run this example block and it will validate the DatasetTemplate.
Example usage
rsync -r path/to/some/curation/dataset test
pushd test/dataset
SCIGRAPH_API_KEY=$(python -c 'from pyontutils.config import auth; print(auth.get("scigraph-api-key"))') \
HOME=/tmp/test-home python -m sparcur.simple.validate
Example python usage
from sparcur.simple.validate import main as validate
from pathlib import Path
path = Path('../resources/DatasetTemplate')
blob = validate(path)
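The returned ~blob~ is the export blob for the dataset (a nested dict). A quick way to inspect it, assuming only that its non-primitive values can be coerced with ~str~:

import json
print(json.dumps(blob, indent=2, default=str)[:2000])  # default=str handles dates, paths, etc.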
Implementation of ~sparcur.simple.validate~.
import augpathlib as aug
from sparcur import pipelines as pipes
from sparcur.paths import PathL, CacheL
from sparcur.utils import GetTimeNow
def makeValidator(dataset_path, time_now=None):
if time_now is None:
time_now = GetTimeNow()
class CacheX(CacheL):
def __new__(cls, *args, **kwargs):
return super().__new__(cls, *args, **kwargs)
CacheX._bind_flavours()
class PathX(PathL):
""" Workaround absense of cache. """
_cache_class = CacheX
def __new__(cls, *args, **kwargs):
return super().__new__(cls, *args, **kwargs)
# TODO likely will also need to rebind the cache class as well
#@property
#def dataset_relative_path(self, __drp=dataset_path):
#return self.relative_path_from(self.__class__(__drp))
CacheX._local_class = PathX
PathX._bind_flavours()
# XXX monkey patch TODO sigh FIXME DatasetStructure calls Path directly inside
#PathL.dataset_relative_path = Path.dataset_relative_path
    # must cast before anything else is done so that anchor and
# datasets are known
dataset_path = PathX(dataset_path)
CacheX._dataset_dirs = [CacheX(dataset_path)]
# FIXME this is very much not ideal because we don't actually want
# the parent in this case
CacheX._asserted_anchor = CacheX(dataset_path.parent)
class context:
path = dataset_path.resolve()
id = path.id
uri_api = path.as_uri()
uri_human = path.as_uri()
class lifters:
id = context.id
remote = 'local'
folder_name = context.path.name
uri_api = context.uri_api
uri_human = context.uri_human
timestamp_export_start = time_now.START_TIMESTAMP
affiliations = lambda *args, **kwargs: None
techniques = tuple()
modality = None
organ_term = None
protocol_uris = tuple()
award_manual = None
return pipes.PipelineEnd(dataset_path, lifters, context)
    # return pipes.SDSPipeline(dataset_path, lifters, context)  # alternative pipeline, unreachable above; shouldn't need network
def main(path=PathL.cwd(), time_now=None,
export_local=False, export_parent_path=None,
_entry_point=False, validate=False, **kwargs):
# ('../resources/DatasetTemplate')
pipeline = makeValidator(path, time_now=time_now)
data = pipeline.data
if _entry_point:
from sparcur.simple.export import export_blob
export_blob_path = export_blob(
data, 'curation-export.json', time_now=time_now,
export_parent_path=export_parent_path if export_parent_path is not None else path,
**kwargs)
return export_blob_path
else:
return data
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main)
from sparcur.utils import path_ir
ir = path_ir('curation-export.json')
def main():
from sparcur.reports import Report
from sparcur.utils import path_ir
from pyontutils.core import OntResIri
ori = OntResIri('https://cassava.ucsd.edu/sparc/preview/exports/curation-export.ttl')
graph = ori.graph
ir = path_ir('/tmp/curation-export-test.json')
rows = Report._hubmap_terms(graph, ir)
anat = [r for r in rows if r[1].prefix in ('UBERON', 'FMA', 'ILX')]
    all_r = expand_label_curie(rows)  # NOTE: expand_label_curie is not imported in this snippet; it comes from sparcur (exact module not shown here)
ana_r = expand_label_curie(anat)
return all_r, ana_r
if __name__ == '__main__':
    main()
Create a protocols.io account and get API keys.
Then run the following to register with protocols.io. NOTE this is broken at the moment. Manual steps can be found in ./setup.org::#config-templates
python -c "import idlib; idlib.Pio._setup()"
You can then run the following to retrieve protocol data.
import idlib
from pyontutils.core import OntResIri
from pyontutils.namespaces import sparc, rdf
def getpio(i):
try:
return idlib.Pio(i)
except idlib.exc.IdlibError as e:
pass
def getdata(s):
try:
return s.data()
except (idlib.exc.NotAuthorizedError) as e:
print(e)
except (idlib.exc.IdDoesNotExistError) as e:
print(e)
def main():
ori = OntResIri("https://cassava.ucsd.edu/sparc/preview/exports/protcur.ttl")
g = ori.graph
pids = list(g[:rdf.type:sparc.Protocol])
streams = [s for i in pids for s in (getpio(i),) if s]
datas = [getdata(s) for s in streams]
return datas
if __name__ == '__main__':
main()
def new_xml_format(path):
from sparcur.extract import xml
ex = xml.XmlSource(path)
    top_tag = ex.e.getroot().tag
    return top_tag
The dependency DAG is as follows (the rendered graph-retrieve-all graph is not reproduced here).
function sparc-time-friendly () {
local UTC_OFFSET_START
local TIME_START_NO_OFFSET
# gnu coreutils gdate needed for osx support/freebsd
# gdate on darwin only has millisecond resolution?
# this also won't work on freebsd without gnu coreutils
iso8601millis="+%FT%T,%6N" # FIXME do we _really_ need millis!? yemaybe? concurrent startups?
utcoffset="+%:z"
    # I really hope the utc offset doesn't change between start & end
    # but laptops and airplanes do exist, so it could
    # also how utterly annoying that the date separator and the
    # negative utc offset share the same symbol ... talk about
    # an annoying design flaw that is going to haunt humanity
    # with double the number of calls to date ... for as
    # long as anyone is writing code to deal with time
TIME_START_NO_OFFSET=$(date ${iso8601millis} || gdate ${iso8601millis})
UTC_OFFSET_START=$(date ${utcoffset} || gdate ${utcoffset})
local TIME_START="${TIME_START_NO_OFFSET}${UTC_OFFSET_START}" # XXX unused
local TIME_START_NO_OFFSET_FS_OK=${TIME_START_NO_OFFSET//:/}
local UTC_OFFSET_START_FS_OK=${UTC_OFFSET_START//:/}
local TIME_START_FRIENDLY=${TIME_START_NO_OFFSET_FS_OK}${UTC_OFFSET_START_FS_OK}
# So. iso8601 guidance on what to do about subsecond time and the utc offset in the compact
# representation is not entirely clear, however I _think_ that %FT%T%H%M%S,%6N%z is ok but
# the -/+ must stay between the timezone and the rest, so we will have to grab tz by itself
local TIME_START_SAFE=${TIME_START_NO_OFFSET_FS_OK//-/}${UTC_OFFSET_START_FS_OK} # XXX unused
mv "$(mktemp --directory sparcur-all-XXXXXX)" "${TIME_START_FRIENDLY}" || \
{ CODE=$?; echo 'mv failed'; return $CODE; }
echo "${TIME_START_FRIENDLY}"
}
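For reference, the three timestamp variants built above (full ISO 8601, filesystem friendly, and compact safe) can be sketched in Python; this is an illustration of the formats only, not part of the pipeline.

from datetime import datetime

now = datetime.now().astimezone()                     # local time with utc offset attached
no_offset = now.strftime('%Y-%m-%dT%H:%M:%S,%f')      # microsecond resolution, comma separator
offset_fs_ok = now.strftime('%z')                     # e.g. -0700, already colon free
friendly = no_offset.replace(':', '') + offset_fs_ok  # drop colons so the file system is happy
safe = no_offset.replace(':', '').replace('-', '') + offset_fs_ok  # compact date, offset sign preserved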
function sparc-get-all-remote-data () {
# NOTE not quite all the remote data, the google sheets
# don't have caching functionality yet
# parse args
local POSITIONAL=()
while [[ $# -gt 0 ]]
do
key="$1"
case $key in # (ref:(((((((sigh)
--project-id) local PROJECT_ID="${2}"; shift; shift ;;
--symlink-objects-to) local SYMLINK_OBJECTS_TO="${2}"; shift; shift ;;
--log-path) local LOG_PATH="${2}"; shift; shift ;;
--parent-path) local PARENT_PATH="${2}"; shift; shift ;;
--only-filesystem) local ONLY_FILESYSTEM="ONLY_FS"; shift ;;
-h|--help) echo "${HELP}"; return ;;
*) POSITIONAL+=("$1"); shift ;;
esac
done
# Why, you might be asking, are we declaring a local project path here without assignment?
    # Well. Let me tell you. Because local is a command with an exit status. So it _always_
# returns zero. So if you need to check the output of the command running in a subshell
# that you are assigning to a local variable _ALWAYS_ set local separately first.
# Yes, shellcheck does warn about this. See also https://superuser.com/a/1103711
local PROJECT_PATH
if [[ -z "${PARENT_PATH}" ]]; then
local PARENT_PATH
set -o pipefail
PARENT_PATH=$(sparc-time-friendly) || {
CODE=$?;
echo "Creating "'${PARENT_PATH}'" failed!"
set +o pipefail
return $CODE;
}
set +o pipefail
fi
local LOG_PATH=${LOG_PATH:-"${PARENT_PATH}/logs"}
#local LOG_PATH=$(python -c "from sparcur.config import auth; print(auth.get_path('log-path'))")
local PROJECT_ID=${PROJECT_ID:-$(python -c "from sparcur.config import auth; print(auth.get('remote-organization'))")}
local maybe_slot=()
if [[ -n "${SYMLINK_OBJECTS_TO}" ]]; then
# MUST use arrays to capture optional arguments like this otherwise
# arg values with spaces in them will destroy your sanity
maybe_slot+=(--symlink-objects-to "${SYMLINK_OBJECTS_TO}")
fi
echo "${PARENT_PATH}" # needed to be able to follow logs
if [ ! -d "${LOG_PATH}" ]; then
mkdir "${LOG_PATH}" || { CODE=$?; echo 'mkdir of ${LOG_PATH} failed'; return $CODE; }
fi
if [[ -z "${ONLY_FILESYSTEM}" ]]; then
# fetch annotations (ref:bash-pipeline-fetch-annotations)
echo "Fetching annotations metadata"
python -m sparcur.simple.fetch_annotations > "${LOG_PATH}/fetch-annotations.log" 2>&1 &
local pids_final[0]=$!
# fetch remote metadata (ref:bash-pipeline-fetch-remote-metadata-all)
# if this fails with 503 errors, check the
# remote-backoff-factor config variable
echo "Fetching remote metadata"
python -m sparcur.simple.fetch_remote_metadata_all \
--project-id "${PROJECT_ID}" \
> "${LOG_PATH}/fetch-remote-metadata.log" 2>&1 &
local pids[0]=$!
fi
local FAIL=0
# clone aka fetch top level
# we do not background this assignment because it runs quickly
# and everything that follows depends on it finishing, plus we
# need it to finish to set the PROJECT_PATH variable here
echo python -m sparcur.simple.clone --project-id "${PROJECT_ID}" --parent-path "${PARENT_PATH}" "${maybe_slot[@]}"
echo "Cloning top level"
set -o pipefail
PROJECT_PATH=$(python -m sparcur.simple.clone \
--project-id "${PROJECT_ID}" \
--parent-path "${PARENT_PATH}" \
"${maybe_slot[@]}" \
2>&1 | tee "${LOG_PATH}/clone.log" | tail -n 1) || {
# TODO tee the output when verbose is passed
CODE=$?;
tail -n 100 "${LOG_PATH}/clone.log";
echo "Clone failed! The last 100 lines of ${LOG_PATH}/clone.log are listed above.";
apids=( "${pids[@]}" "${pids_final[@]}" );
for pid in "${apids[@]}"; do
kill $pid;
done;
set +o pipefail
return $CODE;
}
set +o pipefail
# explicit export of the current project path for pipelines
# ideally we wouldn't need this, and when this pipeline
# finished the export pipeline would kick off, or the export
# pipeline would search for ... an existing project path ...
# by ... oh right, looking for an environment variable or
# checksing some other persistent state ... so this is the one
# unless some controlling process sets it top down from the start
# but we can't assume that
export SPARCUR_PROJECT_PATH="${PROJECT_PATH}"
for pid in "${pids[@]}"; do
wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
done
if [[ $FAIL -ne 0 || -z "${PROJECT_PATH}" ]]; then
echo "${FAIL} commands failed. Cannot continue."
echo "${PROJECT_PATH}"
return 1
fi
# pull aka fetch file system metadata
echo "Fetching file system metadata"
echo python -m sparcur.simple.pull --project-path "${PROJECT_PATH}"
python -m sparcur.simple.pull \
--project-path "${PROJECT_PATH}" \
> "${LOG_PATH}/pull.log" 2>&1 || {
CODE=$?;
tail -n 100 "${LOG_PATH}/pull.log";
echo "Pull failed! The last 100 lines of ${LOG_PATH}/pull.log are listed above.";
echo "${PROJECT_PATH}";
return $CODE; }
# fetch metadata files
echo "Fetching metadata files"
# have to pass project path as a position argument here so that it
# does not try to pull aka fetch the file system metadata again
echo python -m sparcur.simple.fetch_metadata_files --project-path "${PROJECT_PATH}"
python -m sparcur.simple.fetch_metadata_files \
--project-path "${PROJECT_PATH}" \
> "${LOG_PATH}/fetch-metadata-files.log" 2>&1 &
pids_final[1]=$!
# fetch files
echo "Fetching files"
# XXX at some point this will probably also depend on the manifests
# so we don't fetch everything with a matching extension
    # TODO derive --extension from manifests or allow it to be passed in
    echo python -m sparcur.simple.fetch_files --project-path "${PROJECT_PATH}" --extension xml
# FIXME fetch_files fails silently here :/
python -m sparcur.simple.fetch_files \
--project-path "${PROJECT_PATH}" \
--extension xml \
> "${LOG_PATH}/fetch-files.log" 2>&1 &
pids_final[2]=$!
local FAIL=0
for pid in "${pids_final[@]}"; do
wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
done
# FIXME HACK
#find -type f -size 0 -exec getfattr -d {} \;
#find -type f -size 0 -exec spc fetch --limit=-1 {} \;
if [[ $FAIL -ne 0 ]]; then
echo "${FAIL} commands failed. Cannot continue."
echo "${PROJECT_PATH}"
return 1
fi
echo "All fetching completed successfully."
}
This is the graph of the existing approach, more or less as implemented by spc export.
A slightly more sane version is being implemented as part of sparcur.simple, which will sandbox the network dependencies.
In the current implementation validation and export are conflated. This is bad, and will be changed.
spc export must only be run after sparc-get-all-remote-data, otherwise there will be network sandbox violations.
For the record, there are multiple ways to invoke spc export.
# pushd to the project location
pushd "${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"
spc export
popd
# pass the project location as a positional argument
spc export "${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"
# pass the project location as an option
spc export --project-path "${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"
At the moment sparc-export-all is just a wrapper around spc export.
This will change as we move to a single dataset export model. There
will then likely be a function that checks for datasets that have
changed since last export, updates only those and then collects the
outputs.
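A hypothetical sketch of that incremental check; the input shapes (dicts keyed by dataset id) are made up for illustration.

# hypothetical: datasets maps dataset id -> remote updated timestamp,
# last_exports maps dataset id -> timestamp of the last successful export
def changed_since_last_export(datasets, last_exports):
    return [did for did, updated in datasets.items()
            if did not in last_exports or updated > last_exports[did]]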
function sparc-export-all () {
# parse args
local POSITIONAL=()
while [[ $# -gt 0 ]]
do
key="$1"
case $key in # (ref:(((sigh)
--project-path) local PROJECT_PATH="${2}"; shift; shift ;;
-h|--help) echo "${HELP}"; return ;;
*) POSITIONAL+=("$1"); shift ;;
esac
done
local PROJECT_PATH="${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}"
spc export --project-path "${PROJECT_PATH}"
}
Desired invocation.
sparc-fexport ${UUID}
sparc-fexport dataset:${UUID}
sparc-fexport N:dataset:${UUID}
sparc-fexport https://api.pennsieve.io/datasets/N:dataset:${UUID}
sparc-fexport https://app.pennsieve.io/N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0/datasets/N:dataset:${UUID}
Desired behavior.
| dataset state | objects state     | action            | outcome                   |
|---------------+-------------------+-------------------+---------------------------|
| not-existing  | do symlink to     | retrieve          | spc export; path-metadata |
| existing      | either don’t fail | retrieve or equiv | spc export; path-metadata |
Desired output.
The dataset goes to exports/datasets/${UUID}/${TIMESTAMP_FRIENDLY}/ and is symlinked to exports/${UUID}/LATEST. The last run dataset itself goes to exports/datasets/LATEST. We don’t have to care about the project identifier because each UUID is unique.
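One reading of that layout as a sketch; the paths and the relative-symlink choice here are illustrative, not the actual implementation.

from pathlib import Path

def link_latest(exports: Path, uuid: str, timestamp_friendly: str) -> Path:
    run_dir = exports / 'datasets' / uuid / timestamp_friendly
    run_dir.mkdir(parents=True, exist_ok=True)
    latest = run_dir.parent / 'LATEST'
    if latest.is_symlink():
        latest.unlink()
    latest.symlink_to(run_dir.name)  # LATEST always points at the most recent run
    return run_dir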
TODO Logging goes ????
This is a quick and dirty version that should just do the right thing given only the dataset id as an input.
Usage: sparc-fexport N:dataset:totally-not-a-uuid. The id provided may be any of the variants: url, curie, api, human, uuid, etc.
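A hypothetical sketch of the id normalization that ~python -m sparcur.simple.utils --dataset-id~ performs; the real implementation may differ.

import re

def dataset_uuid(identifier):
    """ accept a uuid, curie, or api/human url and return the bare uuid """
    m = re.search(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
                  identifier.lower())
    if m is None:
        raise ValueError(f'no uuid found in {identifier!r}')
    return m.group(0)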
function sparc-export () {
echo TODO not ready yet
return 1
}
function sparc-fexport () {
local DATASET_ID="${1}"
local DATASET_UUID
local DATASET_PATH
local EXPORT_PATH
DATASET_UUID="$(python -m sparcur.simple.utils --dataset-id ${DATASET_ID})"
python -m sparcur.simple.retrieve --dataset-id ${DATASET_UUID} &&
EXPORT_PATH="$(realpath "${DATASET_UUID}/exports")" &&
DATASET_PATH="$(realpath "${DATASET_UUID}/dataset")" &&
pushd "${DATASET_PATH}" &&
# FIXME we shouldn't need this refetch so I think that retrieve is
# broken if files/folders already exist
python -m sparcur.cli find \
--name '*.xlsx' \
--name '*.xml' \
--name 'submission*' \
--name 'code_description*' \
--name 'dataset_description*' \
--name 'subjects*' \
--name 'samples*' \
--name 'manifest*' \
--name 'resources*' \
--name 'README*' \
--no-network \
--limit -1 \
--fetch
wait $!
python -m sparcur.cli export --export-path "${EXPORT_PATH}" & # FIXME TODO this conflates phases
local pids[0]=$!
# FIXME TODO for now export_single_dataset produces this so we don't run it independently
# FIXME there is also a difference in the export folder because the path metadata targets
# the last updated data and thus overwrites if the data has not changed but the code has
#python -m sparcur.simple.path_metadata_validate --export-path "${EXPORT_PATH}" &
#local pids[1]=$!
local FAIL=0
# TODO log/notify export failure
for pid in "${pids[@]}"; do
wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; }
done
if [[ $FAIL -ne 0 ]]; then
echo "${FAIL} commands failed. Cannot continue."
echo "${DATASET_UUID}"
echo "${DATASET_PATH}"
return 1
fi
popd # or do it yourself because we might need to explore??
}
for d in $(ls *-* -d); do refetch ${d}; done
for d in $(ls *-* -d); do
find ~/".local/share/sparcur/export/datasets/${d}/LATEST/curation-export.json" -name 'curation-export.json';
done
See ref:graph-validate-all for all the related bits.
python -m sparcur.simple.extract "${DATASET_PATH}"
Network access should only be possible during the retrieve phase.
The validate step may happen during extract and transform as well since structure or data content issues may be easier to detect during certain phases. Ideally this would not be the case, but practically it will take more work than necessary given our use cases.
We have to be careful to separate basic validation of the structure of the dataset data from the validation that the identifiers provided in that structure point to values known to their respective remotes.
For example, we need to be able to say "you are missing a protocol reference" at a separate point in time from saying "the remote(s) we asked had no record of the protocol reference you provided".
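A hypothetical sketch of that separation; the error record shapes are invented for illustration.

def check_structure(blob):
    # purely local: the field is missing or empty
    if not blob.get('protocol_uris'):
        return [{'type': 'missing-protocol-reference'}]
    return []

def check_remote(blob, resolve):
    # resolve is a network-using callable that returns None for unknown identifiers
    return [{'type': 'unknown-to-remote', 'uri': uri}
            for uri in blob.get('protocol_uris', ())
            if resolve(uri) is None]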
This is pulled in bulk independently in a different workflow but it is probably worth checking to see if we need to run it again whenever we run a dataset.
This is ref:clone.py, ref:fetch_remote_metadata_all.py, and ref:pull.py.
This is now called ref:path_metadata, but it is also dealt with as part of specimen_dirs.
We can’t do this right now because the current dataset template cannot be statically validated. Only some of it can be validated when we have the data from the subjects and samples sheets. In this future pipeline the type prefixes will be required so that the structure can be statically verified.
Run this if the structure metadata is in a state where we can proceed (i.e. that there is a dataset_description file).
This is ref:fetch_metadata_files.py.
This is ref:fetch_files.py. Depends on the manifest files and the dataset structure.
This is local validation, not remote networked validation.
This is the retrieval/dereferencing of identifier metadata.
It must happen after the file metadata step has been completed so that e.g. identifiers used in MBF segmentation files can be validated. In this step in particular validation and retrieval are essentially the same step. If there is an error during retrieval then it must produce a validation error.
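A minimal sketch of "a retrieval error must produce a validation error", reusing the idlib calls shown earlier; the error record shape is hypothetical.

import idlib

def dereference(uri):
    """ retrieval failure surfaces as a validation error rather than a crash """
    try:
        stream = idlib.Pio(uri)
    except idlib.exc.IdlibError as e:  # malformed or unrecognized identifier
        return None, {'type': 'bad-identifier', 'uri': uri, 'error': str(e)}
    try:
        return stream.data(), None
    except (idlib.exc.NotAuthorizedError, idlib.exc.IdDoesNotExistError) as e:
        return None, {'type': 'identifier-validation-error', 'uri': uri, 'error': str(e)}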
Checking and/or retrieving these depends on 3 things. The protocols.io group, the hypothesis group, and the dataset metadata.
Needs hypothesis and protocols.
Probably also includes load in some cases e.g. for the file level metadata that will be attached to package-id file-id pairs.
Since we are moving to run individual datasets the aggregate release process is decoupled, and mediated via git:36a749b5c321cdb81ba81f9d35e050ceb8479976
After a release
It should be possible to run all of these steps in a container
derived from a tgbugs/musl:kg-dev-user
docker image. See
https://github.com/tgbugs/dockerfiles/blob/master/source.org#kg-dev-user
https://hub.docker.com/repository/docker/tgbugs/musl/tags?name=kg-dev-user
All of these things either run manually or are independent of the SCKAN release process. In most cases manual intervention is needed to ensure that various component sources are up to date.
See ./setup.org::#export-v4 for the most recent workflow for batch release.
python -m sparcur.simple.fetch_annotations
python -m sparcur.cli export protcur
python -m neurondm.build release
python -m neurondm.models.apinat_pops_more
Super manual curation, identifier sync, review, merge, commit, push, etc.
The current workflow used to produce [file:~/git/NIF-Ontology/ttl/generated/neurons/apinat-partial-orders.ttl] is to run ./queries.org and then ./queries.org.
Below is the start of some code to extract the partial orders without having to load scigraph first.
#ori = OntResIri('https://cassava.ucsd.edu/ApiNATOMY/ontologies/.ttl')
import rdflib
from pathlib import Path
from pyontutils.core import OntResPath, OntConjunctiveGraph, OntId
from pyontutils.namespaces import rdf
from sparcur.utils import log  # NOTE: OntId, rdf, and log are used below but were not imported in the original snippet
from neurondm import orders as nord
b = Path('/tmp/build')
rp = sorted(b.glob('release-*-sckan'))[-1]
orp = OntResPath(rp / "data/sparc-data.ttl")
apinat_imports = [i for i in orp.imports if 'ApiNATOMY' in i.identifier]
# FIXME conjunctive graph has to conjoin by sharing a store ??? wat
apinat_graphs = [i.graph for i in apinat_imports]
ocg = OntConjunctiveGraph()
_ = [ocg.add((*t, g.boundIdentifier)) for g in apinat_graphs for t in g]
set(ocg.predicates())
apinat = rdflib.Namespace('https://apinatomy.org/uris/readable/')
elements = rdflib.Namespace('https://apinatomy.org/uris/elements/')
# TODO and the we have to recreate the neru-7 query
ancsl = list(ocg[:apinat.nextChainStartLevels:])
anext = list(ocg[:apinat.next:])
_soma = OntId('NLX:154731').u
somas = [s for s in ocg[:apinat.ontologyTerms:_soma] if (s, rdf.type, elements.Lyph) in ocg]
soma_links = set()
def soma_link_edges(soma):
# XXX uses internalIn instead of housingLyph
for link in ocg[soma:apinat.conveys:]:
soma_links.add(link)
for node_s in ocg[link:apinat.source:]:
for other_link in ocg[node_s:apinat.sourceOf:]: # FIXME only by convention that the direction matches
if other_link != link:
yield link, other_link
for node_t in ocg[link:apinat.target:]:
for other_link in ocg[node_t:apinat.sourceOf:]: # FIXME only by convention that the direction matches
if other_link != link:
yield link, other_link
soma_edges = [pair for s in somas for pair in soma_link_edges(s)]
def get_neuron(link):
# FIXME this requires somas
for lyph in ocg[link:apinat.conveyingLyph:]:
for group in ocg[lyph:apinat.seedIn:]:
for ot in ocg[group:apinat.ontologyTerms:]:
return ot
seeds = {l: neuron for l in soma_links if (neuron := get_neuron(l))} # FIXME right now we always seed on soma, but that is not required
link_lookup = {}
other_lookup = {}
def link_to_ont_region_layer(link):
    # FIXME this does not quite match what we did in cypher
# FIXME most importantly it is missing the soma links
if link in soma_links:
hlii = apinat.internalIn
else:
hlii = apinat.housingLyph
cl = None
for cl in ocg[link:apinat.conveyingLyph:]:
hl = None
for hl in ocg[cl:hlii:]:
if hl in other_lookup:
link_lookup[link] = other_lookup[hl]
continue
i = None
for i, ot in enumerate(ocg[hl:apinat.ontologyTerms:]):
pass
if i is None:
for i, ot in enumerate(ocg[hl:apinat.inheritedOntologyTerms:]):
pass
if i is None:
for co in ocg[hl:apinat.cloneOf:]:
for i, ot in enumerate(ocg[co:apinat.ontologyTerms:]):
pass
if i is None:
for i, ot in enumerate(ocg[co:apinat.inheritedOntologyTerms:]):
pass
if i is None:
log.error(('rol', hl))
continue
elif i > 0:
log.warning(f'multiple 0 ontology terms {i + 1} for {hl}')
li = None
for li in ocg[hl:apinat.layerIn:]:
j = None
for j, liot in enumerate(ocg[li:apinat.ontologyTerms:]):
pass
if j is None:
for co in ocg[li:apinat.cloneOf:]:
for j, liot in enumerate(ocg[co:apinat.ontologyTerms:]):
pass
if j is None:
log.error(('r', li))
pass
elif j > 0:
log.warning(f'multiple 1 ontology terms {j + 1} {li}')
if li is None:
other_lookup[hl] = link_lookup[link] = nord.rl(region=ot)
else:
other_lookup[hl] = link_lookup[link] = nord.rl(region=liot, layer=ot) # til that -liot -> rdflib.NegatedPath
if hl is None:
log.error(f'conveying lyph for link has no housing lyph??? {cl} {link}')
# FIXME axon-chain-gastric-duodenum-neuron-4 has housingLyphs on the chain
# and housingLayers on the chain, but those are not being materialized to the
# generated lyphs, which they need to be, also wbkg terms are not being lifted
# so they are not appearing in the chain housingLyphs, also reordering of these
# on the chain during serialization to ttl is another reason why they need to be
# materialized down to the generated lyphs
link_lookup[link] = nord.rl(region=rdflib.URIRef(f'ERROR-{link}'))
if cl is None:
log.error(f'something has gone very wrong {link}')
link_lookup[link] = nord.rl(region=rdflib.URIRef(f'EXTREME-ERROR-{link}'))
neuron_parts = {
OntId('SAO:1770195789').u, # axon
OntId('SAO:1211023249').u, # dendrite or sensory axon
OntId('SAO:280355188').u, # regional part of axon
OntId('SAO:420754792').u, # regional part of dendrite
_soma,
}
def filter_for_neuron_parts(link):
for cl in ocg[link:apinat.conveyingLyph:]:
for ot in ocg[cl:apinat.ontologyTerms:]:
if ot in neuron_parts:
return ot
for iot in ocg[cl:apinat.inheritedOntologyTerms:]:
if iot in neuron_parts:
return iot
_l_adj = soma_edges + ancsl + anext
l_adj = [(a, b) for (a, b) in _l_adj if filter_for_neuron_parts(a) or filter_for_neuron_parts(b)]
l_skipped = set(_l_adj) - set(l_adj) # TODO review to make sure there aren't lurking issues
torep = set(e for es in l_adj for e in es)
_ = [link_to_ont_region_layer(link) for link in torep]
l_nst = nord.adj_to_nst(l_adj) # this is the ord over the links we may have to do this first and then replace because different neurons may converge, or do this first, get the distinct graphs per neuron, separate, replace and then h_adj -> h_nst
l_split_nst = l_nst[1:]
l_split_adj = [nord.nst_to_adj(n) for n in l_split_nst]
h_split_adj = [[(link_lookup[a], link_lookup[b]) for a, b in s] for s in l_split_adj]
h_split_nst = [nord.adj_to_nst(a) for a in h_split_adj]
# h_adj = [(link_lookup[a], link_lookup[b]) for a, b in l_adj] # can't do this only produces 52 distinct graphs
# h_nst = nord.adj_to_nst(h_adj)
nrns = [set(e for es in al for e in es) for al in l_split_adj]
def get_seed_neuron(ns):
key = ns & set(seeds)
assert len(key) <= 1, len(key)
if key:
return seeds[next(iter(key))]
nrn_index = [get_seed_neuron(ns) for ns in nrns]
results = list(zip(nrn_index, h_split_nst))
# TODO export to partial orders file for comparison
# a quick spot check suggests that there are issues
~/git/sparc-curation/docs/apinatomy.org --all
deploy to remote
apinat-build --deploy # XXX manual and not fully implemented
For new ApiNATOMY models also update ../resources/scigraph/sparc-data.ttl.
Use ~/git/prrequaestor/prcl.lisp with ~/ni/sparc/sync-specs.lisp.
prcl.lisp is now published; I am still figuring out how to put all the cross-cutting processes that I oversee in one place, and sync-specs.lisp will live wherever that is.
pushd ~/git/prrequaestor
./prcl-build.lisp
bin/prcl --specs ~/ni/sparc/sync-specs.lisp --fun nif-sct
bin/prcl --specs ~/ni/sparc/sync-specs.lisp --fun nif-scr
bin/prcl --specs ~/ni/sparc/sync-specs.lisp --fun nif-slim
For now manually pull the changes to the working repo.
set -e
### make sure all code is up-to-date
# scigraph/README.org
sh ~/git/pyontutils/nifstd/scigraph/README.org tangle
### release artifacts
# TODO prepare all ontology files first up here
# TODO add success steps for each of these probably?
# release.org to retrieve
~/git/sparc-curation/docs/release.org build --sckan --no-blaze --no-load
# XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc
# we have a circular dependency issue here, which is that
# we want to use the apinatomy models from blazegraph for scigraph
# but we also want to use the configured NIF-Ontology repo from
# scigraph for blazegraph, too many layers ...
~/git/pyontutils/nifstd/scigraph/bin/run-load-graph-sparc-sckan
# if you are iterating hard on apinat model development, this is the point
# at which you usually want to deploy to dev scigraph and test
# release.org
~/git/sparc-curation/docs/release.org build --sckan --no-blaze --resume
# XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc
### docker image build
# source.org build docker images
~/git/dockerfiles/source.org build-image run:sckan-build-save-image
### testing phase 1, probably move to docker
# XXX manual
echo manual step waiting for signal from human to start next automated portion
This has now been integrated into ref:build-sckan-release by using ../../dockerfiles/source.org to run the block(s) directly.
~/git/dockerfiles/source.org build-image run:sckan-build-save-image
Run the docker build blocks in ../../dockerfiles/source.org for the sckan release; they do all the copying of the build state to the requisite locations. The order is bottom to top (maybe use ob-lob): ../../dockerfiles/source.org::&musl-build-sckan-base then ../../dockerfiles/source.org::&musl-build-sckan-services.
Dump the docker image for the zenodo release: ../../dockerfiles/source.org::&sckan-save-image
Deploy to dev scigraph.
~/ni/dev/bin/run-deploy-graph-selene
Deploy to local dev blazegraph. ./release.org::*Deploy journal and prefixes to local server
critical checks:
- run the ApiNATOMY dashboard queries and inspect the numbers
- check files in data folder for empty classes e.g. via grep 'a owl:Class \.$' (can do this before scigraph load)
- check prov-record matches expected
- future run the NPO dashboard
# archive all publication artifacts at one of the usual locations e.g. ~/nas/data
# FIXME hardcoded paths
# FIXME need to set -e on ALL of these blocks I think?
_archive=~/"nas/data/"
_sckanrz="$(ls -d /tmp/build/release-*-sckan.zip | sort -u | tail -n 1)"
_sckansz=/tmp/scigraph-build/sparc-sckan/$(readlink /tmp/scigraph-build/sparc-sckan/LATEST)
_sckandz="$(ls -d /tmp/docker-sckan-data-*Z.tar.gz | sort -u | tail -n 1)"
declare -a _arts=(${_sckanrz} ${_sckansz} ${_sckandz})
for _art in "${_arts[@]}"; do
echo $(basename ${_art})
rsync -a ${_art} ${_archive}
done
### deploy to sparc-scigraph
# XXX NOTE we NEVER deploy to sckan-scigraph directly always via
# sparc-scigraph and then once that passes we do the release
~/git/pyontutils/nifstd/scigraph/bin/run-deploy-graph-sparc-sckan
### update services as needed
# ~/git/pyontutils/nifstd/scigraph/bin/run-deploy-services-sparc
Use mapknowledge to test over all neurons, where the list of all neurons comes from some other query.
(defun advise--obe-python-path (command &rest args)
(let* ((params (cadr args))
(_ (message "%s" params))
(python-path (cdr (assq :python-path params)))
(process-environment
(or (and python-path
(cons (format "PYTHONPATH=%s" python-path)
process-environment))
process-environment)))
(apply command args)))
(advice-add #'org-babel-execute:python :around
#'advise--obe-python-path)
#import sys
#return [[p] for p in sys.path]
import random
from mapknowledge import KnowledgeStore
from mapknowledge.scicrunch import SCICRUNCH_PRODUCTION, SCICRUNCH_STAGING, SciCrunch
from pyontutils.scigraph_codegen import moduleDirect
from pyontutils.config import auth
from sparcur.utils import log
def test_prod(curie):
return test_endpoint(curie, SCICRUNCH_PRODUCTION)
def test_stage(curie):
return test_endpoint(curie, SCICRUNCH_STAGING)
def test_endpoint(curie, endpoint):
store = KnowledgeStore(scicrunch_release=endpoint)
store._KnowledgeStore__scicrunch._SciCrunch__scicrunch_key = auth.get('scigraph-api-key') # XXX SSIIIIIGH
try:
wat, *rest = curie.rsplit('/', 1)
result = store.entity_knowledge(wat)
log.info(result)
finally:
store.close()
#
skip = ('id', 'label', 'long-label', 'phenotypes', 'references')
out = {k:v for k, v in result.items() if v and k not in skip}
return out # !?!??!?!!?! apparently a newline before this breaks everything !??!
def testn(curie):
p = test_prod(curie)
s = test_stage(curie)
    # XXX probably not quite right given p and s values
if p and not s:
return False, ('removed', p, s)
elif not s:
return False, ('missing', p, s)
if s and not p:
log.info(f'new neuron {curie}')
#
return True, ('ok', p, s)
def testnall(curies):
bads = []
for curie in curies:
try:
ok, data = testn(curie)
except Exception as e:
ok, data = False, ('error', e, e)
if not ok:
bads.append((curie, data))
if bads:
log.error(bads)
#
return bads
def dewit(endpoint):
base = f'https://scicrunch.org/api/1/{endpoint}' # XXX no trailing path
scigraphd = moduleDirect(base, 'scigraphd')
scigraphd.restService._api_key = auth.get('scigraph-api-key')
sgc = scigraphd.Cypher()
    #sgd = scigraphd.Dynamic() # XXX FIXME all pops not in the dynamic endpoints atm
curies_raw = sgc.execute(
# XXX must use `org-babel-lob-ingest' on queries.org for this to work
"""
OPTIONAL MATCH (start:Ontology)
<-[:isDefinedBy]-(graph:NamedIndividual)
-[:type]->({iri: "https://apinatomy.org/uris/elements/Graph"})
, (start)
<-[:isDefinedBy]-(external:Class)
-[:subClassOf*]->(:Class {iri: "http://uri.interlex.org/tgbugs/uris/readable/NeuronEBM"})
return external
""", limit=99999, output='application/json')
#curies_raw = sgd
curies = [c['id'] for c in curies_raw['nodes']]
#breakpoint()
count = 2
sample_size = 15
bads = []
for n in range(count):
# randomly sample the subset to test
test_set = random.sample(curies, sample_size)
_bads = testnall(test_set)
bads.extend(_bads)
return bads
result = dewit(SCICRUNCH_STAGING)
Future: similar for NPO
Create a new pre-release at https://github.com/SciCrunch/NIF-Ontology/releases
Tag ymd should match the results of the release version query from the embedded provenance record.
tag: sckan-%Y-%m-%d
target: dev
title: sckan-%Y-%m-%d
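For example, the tag and title strings can be generated from the release date; datetime.now here is only a placeholder for the date taken from the embedded provenance record.

from datetime import datetime, timezone

release_date = datetime.now(timezone.utc)  # placeholder: use the date from the release version query
tag = release_date.strftime('sckan-%Y-%m-%d')
title = tag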
Upload the blazegraph, scigraph, and docker archives.
Add link to changelog entry.
# push docker images
docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:base-*Z | sort | tail -n 1 | awk '{ print $2 }')
docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:data-*Z | sort | tail -n 1 | awk '{ print $2 }')
docker push tgbugs/sckan:base-latest
docker push tgbugs/sckan:latest
Most of the testing at this stage is of the functionality in tgbugs/musl:kg-release-user so it is ok to push the data images first.
### testing
echo manual step waiting for signal from human to start next automated portion
TODO automate. Create the new docker container volume for sckan data, run it with the latest version of tgbugs/musl:kg-release-user, and run all the examples etc. Use the shebang block in queries.org or similar to execute all the blocks and check that they are working as expected. Better yet, write some internal consistency checks, i.e. between NPO and ApiNATOMY.
### publication
echo manual step waiting for signal from human to start next automated portion
### changelog
# XXX manual
One way to generate a changelog, not the best, but possible.
pushd ~/git/apinatomy-models
git diff $(the last commit before the previous release)..HEAD -- models
pushd ~/git/NIF-Ontology
git diff $(the last commit before the previous release)..HEAD -- *.ttl ttl
# prepare the new zenodo release
# deploy scigraph image to production
Once all tests have passed and we receive the ok from MAP we promote sparc-scigraph to sckan-scigraph.
- back up existing prod services.yaml
- copy data from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph
- alternatively curl from the github release or rsync from internal stores; the issue is that the release id is not predictable right now
- copy services.yaml (and the raw) from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph
- unzip data
- stop service
- unlink/ln -s
- start service
echo "not running to avoid accidental deployment"
echo "if you want to do this comment out these lines and the exit line"
exit 1
source "$(eval echo ~/git/pyontutils/nifstd/scigraph/bin/scigraph-functions.sh)"
host_stage=aws-scigraph-scigraph
host_prod=aws-scigraph-sckan-scigraph
host_prod_sudo=aws-scigraph-sckan
path_stamped="$(ssh ${host_stage} "realpath graph")"
path_zip="${path_stamped}.zip"
path_yaml="$(ssh ${host_stage} "realpath services.yaml")"
path_yaml_raw="$(ssh ${host_stage} 'realpath $(head -n 1 services.yaml | cut -d" " -f 2)')"
__TEST='echo ${USER}@${HOSTNAME} '
__TEST=''
# prod backup existing services
ssh ${host_prod} "${__TEST}"'cp services.yaml services.yaml-'"$(date +%s)" || exit $?
# stage>prod data and services
# XXX no rsync because the service users should not have ssh access to anything
# FIXME the egress on this is stupid, ideally run this whole script from within aws
paths=("${path_zip}" "${path_yaml}" "${path_yaml_raw}")
for path in "${paths[@]}"; do
ssh ${host_stage} "${__TEST}"'sha256sum '"${path}";
# XXX FIXME BEWARE cannot test the pipe and redirection easily
ssh ${host_stage} "${__TEST}"'cat '"${path}" |\
ssh ${host_prod} 'cat - > '"${path}";
ssh ${host_prod} "${__TEST}"'sha256sum '"${path}";
done
# prod unzip
ssh ${host_prod} "${__TEST}"'unzip -n '"${path_zip}" || exit $?
# prod stop relink start
ssh -t ${host_prod_sudo} "$(typeset -f service-manager);"\
"${__TEST}"'service-manager scigraph stop &&'\
"${__TEST}"'sudo unlink /var/lib/scigraph/graph;'\
"${__TEST}"'sudo ln -s '"${path_stamped}"' /var/lib/scigraph/graph;'\
"${__TEST}"'service-manager scigraph start'
Easier to read, harder to debug. The python paradox.
Cache annotations. See (bash-pipeline-fetch-annotations) for usage.
from pathlib import Path
from hyputils import hypothesis as hyp
from sparcur.config import auth
def from_group_name_fetch_annotations(group_name):
""" pull hypothesis annotations from remote to local """
group_id = auth.user_config.secrets('hypothesis', 'group', group_name)
cache_file = Path(hyp.group_to_memfile(group_id + 'sparcur'))
get_annos = hyp.Memoizer(cache_file, group=group_id)
get_annos.api_token = auth.get('hypothesis-api-key') # FIXME ?
annos = get_annos()
return cache_file # needed for next phase, annos are not
def from_group_name_cached_annos(group_name):
group_id = auth.user_config.secrets('hypothesis', 'group', group_name)
cache_file = Path(hyp.group_to_memfile(group_id + 'sparcur'))
get_annos = hyp.AnnoReader(cache_file, group=group_id)
annos = get_annos()
return annos
def from_user_name_group_name_h(user_name, group_name):
group_id = auth.user_config.secrets('hypothesis', 'group', group_name)
h = hyp.HypothesisUtils(username=user_name, group=group_id)
h.token = auth.user_config.secrets('hypothesis', 'api', user_name)
return h
def main(hypothesis_group_name=None, **kwargs):
if hypothesis_group_name is None:
hypothesis_group_name = 'sparc-curation'
from_group_name_fetch_annotations(hypothesis_group_name)
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main)
Temporary location for some helper code to clone protcur annos to a new group.
from sparcur.simple.fetch_annotations import (
from_group_name_fetch_annotations,
from_group_name_cached_annos,
from_user_name_group_name_h)
from protcur import document as ptcdoc
#def main():
annos = from_group_name_cached_annos('sparc-curation')
bannos = [ptcdoc.Annotation(a) for a in annos]
pool = ptcdoc.Pool(bannos)
anno_counts = ptcdoc.AnnoCounts(pool)
if False: # sort debug
ta = pool.topological_annos
def asdf(ta):
hrm = [a.id for a in ta]
de = [(i, [(hrm.index(pid) if pid in hrm else 'oops') for pid in a.text_parent_ids]) for i, a in enumerate(ta) if a.text_parent_ids]
qq = [(i, l) for i, l in de if [_ for _ in l if _ != 'oops' and i < _]]
sigh = sorted(set([x for i, l in qq for x in [i] + l if x != 'oops']))
return qq, sigh
qq, sigh = asdf(ta)
bads = [ta[f] for f in sigh]
sbads = ptcdoc.toposort(bads)
qq2, sigh2 = asdf(sbads)
group_name = 'fixed-sparc-curation'
cache_file = from_group_name_fetch_annotations(group_name)
html_annos = from_group_name_cached_annos(group_name)
html_bannos = [ptcdoc.Annotation(a) for a in html_annos]
#html_bannos = []
html_pool = ptcdoc.Pool(html_bannos)
h_p_f = from_user_name_group_name_h('protbot', group_name)
#pool.clone_to(html_pool, h_p_f)
test = False
if test:
test_bannos = list(pool.bySlugTail('wzuff6w')) # published with most annos
#test_bannos = list(pool.bySlugTail('ba8hiht6')) # published with low number of annos
test_pool = ptcdoc.Pool(test_bannos)
test_pool.clone_to(html_pool, h_p_f, test=False)
else:
pool.clone_to(html_pool, h_p_f, test=test)
[a._row for a in html_pool._annos]
# [h_p_f.delete_annotation(a.id) for a in html_pool._annos]
if __name__ == '__main__':
    pass  # NOTE: main is commented out above, so this code runs at module level on import
This is an example of how to clone the top level of a project.
See ref:utils.py for a good way to instantiate RemotePath.
from pathlib import Path
# clone top level
def from_path_id_and_backend_project_top_level(parent_path,
project_id,
RemotePath,
symlink_objects_to=None,):
""" given the enclosing path to clone to, the project_id, and a fully
configured (with Local and Cache) backend remote path, anchor the
project pointed to by project_id along with the first level of children """
project_path = _from_path_id_remote_project(parent_path,
project_id,
RemotePath,
symlink_objects_to)
return _from_project_path_top_level(project_path)
def from_path_project_backend_id_dataset(parent_path,
project_id,
dataset_id,
RemotePath,
symlink_objects_to=None,):
project_path = _from_path_id_remote_project(parent_path,
project_id,
RemotePath,
symlink_objects_to)
return _from_project_path_id_dataset(project_path, dataset_id)
def _from_path_id_remote_project(parent_path, project_id, RemotePath, symlink_objects_to):
RemotePath.init(project_id) # calling init is required to bind RemotePath._api
anchor = RemotePath.smartAnchor(parent_path)
anchor.local_data_dir_init(symlink_objects_to=symlink_objects_to)
project_path = anchor.local
return project_path
def _from_project_path_top_level(project_path):
""" given a project path with existing cached metadata
pull the top level children
WARNING: be VERY careful about using this because it
        does not guarantee that rmeta is available to mark
sparse datasets. It may be the case that the process
will fail if the rmeta is missing, or it may not. Until
we are clear on the behavior this warning will stay
in place. """
# this is a separate function in case the previous step fails
    # which is also why it is hidden, it makes too many assumptions
# to be used by itself
anchor = project_path.cache
    list(anchor.children) # this fetches data from the remote path to the local path
return project_path # returned instead of anchor & children because it is needed by next phase
def _from_project_path_id_dataset(project_path, dataset_id):
anchor = project_path.cache
remote = anchor._remote_class(dataset_id)
cache = anchor / remote
return cache.local
def main(parent_path=None,
project_id=None,
parent_parent_path=Path.cwd(),
project_id_auth_var='remote-organization', # FIXME move default to clifun
symlink_objects_to=None,
id=None,
dataset_id=None,
**kwargs):
""" clone a project into a random subfolder of the current folder
or specify the parent path to clone into """
from sparcur.simple.utils import backend_pennsieve
if parent_path is None:
breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX FIXME
import tempfile
parent_path = Path(tempfile.mkdtemp(dir=parent_parent_path))
if project_id is None:
from sparcur.config import auth
from sparcur.utils import PennsieveId
project_id = auth.get(project_id_auth_var)
project_id = PennsieveId(project_id) # FIXME abstract the hardcoded backend
RemotePath = backend_pennsieve()
if id and dataset_id:
# FIXME doesn't check for existing so if the name changes we get duped folders
# this issue possibly upstream in retrieve, clone just clones whatever you tell
# it to clone, but maybe it should check the existing metadata and fail or warn?
dataset_path = from_path_project_backend_id_dataset(
parent_path,
project_id,
id, # FIXME multiple datasets
RemotePath,
symlink_objects_to,)
return dataset_path
project_path = from_path_id_and_backend_project_top_level(
parent_path,
project_id,
RemotePath,
symlink_objects_to,)
return project_path
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main, after=print)
Remote metadata must be retrieved prior to the first pull in order to ensure that large datasets can be marked as sparse datasets before they are pulled.
Remote metadata can be retrieved using only a project_id. However, for all retrieval after the first pull it is usually more effective to retrieve it at the same time as fetching metadata files since it runs in parallel per dataset.
See (bash-pipeline-fetch-remote-metadata-all) for usage.
from joblib import Parallel, delayed
from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import backend_pennsieve
def from_id_fetch_remote_metadata(id, project_id=None, n_jobs=12):
""" given an dataset id fetch its associated dataset metadata """
if id.type == 'organization':
RemotePath = backend_pennsieve()
project = RemotePath(id)
prepared = [PennsieveDatasetData(r) for r in project.children]
if n_jobs <= 1:
[p() for p in prepared]
else:
            # FIXME Parallel isn't really parallel here ...
# can't use multiprocessing due to broken aug.RemotePath implementation
# LOL PYTHON everything is an object, except when you want to pickle it
# then some objects are more equal than others
Parallel(n_jobs=n_jobs)(delayed(p._no_return)() for p in prepared)
elif id.type == 'dataset':
RemotePath = backend_pennsieve(project_id)
dataset = RemotePath(id)
bdd = PennsieveDatasetData(dataset)
bdd()
else:
raise NotImplementedError(id)
def main(id=None,
project_id=None,
project_id_auth_var='remote-organization', # FIXME move to clifun
all_projects=False,
n_jobs=12,
**kwargs):
if all_projects:
from sparcur.utils import PennsieveId as RemoteId
from sparcur.config import auth
project_ids = [
RemoteId(p) for p in auth.get_list('remote-organizations')]
for project_id in project_ids:
main(id=project_id, project_id=project_id, all_projects=False,
n_jobs=n_jobs, **kwargs)
return
if project_id is None:
if id is not None:
msg = 'id must be passed with project id'
raise TypeError(msg)
from sparcur.utils import PennsieveId as RemoteId
from sparcur.config import auth
project_id = auth.get(project_id_auth_var)
project_id = RemoteId(project_id) # FIXME abstract the hardcoded backend
if id is None:
id = project_id
from_id_fetch_remote_metadata(id,
project_id=project_id,
n_jobs=n_jobs,)
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main) # nothing to print or do after
sparcur.backends.PennsieveDatasetData
supports the ability
to retrieve metadata directly from the remote without the need for an intervening
local path. However this functionality is obscured here because we want to derive
a consistent view of the data from the file system snapshot.
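For completeness, the direct-from-remote usage looks roughly like the dataset branch of fetch_remote_metadata_all above; treating the return value of bdd() as the metadata blob is an assumption here.

from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import backend_pennsieve

def rmeta_direct(project_id, dataset_id):
    RemotePath = backend_pennsieve(project_id)
    dataset = RemotePath(dataset_id)
    bdd = PennsieveDatasetData(dataset)
    return bdd()  # fetch (and cache) the remote metadata for this dataset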
from joblib import Parallel, delayed
from sparcur.paths import Path
from sparcur.backends import PennsieveDatasetData
def _from_project_path_fetch_remote_metadata(project_path, n_jobs=12, cached_ok=False):
if n_jobs <= 1:
prepared = [PennsieveDatasetData(dataset_path.cache)
for dataset_path in project_path.children]
[bdd() for bdd in prepared if not (cached_ok and bdd.cache_path.exists())]
else:
fetch = lambda bdd: bdd() if not (cached_ok and bdd.cache_path.exists()) else None
fetch_path = (lambda path: fetch(PennsieveDatasetData(path.cache)))
Parallel(n_jobs=n_jobs)(delayed(fetch_path)(dataset_path)
for dataset_path in project_path.children)
# fetch remote metadata
def from_path_fetch_remote_metadata(path, n_jobs=12, cached_ok=False):
""" Given a path fetch remote metadata associated with that path. """
cache = path.cache
if cache.is_organization():
_from_project_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=cached_ok)
else: # dataset_path
# TODO more granular rather than roll up to dataset if inside?
bdd = PennsieveDatasetData(cache)
if not (cached_ok and bdd.cache_path.exists()):
bdd()
def main(path=Path.cwd(), n_jobs=12, rmeta_cached_ok=False, **kwargs):
if path is None or path.find_cache_root() not in (path, *path.parents):
from sparcur.simple.clone import main as clone
path = clone(path=path, n_jobs=n_jobs, **kwargs)
# NOTE path is passed along here, but kwargs is expected to contain
# parent_path or parent_parent_path and project_id note that if that
# happens then the path returned from clone will change accordingly
from_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=rmeta_cached_ok)
return path
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main) # we probably don't print here?
Pull a single dataset or pull all datasets or clone and pull all datasets.
from joblib import Parallel, delayed
from sparcur.paths import Path
from sparcur.utils import GetTimeNow
# pull dataset
def from_path_dataset_file_structure(path, time_now=None, exclude_uploaded=False):
""" pull the file structure and file system metadata for a single dataset
right now only works from a dataset path """
if time_now is None:
time_now = GetTimeNow()
path._pull_dataset(time_now, exclude_uploaded)
# pull all in parallel
def from_path_dataset_file_structure_all(project_path,
*args,
paths=None,
time_now=None,
n_jobs=12,
exclude_uploaded=False):
""" pull all of the file structure and file system metadata for a project
paths is a keyword argument that accepts a list/tuple of the subset of
paths that should be pulled """
if time_now is None:
time_now = GetTimeNow()
project_path.pull(
paths=paths,
time_now=time_now, # TODO
debug=False, # TODO
n_jobs=n_jobs,
log_level='DEBUG' if False else 'INFO', # TODO
Parallel=Parallel,
delayed=delayed,
exclude_uploaded=exclude_uploaded,)
# mark datasets as sparse
def sparse_materialize(path, sparse_limit:int=None):
""" given a path mark it as sparse if it is a dataset and
beyond the sparse limit """
cache = path.cache
if cache.is_organization():
# don't iterate over cache children because that pulls remote data
for child in path.children:
sparse_materialize(child, sparse_limit=sparse_limit)
else:
cache._sparse_materialize(sparse_limit=sparse_limit)
def main(path=Path.cwd(),
time_now=None,
sparse_limit:int=None,
n_jobs=12,
exclude_uploaded=False,
no_index=False,
**kwargs):
if path != path.resolve():
raise ValueError(f'Path not resolved! {path}')
project_path = None # path could be None so can't find_cache_root here
if path is None or path.find_cache_root() not in (path, *path.parents):
from sparcur.simple.fetch_remote_metadata import main as remote_metadata
project_path = remote_metadata(path=path, **kwargs) # transitively calls clone
else:
project_path = path.find_cache_root()
if path != project_path:
# dataset_path case
sparse_materialize(path, sparse_limit=sparse_limit)
from_path_dataset_file_structure(path, time_now=time_now, exclude_uploaded=exclude_uploaded)
if path == Path.cwd():
            print('NOTE: you probably need to run `pushd ~/ && popd` '
                  'to get a sane view of the filesystem if you ran this '
                  'from within a dataset folder')
return path
if not list(project_path.children):
raise FileNotFoundError(f'{project_path} has no children.')
# somehow clone failed
# WARNING if rmeta failed you may get weirdness # FIXME
from sparcur.simple.clone import _from_project_path_top_level
_from_project_path_top_level(project_path)
if no_index:
Path._noindex = True
sparse_materialize(project_path,
sparse_limit=sparse_limit)
from_path_dataset_file_structure_all(project_path,
time_now=time_now,
n_jobs=n_jobs,
exclude_uploaded=exclude_uploaded)
return project_path
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main, after=print)
The non-simple implementation of this is quite convoluted so here are links to the current implementation, from outside in. In reverse order the basic steps are: pull from the dataset packages endpoint, resolve the hierarchy, convert to remote paths, and convert to cache paths, which materialize the pull as folders or symlinks (a sketch of that last step follows the list).
- entry point to pull
- the implementation of rchildren for Pennsieve
- looping over packages to convert them to paths
- transform dataset packages endpoint json into Pennsieve api objects
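A hypothetical sketch of the materialization step referenced above; the real logic lives in the cache path classes and the metadata-in-symlink-target trick is heavily simplified here.

from pathlib import Path

def materialize(parent: Path, name: str, is_dir: bool, meta_string: str) -> Path:
    local = parent / name
    if is_dir:
        local.mkdir(parents=True, exist_ok=True)
    elif not local.exists() and not local.is_symlink():
        # files are materialized as symlinks whose target encodes the cached metadata
        local.symlink_to(meta_string)
    return local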
def main(path=None, **kwargs):
if path is not None:
# FIXME this will fail if the remote for the file is not in
# the current project, or if the cachedir is not a child of
# the top level project directory e.g. in .operations/objects
cache = path.cache
cache.fetch(size_limit_mb=None)
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main)
from itertools import chain
from sparcur import exceptions as exc
from sparcur.utils import log, logd
from sparcur.paths import Path
from sparcur.datasets import DatasetStructure
from sparcur.simple.utils import fetch_paths_parallel, rglob
# fetch metadata files
fetch_prefixes = (
('dataset_description', 'glob'),
('subjects', 'glob'),
('samples', 'glob'),
('sites', 'glob'),
('performances', 'glob'),
('submission', 'glob'),
('manifest', 'rglob'), # XXX NOTE the rglob here
)
def _from_path_fetch_metadata_files_simple(path, fetch=True):
""" transitive yield paths to all metadata files, fetch them from
the remote if fetch == True """
for glob_prefix, glob_type in fetch_prefixes:
if glob_type == 'rglob':
gp0 = glob_prefix[0]
pattern = f'[{gp0.upper()}{gp0}]{glob_prefix[1:]}*'
yield from rglob(path, pattern)
continue
ds = DatasetStructure(path)
for path_to_metadata in ds._abstracted_paths(glob_prefix,
glob_type=glob_type,
fetch=fetch): # FIXME fetch here is broken
yield path_to_metadata
def _from_path_fetch_metadata_files_parallel(path, n_jobs=12):
""" Fetch all metadata files within the current path in parallel. """
paths_to_fetch = _from_path_fetch_metadata_files_simple(path, fetch=False)
try:
first = next(paths_to_fetch)
paths_to_fetch = chain((first,), paths_to_fetch)
except StopIteration:
log.warning('No paths to fetch, did you pull the file system metadata?')
return
# FIXME passing in a generator here fundamentally limits the whole fetching
# process to a single thread because the generator is stuck feeding from a
# single process, IF you materialize the paths first then the parallel fetch
# can actually run in parallel, but bugs/errors encountered late in collecting
# the paths will force all previous work to be redone
# XXX as a result of this we use the posix find command to implement rglob
# in a way that is orders of magnitude faster
paths_to_fetch = list(paths_to_fetch)
fetch_paths_parallel(paths_to_fetch, n_jobs=n_jobs)
def from_path_fetch_metadata_files(path, n_jobs=12):
""" fetch metadata files located within a path """
#if n_jobs <= 1:
#gen = _from_path_fetch_metadata_files_simple(path)
# FIXME broken ??? somehow abstracted paths doesn't fetch when
# we run in directly, or somehow fetch_paths_parallel does something
# different
#paths = list(gen)
#else:
_from_path_fetch_metadata_files_parallel(path, n_jobs=n_jobs)
def main(path=Path.cwd(), n_jobs=12, **kwargs):
if path is None or path.find_cache_root() not in (path, *path.parents):
from sparcur.simple.pull import main as pull
path = pull(path=path, n_jobs=n_jobs, **kwargs)
from_path_fetch_metadata_files(path, n_jobs=n_jobs)
return path
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main, after=print)
Fetch files by extension.
import os
from functools import wraps
from sparcur.paths import Path
from sparcur.utils import _find_command, log
from sparcur.simple.utils import fetch_paths_parallel
def _datasets_with_extension(path, extension):
""" Hack around the absurd slowness of python's rglob """
# TODO query multiple extensions with -o at the same time
command = fr"""for d in */; do
{_find_command} "$d" \( -type l -o -type f \) -name '*.{extension}' \
-exec getfattr -n user.bf.id --only-values "$d" \; -printf '\n' -quit ;
done"""
with path:
with os.popen(command) as p:
string = p.read()
has_extension = string.split('\n')
datasets = [p for p in path.children if p.cache_id in has_extension]
return datasets
def _from_path_fetch_files_simple(path, filter_function, fetch=True, _ce=False):
files = filter_function(path)
errors = []
if _ce:
def _collect_errors(f, file):
@wraps(f)
def inner(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
errors.append((file, e))
return inner
else:
def _collect_errors(f, file):
return f
if fetch:
# FIXME keep an eye on the gc here
[_collect_errors(f.cache.fetch, f)(size_limit_mb=None) for f in files
if not f.exists() or f.size == 0 or f.size != f.cache.meta.size]
if errors:
msg = f'{len(errors)} errors:\n{errors}'
log.error(msg)
#Async(rate=5)(deferred(f.fetch)(size_limit_mb=None)
#for f in files if not f.exists())
return files
def _from_path_fetch_files_parallel(path, filter_function, n_jobs=12):
paths_to_fetch = _from_path_fetch_files_simple(path, filter_function, fetch=False)
fetch_paths_parallel(paths_to_fetch, n_jobs=n_jobs)
def filter_extensions(extensions):
""" return a function that selects files in a path by extension """
def filter_function(path):
cache = path.cache
if cache.is_organization():
paths = set()
for ext in extensions:
ds = _datasets_with_extension(path, ext)
paths.update(ds)
else: # dataset_path
paths = path,
# FIXME exclude folders, some people put . in a folder name
files = [matching # FIXME stream ?
for path in paths
for ext in extensions
for matching in path.rglob(f'*.{ext}')
if not matching.is_dir()]
return files
return filter_function
def filter_manifests(dataset_blob):
""" return a function that selects certain files listed in manifest records """
# FIXME this needs a way to handle organization level?
# NOTE this filter is used during the second fetch phase after the initial
# metadata has been ingested to the point where it can be used to guide further fetches
# TODO this is going to require the implementation of partial fetching I think
# TODO preprocessing here?
def filter_function(path):
# TODO check that the path and the dataset blob match
cache = path.cache
if cache.id != dataset_blob['id']:
msg = f'Blob is not for this path! {dataset_blob["id"]} != {cache.id}'
raise ValueError(msg)
files = [] # TODO get_files_for_secondary_fetch(dataset_blob)
return files
return filter_function
def from_path_fetch_files(path, filter_function, n_jobs=12):
if n_jobs <= 1:
_from_path_fetch_files_simple(path, filter_function)
else:
_from_path_fetch_files_parallel(path, filter_function, n_jobs=n_jobs)
def main(path=Path.cwd(), n_jobs=12, extensions=('xml',), **kwargs):
#breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
if path is None or path.find_cache_root() not in (path, *path.parents):
from sparcur.simple.pull import main as pull
path = pull(path=path, n_jobs=n_jobs, **kwargs)
filter_function = filter_extensions(extensions)
# remote init outside async to avoid race condition in boto3 on windows
# from sparcur.paths.BFPNCacheBase.data
if not hasattr(path.cache._remote_class, '_api'):
path.cache._remote_class.anchorToCache(path.cache.anchor)
from_path_fetch_files(path, filter_function, n_jobs=n_jobs)
return path
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main)
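Example python usage (a minimal sketch; the path is hypothetical and the dataset must already be pulled):
from sparcur.paths import Path
from sparcur.simple.fetch_files import filter_extensions, from_path_fetch_files
path = Path('path/to/some/dataset').resolve()
filter_function = filter_extensions(('xml',))
from_path_fetch_files(path, filter_function, n_jobs=12)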
Once the initial pass over the dataset is complete, extract the list of additional files that need to be retrieved and fetch them.
from sparcur.paths import Path
from sparcur.simple.fetch_files import from_path_fetch_files, filter_manifests
def from_blob_fetch_files(dataset_blob, path=None, n_jobs=12):
# should the blob contain a reference to the path
# it was derived from?
filter_function = filter_manifests(dataset_blob)
from_path_fetch_files(path, filter_function, n_jobs=n_jobs)
def main(path=Path.cwd(), n_jobs=12, **kwargs):
#breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# if not dataset_blob: get_blob vs path blob pairs?
# starting from a partial blob means that this probably
# should not kick off from the file system, but we know
# that we will want to be able to kick it off from the
# file system ... maybe the intermediate blobs can encode
# the prov of where the file system reference they were
# derived from lives ?
dataset_blob = get_blob(path.cache_id) # FIXME TODO
from_blob_fetch_files(dataset_blob, path, n_jobs=n_jobs)
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main)
from_id_remote_metadata = lambda id: ds.PennsieveDatasetData(id)()
compose = lambda f, g: (lambda *x: f(g(*x)))
#from_path_remote_metadata = compose(lambda id: from_id_remote_metadata(id),
#lambda path: path.cache.id)
Putting it all together into a single command.
The behavior of retrieve works exactly as it does for clone; the difference is that it runs for just a single dataset and the parent_path is made to be the dataset_id uuid. If you are running a single dataset pipeline you will still need the project folder structure for logs, jobs, etc. You can also still run all datasets together off of a single SPARC Consortium folder, in which case all you need to do is pass the communal parent_path.
Usage example.
python -m sparcur.simple.retrieve \
--dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \
--symlink-objects-to /mnt/str/tom/cache/bf-object-cache
Example python usage.
from sparcur.paths import Path
from sparcur.utils import PennsieveId
from sparcur.simple.retrieve import main as retrieve
p = PennsieveId('N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0')
d = PennsieveId('N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18')
ppp = Path('~/files/sparc-datasets').expanduser().resolve()
retrieve(id=d, dataset_id=d, project_id=p, parent_parent_path=ppp)
Implementation of sparcur.simple.retrieve
from sparcur.paths import Path
from sparcur.utils import symlink_latest
from sparcur.simple.clone import main as clone
from sparcur.simple.fetch_remote_metadata_all import main as remote_metadata
from sparcur.simple.pull import main as pull
from sparcur.simple.fetch_metadata_files import main as fetch_metadata_files
from sparcur.simple.fetch_files import main as fetch_files
def main(id=None,
dataset_id=tuple(),
parent_path=None,
parent_parent_path=Path.cwd(),
path=None, # keep path out of kwargs
invariant_local_path='dataset',
#extensions=('xml',), # not needed right now
**kwargs):
# FIXME parent_path and project_id seem like they probably need to
# be passed here, it would be nice if project_path could just be
# the current folder and if the xattrs are missing for the
# project_id then ... it is somehow inject from somewhere else?
# this doesn't really work, because that would mean that we would
# have to carry the project id around in the xattr metadata for
# all dataset folders, which might not be the worst thing, but
# definitely redundant
if id is None:
raise TypeError('id is a required argument!')
if parent_path is None:
uuid = id.uuid # FIXME hardcoded backend assumption
parent_path = parent_parent_path / uuid
parent_path.mkdir(exist_ok=True)
elif not parent_path.exists():
parent_path.mkdir()
invariant_path = parent_path / invariant_local_path
# XXX for now we do these steps in order here
# rather than trusting that calling simple.pull.main will do
# the right thing if there is no path ... it should but probably
# doesn't right now due to assumptions about paths existing
# remote metadata from path (could do from id too?)
remote_metadata(id=id, **kwargs) # could run parallel to clone, easier in bash
# clone single without organization parent somehow seems likely broken?
path = clone(id=id,
dataset_id=dataset_id,
parent_path=parent_path,
parent_parent_path=parent_parent_path,
**kwargs) # XXX symlink_objects_to will just work if you pass it
symlink_latest(path, invariant_path)
# pull single
pull(path=path, **kwargs)
# fetch metadata files
fetch_metadata_files(path=path, **kwargs) # FIXME symlink_to
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX fetch_metadata_files does NOT USE the extensions kwarg!
# fetch additional files
fetch_files(path=path, **kwargs)
return path
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main, after=print)
Usage example.
python -m sparcur.simple.extract \
--dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \
--export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports
from sparcur import datasets as dat
from sparcur import pipelines as pipes
from sparcur import exceptions as exc
from sparcur.utils import log, logd
from sparcur.paths import Path
from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import combinate, multiple, early_failure, DataWrapper
from sparcur.simple.fetch_metadata_files import fetch_prefixes
class ManifestFiles(DataWrapper):
""" wrapper for manifest files. """
def merge_manifests(vs):
""" Merge individual manifest records into the same list """
# FIXME massive hack :/
k = 'manifest_file'
# FIXME errors key ... ? is it allowed up there? it shouldn't be ...
# FIXME {'content': m}
return ManifestFiles([m for v in vs for m in v.data[k]])
def object_from_find_path(glob_prefix, object_from_path_function, glob_type='glob', onfail=None):
""" Return a function that will find files that start with glob_prefix"""
# FIXME should be in utils but depends on fetch_prefixes
if glob_prefix not in dict(fetch_prefixes):
raise ValueError('glob_prefix not in fetch_prefixes! '
f'{glob_prefix!r} not in {fetch_prefixes}')
def func(path, *args, **kwargs):
ds = dat.DatasetStructure(path)
rpath = None
for rpath in ds._abstracted_paths(glob_prefix, sandbox=True):
yield object_from_path_function(rpath, *args, **kwargs)
if rpath is None and onfail is not None:
raise onfail(f'No match for {glob_prefix} in {path.name}')
return func
# file type -> dataset blob key indirection
_TOP = object() # SIGH SIGH SIGH always need an escape hatch
otkm = {ThingFilePath.obj:prefix + '_file' for prefix, ThingFilePath
in dat.DatasetStructure.sections.items()}
otkm[ManifestFiles] = 'manifest_file'
otkm[PennsieveDatasetData] = 'remote_dataset_metadata'
#otkm[type(dat.DatasetStructure(None))] = 'structure' # hack around Pathlib type mangling
#otkm[type(dat.DatasetMetadata(None))] = _TOP
# stream of objects -> place in dataset blob
def dataset_raw(*objects, object_type_key_map=otkm):
data = {}
log.debug(objects)
#path_structure, description, subjects, samples, submission, manifests, *rest = objects
for obj in objects:
log.debug(obj)
key = object_type_key_map[type(obj)]
try:
if key is not _TOP:
data.update({key: obj.data})
else:
data.update(obj.data)
except Exception as e:
# FIXME current things that leak through
# MalformedHeaderError
# something in the objects list is a dict
breakpoint()
pass
return data
# path + version -> python object
# TODO how to attach and validate schemas orthogonally in this setting?
# e.g. so that we can write dataset_1_0_0 dataset_1_2_3 etc.
# we capture version as early as possible in the process, yes we
# could also gather all the files and folders and then pass the version
# in as an argument when we validate their structure, but there are
# elements of the locations or names of those files that might depend
# on the template version, therefore we get maximum flexibility by only
# needing to look for the dataset description file
def description(path): return dat.DatasetDescriptionFilePath(path).object
def submission(path, version): return dat.SubmissionFilePath(path).object_new(version)
def subjects(path, version): return dat.SubjectsFilePath(path).object_new(version)
def samples(path, version): return dat.SamplesFilePath(path).object_new(version)
def manifest(path, version): return dat.ManifestFilePath(path).object_new(version)
# dataset path -> python object
def from_path_remote_metadata(path): return PennsieveDatasetData(path.cache)
def from_path_local_metadata(path): return dat.DatasetMetadata(path)
from_path_dataset_description = object_from_find_path('dataset_description', description,
onfail=exc.MissingFileError)
comb_metadata = combinate(
# dataset description is not included here because it is special
# see from_path_dataset_raw for details
from_path_remote_metadata,
from_path_local_metadata,
)
# dataset path + version -> python object
def from_path_dataset_path_structure(path, version): return dat.DatasetStructure(path)
from_path_subjects = object_from_find_path('subjects', subjects)
from_path_samples = object_from_find_path('samples', samples)
from_path_submission = object_from_find_path('submission', submission)
from_path_manifests = multiple(object_from_find_path('manifest', manifest,
'rglob'),
merge_manifests)
# combinate all the individual dataset path + version -> data functions
comb_dataset = combinate(
#from_path_dataset_description, # must be run prior to combination to get version
from_path_dataset_path_structure,
from_path_subjects,
from_path_samples,
from_path_submission,
from_path_manifests,
#from_path_file_metadata, # this must wait until 2nd fetch phase
)
# dataset path -> raw data
def from_path_dataset_raw(dataset_path):
""" Main entry point for getting dataset metadata from a path. """
gen = from_path_dataset_description(dataset_path)
try:
ddo = dataset_description_object = next(gen)
except exc.MissingFileError as e:
# TODO return a stub with embedded error
logd.critical(e)
dataset_blob = dataset_raw(*comb_metadata(dataset_path))
return early_failure(dataset_path, e, dataset_blob)
try:
next(gen)
# TODO return a stub with embedded error
except StopIteration:
pass
data = ddo.data
ddod = type('ddod', tuple(), {'data': data})
dtsv = data['template_schema_version']
return dataset_raw(ddo, *comb_metadata(dataset_path), *comb_dataset(dataset_path, dtsv))
# unused
def from_path_file_metadata(path, _version): # FIXME doesn't go in this file probably
# FIXME this is going to depend on the manifests
# and a second fetch step where we kind of cheat
# and prefetch file files we know we will need
pass
def from_export_path_protocols_io_data(curation_export_json_path): pass
def protocols_io_ids(datasets): pass
def protocols_io_data(protocols_io_ids): pass
def from_group_name_protcur(group_name): pass
def protcur_output(): pass
def summary(datasets, protocols_io_data, protcur_output): pass
def from_path_summary(project_path):
# XXX unimplemented sketch, the names below are placeholders
dataset_path_structure
summary((
dataset(
dataset_path_structure,
dataset_description,
subjects,
samples,
submission,
manifests,
*rest
)))
def main(path=Path.cwd(), id=None, dataset_id=tuple(), time_now=None,
export_path=None, export_parent_path=None, _entry_point=False, **kwargs):
# TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output?
# TODO parallelize if multiple paths
# This assumes that all retrieve operations have
# finished successfully for the current dataset
from sparcur.simple.export import prepare_dataset_export, export_blob
if id is not None: # XXX note that id should be dataset_id # TODO parent_path ??
uuid = id.uuid # FIXME hardcoded backend assumption
path = path / uuid / 'dataset' # FIXME hardcoded see invariant_path
path = path.resolve() # make sure that invariant_path is expanded as we expect.
cache = path.cache
if not cache.is_dataset():
raise TypeError('can only run on a single dataset')
if _entry_point:
if export_parent_path is None:
export_parent_path = prepare_dataset_export(path, export_path)
kwargs.update({'export_path': export_path,
'export_parent_path': export_parent_path,
'time_now': time_now,})
dataset_raw = from_path_dataset_raw(path)
if _entry_point:
export_blob_path = export_blob(dataset_raw, 'ir.json', **kwargs)
return export_blob_path
else:
return dataset_raw
if __name__ == '__main__':
import pprint
from sparcur.simple.utils import pipe_main
pipe_main(main, after=pprint.pprint)
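Example python usage (a sketch that follows the retrieve layout shown earlier; it returns the raw dataset blob rather than exporting it):
from sparcur.paths import Path
from sparcur.simple.extract import main as extract
path = Path('21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset').resolve()
dataset_raw = extract(path)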
There is not a clean separation between transformation and validation because there are multiple transformation and validation steps that are interleaved.
from pathlib import Path
from sparcur import schemas as sc
from sparcur import pipelines as pipes
from sparcur.core import DictTransformer as DT
from sparcur.utils import logd # used by schema_validate below
def __apply_step(step, spec, data, **kwargs):
return step(data, spec, **kwargs)
def popl(data, pops, source_key_optional=False):
popped = list(DT.pop(data, pops, source_key_optional))
return data
def simple_add(data, adds):
pass
def execute_pipeline(pipeline, data):
for func, *args, kwargs in pipeline:
# man variable arity is a pain to deal with here
# where are lambda lists when you need them :/
# FIXME maybe we can make steps functional instead of mutating?
if not kwargs:
kwargs = {}
func(data, *args, **kwargs)
return data
def __wd(transformer): # not needed atm since transformers do in place modification
def inner(data, *args, **kwargs):
transformer(data, *args, **kwargs)
return data
return inner
def schema_validate(data, schema, fail=False, pipeline_stage_name=None):
if isinstance(schema, type):
# we were passed a class so init it
# doing it this way makes it easier to
# use remote schemas that hit the network
# since the network call doesn't have to
# happen at the top level but can mutate
# the class later before we init here
schema = schema()
ok, norm_or_error, data = schema.validate(data)
if not ok:
if fail:
logd.error('schema validation has failed and fail=True')
raise norm_or_error
if 'errors' not in data:
data['errors'] = []
if pipeline_stage_name is None:
pipeline_stage_name = f'Unknown.checked_by.{schema.__class__.__name__}'
data['errors'] += norm_or_error.json(pipeline_stage_name)
# TODO make sure the step is noted even if the schema is the same
simple_moves = (
[['structure', 'dirs',], ['meta', 'dirs']], # FIXME not quite right ...
[['structure', 'files',], ['meta', 'files']],
[['structure', 'size',], ['meta', 'size']],
[['remote_dataset_metadata'], ['inputs', 'remote_dataset_metadata']],
*pipes.SDSPipeline.moves[3:]
)
# function, *args, **kwargs
# functions should accept data as the first argument
core_pipeline = (
# FIXME validation of dataset_raw is not being done right now
(DT.copy, pipes.SDSPipeline.copies, dict(source_key_optional=True)),
(DT.move, simple_moves, dict(source_key_optional=True)),
# TODO clean has custom behavior
(popl, pipes.SDSPipeline.cleans, dict(source_key_optional=True)),
(DT.derive, pipes.SDSPipeline.derives, dict(source_key_optional=True)),
#(DT.add, pipes.SDSPipeline.adds), # FIXME lifter issues
(schema_validate, sc.DatasetOutSchema, None),
# extras (missing)
# xml files
# contributors
# submission
(DT.copy, pipes.PipelineExtras.copies, dict(source_key_optional=True)),
# TODO clean has custom behavior
(DT.update, pipes.PipelineExtras.updates, dict(source_key_optional=True)),
(DT.derive, pipes.PipelineExtras.derives, dict(source_key_optional=True)),
#(DT.add, pipes.PipelineExtras.adds), # TODO and lots of evil custom behavior here
# TODO filter_failures
(schema_validate, sc.DatasetOutSchema, None),
(pipes.PipelineEnd._indexes, None), # FIXME this is conditional and in adds
# TODO pipeline end has a bunch of stuff in here
(schema_validate, sc.PostSchema, dict(fail=True)),
)
def main(path=Path.cwd(), path_dataset_raw=None, dataset_raw=None, **kwargs):
# FIXME TODO need to follow the behavior of main in extract
if dataset_raw is None:
if path_dataset_raw is None:
cache = path.cache
if not cache.is_dataset():
raise TypeError('can only run on a single dataset')
from sparcur.simple.extract import main as extract
dataset_raw = extract(path)
else:
from sparcur.utils import path_ir
dataset_raw = path_ir(path_dataset_raw)
data = execute_pipeline(core_pipeline, dataset_raw)
breakpoint()
if __name__ == '__main__':
from sparcur.simple.utils import pipe_main
pipe_main(main, after=print)
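For reference, a minimal sketch of the step convention used by execute_pipeline: each entry is (function, *args, kwargs), the function receives data as its first argument and mutates it in place. The toy step below only pops a key and assumes DictTransformer.pop accepts a list of key paths.
from sparcur.simple.transform import execute_pipeline, popl
toy_pipeline = (
(popl, [['junk']], dict(source_key_optional=True)),
)
data = execute_pipeline(toy_pipeline, {'junk': 1, 'keep': 2})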
This is a bit out of order since validation is run after an initial export.
# FIXME this is not really doing what we want yet
from sparcur.paths import Path
from sparcur.simple import path_metadata
def from_path_validated_json_metadata(path):
tm = path_metadata.from_path_transitive_metadata(path)
from_blob_validated_json_metadata(tm)
return tm
def from_blob_validated_json_metadata(blob):
""" Mutates in place. """
Path.validate_path_json_metadata(blob)
# preferred
# accepted
# banned
# known
# unknown
def main(_entry_point=False, **kwargs):
# FIXME we want to be able to accept
# --dataset-id, <json-file-to-validate>, and some other things probably?
return path_metadata.main(validate=True, _entry_point=_entry_point, **kwargs)
# XXX unreachable leftover sketch, the names below are not defined in this module
#if export_path:
#from_blob_validate_path_json_metadata
#else:
#from_path_validate_path_json_metadata(path)
if __name__ == '__main__':
#import pprint
from sparcur.simple.utils import pipe_main
pipe_main(main)#, after=pprint.pprint)
import json
from socket import gethostname
import augpathlib as aug
from pyontutils.utils import timeformat_friendly
import sparcur
from sparcur.core import JEncode
from sparcur.paths import Path
from sparcur.utils import loge, symlink_latest
from sparcur.config import auth
def prepare_dataset_export(path, export_path=None): # FIXME do we need export_base?
if export_path is None: # FIXME confusing and breaks w/ convention -> Options maybe?
export_path = Path(auth.get_path('export-path')) # FIXME allow alt?
from sparcur.utils import PennsieveId
identifier = PennsieveId(path.cache.id)
uuid = identifier.uuid
# we don't use cache._fs_safe_id here because we know the
# identifier type from the folder structure
# FIXME dataset metadata export setup basically needs to do all of this first
# set latest run and then latest complete at the end, but complete is kind of arbitrary
# from the atomic point of view
tupdated = path.updated_cache_transitive() # FIXME this causes us to traverse all files twice
# XXX TODO I think that the dataset updated date is now transitive as well? though the dataset
# updated timestamp seems to happen a bit before the created/updated date on the file itself?
# FIXME somehow tupdated can be None !??!?! XXX yes, it happens on empty sparse datasets
export_dataset_folder = export_path / 'datasets' / uuid
export_parent = export_dataset_folder / timeformat_friendly(tupdated)
if not export_parent.exists():
export_parent.mkdir(parents=True)
export_latest_run = export_dataset_folder / 'LATEST_RUN'
symlink_latest(export_parent, export_latest_run)
# FIXME need to symlink to LATEST
return export_parent
def export_blob(blob, blob_file_name, time_now=None,
export_path=None, export_parent_path=None, **kwargs):
if export_parent_path is None:
# NOTE if export_parent_path is not None then it is up to the user
# to make sure that the contents of the dataset directory do not change
# resulting in confusion about mismatched transitive update dates
# FIXME 'path' is not defined in this scope; callers are expected to
# pass export_parent_path explicitly (as the mains in this file do)
export_parent_path = prepare_dataset_export(path, export_path)
elif not export_parent_path.exists():
# safe to mkdir here since outside has a record of the path name
export_parent_path.mkdir(parents=True)
export_blob_path = export_parent_path / blob_file_name
add_prov(blob, time_now)
with open(export_blob_path, 'wt') as f:
json.dump(blob, f, indent=2, cls=JEncode)
loge.info(f'path metadata exported to {export_blob_path}')
return export_blob_path
def add_prov(blob, time_now):
""" Mutate blob in place to add prov. """
# FIXME this will klobber cases where prov was embedded by the pipelines
blob['prov'] = {'timestamp_export_start': time_now.START_TIMESTAMP,
'export_system_identifier': Path.sysid,
'export_hostname': gethostname(),
'sparcur_version': sparcur.__version__,
'sparcur_internal_version': sparcur.__internal_version__,
}
rp = aug.RepoPath(sparcur.core.__file__)
if rp.working_dir is not None:
blob['prov']['sparcur_commit'] = rp.repo.active_branch.commit.hexsha
def main(path=Path.cwd(), export_path=None, **kwargs):
return prepare_dataset_export(path, export_path=export_path)
if __name__ == '__main__':
import pprint
from sparcur.simple.utils import pipe_main
pipe_main(main, after=pprint.pprint)
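Example python usage (a sketch; the blob and the export parent path are hypothetical):
from sparcur.paths import Path
from sparcur.utils import GetTimeNow
from sparcur.simple.export import export_blob
blob = {'id': 'N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18'}
export_blob_path = export_blob(
blob, 'curation-export.json', time_now=GetTimeNow(),
export_parent_path=Path('21957eae-0824-4fb5-b18f-04d6ed12ce18/exports'))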
Usage example
find -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -exec python -m sparcur.simple.path_metadata {} \;
xargs variant.
find -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -print0 | \
xargs -0 -I{} -P8 -r -n 1 python -m sparcur.simple.path_metadata {}
Alternate example
python -m sparcur.simple.path_metadata \
21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset \
--export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports/
from pathlib import Path
def from_path_transitive_metadata(path):
tml = path._transitive_metadata()
# FIXME TODO handle sparse cases
return {'type': 'path-metadata',
'data': tml,}
def main(path=Path.cwd(), id=None, time_now=None,
parent_path=None, invariant_local_path='dataset',
parent_parent_path=Path.cwd(),
export_local=False, export_path=None, export_parent_path=None,
_entry_point=False, validate=False, **kwargs):
from sparcur.paths import Path
from sparcur.simple.export import prepare_dataset_export, export_blob
if path == Path.cwd() and (id is not None or parent_path is not None):
if parent_path is None:
uuid = id.uuid
parent_path = parent_parent_path / uuid
invariant_path = parent_path / invariant_local_path
path = invariant_path.expanduser().resolve()
else:
parent_parent_path = None
# TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output?
# TODO parallelize if multiple paths
# This assumes that all retrieve operations have
# finished successfully for the current dataset
# FIXME Options calls resolve on all paths but not if Path.cwd slips through ...
path = Path(path) # FIXME even here some paths don't have caches ?!
cache = path.cache # XXX this should have errored when Path was applied below !?!?!??! pipe_main wat ???
if not cache.is_dataset():
raise TypeError('can only run on a single dataset')
if _entry_point:
if export_parent_path is None:
export_parent_path = prepare_dataset_export(path, export_path)
kwargs.update({'export_path': export_path,
'export_parent_path': export_parent_path,
'time_now': time_now,})
tm = from_path_transitive_metadata(path)
# FIXME TODO validate file formats, which means this also needs to support the remoteless case
# FIXME TODO process metadata for each timepoint when things enter should go in prov I think
# or we need to be collecting prov along the way, we don't have an overseer or conductor
# so we can't keep everything embedded
# FIXME TODO embed the transitive cache updated value that is used in prepare_dataset_export
if validate: # FIXME raw vs validated and FIXME pipeline
from sparcur import schemas as sc
from sparcur.simple.transform import schema_validate
Path.validate_path_json_metadata(tm)
schema_validate(tm, sc.PathTransitiveMetadataSchema)
if _entry_point:
export_blob_path = export_blob(tm, 'path-metadata.json', **kwargs) # FIXME naming for raw vs validated
return export_blob_path
else:
return tm
if __name__ == '__main__':
#import pprint
from sparcur.simple.utils import pipe_main
pipe_main(main)#, after=pprint.pprint)
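Example python usage (a sketch following the retrieve layout shown above):
from sparcur.paths import Path
from sparcur.simple.path_metadata import from_path_transitive_metadata
tm = from_path_transitive_metadata(Path('21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset').resolve())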
This is equivalent to creating objects that match the summary schema for legacy organization level curation-export.json files.
FIXME it might be the case that individual datasets were exported under different schemas, which means that there would no longer be a single consistent schema for checking the merge; this is why we sometimes need to rerun all datasets when there is a schema change. Even if there is not a schema change, it very well may be the case that individual datasets will have been run on different versions of the export pipeline, and slightly different images.
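Usage example (a sketch; assumes that individual datasets have already been exported so that LATEST symlinks exist under the export path, and that export-path and remote-organization are configured):
python -m sparcur.simple.combine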
import json
import rdflib
from pathlib import Path
from dateutil import parser as dateparser
from pyontutils.core import OntResPath
from pyontutils.utils import TZLOCAL, timeformat_friendly
from pyontutils.namespaces import TEMP, rdf, sparc
#from sparcur.utils import fromJson, register_all_types # FIXME this should also go in sparcron
from sparcur.export.triples import TriplesExportSummary
from sparcur.export.published import _merge_graphs
from sparcur.simple.export import add_prov
tu = 'timestamp_updated'
tuc = 'timestamp_updated_contents'
ip = 'inputs'
rmk = 'remote_dataset_metadata'
class TriplesExportSummaryPublished(TriplesExportSummary):
@property
def ontid(self):
return rdflib.URIRef(super().ontid.replace('ontologies/', 'ontologies/published/'))
@property
def header_label(self):
return rdflib.Literal(f'{self.folder_name} curation export published graph')
def max_dataset_or_contents_updated(datasets_list):
return max(set.union(
set([a['meta'][tuc] for a in datasets_list
if tuc in a['meta'] and a['meta'][tuc]]),
set([a['meta'][tu] for a in datasets_list
if tu in a['meta'] and a['meta'][tu]])))
def from_dataset_export_path_snapshot(dataset_export_path, snapshots_path, time_now):
derefs = [l.resolve() for l in [c / 'LATEST' for c in dataset_export_path.children] if l.exists()]
snapshot_path = snapshots_path / time_now.START_TIMESTAMP_LOCAL_FRIENDLY
snapshot_path.mkdir(parents=True)
[(snapshot_path / d.parts[-2]).symlink_to((snapshot_path / 'asdf').relative_path_to(d)) for d in derefs]
return snapshot_path
def from_snapshot_path_datasets_lists(snapshot_path):
alld = []
pubd = []
for uuid_path in snapshot_path.children:
with open(uuid_path / 'curation-export.json', 'rt') as f:
j = json.load(f)
# TODO validate the load XXX this should probably
# happen as a final roundtrip check during export
# TODO filter by organization
alld.append(j)
if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]:
pubd.append(j)
return alld, pubd
def from_snapshot_path_summary_json(snapshot_path, project_id, time_now):
l_all, l_pub = from_snapshot_path_datasets_lists(snapshot_path)
sum_all = from_datasets_list_summary_json(l_all, project_id, time_now)
sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now)
return sum_all, sum_pub
def from_snapshot_path_summary_ttl(snapshot_path, project_id, time_now, blob):
tes = TriplesExportSummary(blob)
tesp = TriplesExportSummaryPublished(blob)
orps = [OntResPath(uuid_path / 'curation-export.ttl')
for uuid_path in sorted(snapshot_path.children, key=lambda p: p.name)]
graphs_all = [o.graph for o in orps]
graphs_pub = [
g for g, uripub in [(g, list(g[ds:TEMP.hasUriPublished]))
for g in graphs_all
for ds in g[:rdf.type:sparc.Dataset]]
if uripub]
merged_all = _merge_graphs(graphs_all)
merged_all.populate_from_triples(tes.triples_header)
merged_pub = _merge_graphs(graphs_pub)
merged_pub.populate_from_triples(tesp.triples_header)
return merged_all, merged_pub, graphs_all, graphs_pub
def from_snapshot_path_summary_ttl_BAD(snapshot_path, project_id, time_now, blob):
# this variant is too complex, trying to reuse the published graph as the all graph
# and the implementation of the OntConjunctiveGraph is not far enough along to do it
tes = TriplesExportSummary(blob) # FIXME nasty annoying dep
graph_meta = OntGraph()
graph_meta.populate_from_triples(tes._triples_header(tes.ontid, time_now._start_time))
rev_replace_pairs = _fix_for_pub(graph_meta, graph_meta)
replace_pairs = tuple([(b, a) for a, b in rev_replace_pairs])
orps = [OntResPath(uuid_path / 'curation_export.ttl')
for uuid_path in snapshot_path.children]
graphs = [o.graph for o in orps]
# FIXME should use id_published here as well, but that isn't being
# propagated at the moment
graphs_pub = []
graphs_nop = []
for g, uripub in [(g, list(g[ds:TEMP.hasUriPublished]))
for g in graphs
for ds in g[:rdf.type:sparc.Dataset]]:
if uripub:
graphs_pub.append(g)
else:
graphs_nop.append(g)
graph_pub = _merge_graphs(published_graphs)
graph_pub.populate_from(graph_meta)
# FIXME this is manually aligned with TriplesExport.triples_header
graph_pub.asdf
for g in graphs_nop:
graph_pub.namespace_manager.populate_from(
{k:v for k, v in dict(g.namespace_manager).items()
if k not in ('contributor', 'sample', 'subject')})
ttl_all = None
ttl_pub = _populate_published(curation_export, graph)
def from_dataset_export_path_datasets_lists(dataset_export_path):
dep = dataset_export_path
alld = []
pubd = []
derefs = [l.resolve() for l in [c / 'LATEST' for c in dep.children] if l.exists()]
# TODO consider creating a folder that is just symlinks before this
for lp in sorted(derefs, key=lambda p: p.name):
with open(lp / 'curation-export.json', 'rt') as f:
j = json.load(f)
# TODO validate the load XXX this should probably
# happen as a final roundtrip check during export
# TODO filter by organization
alld.append(j)
if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]:
pubd.append(j)
return alld, pubd
def from_datasets_list_summary_json(datasets_list, project_id, time_now):
# XXX FIXME issue with datasets from multiple projects
fn = Path(datasets_list[0]['prov']['export_project_path']).name
out = {
'id': project_id.id,
'meta': {
'count': len(datasets_list),
'folder_name': fn, # WHAT A RELIEF we don't need network
'uri_api': project_id.uri_api,
'uri_human': project_id.uri_human(),
},
'datasets': datasets_list,
}
add_prov(out, time_now)
# some potential prov summaries, but lets not do that here
# most prov stats should be on the single dataset level
#'export_timestamp_start_min': min(tes),
#'export_timestamp_start_max': max(tes),
return out
def from_dataset_export_path_summary_json(dataset_export_path, project_id, time_now):
l_all, l_pub = from_dataset_export_path_datasets_lists(dataset_export_path)
#[a['meta']['timestamp_updated'] < a['meta']['timestamp_updated_contents']
#for a in l_all if a['meta']['timestamp_updated_contents']]
sum_all = from_datasets_list_summary_json(l_all, project_id, time_now)
sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now)
return sum_all, sum_pub
def main(project_id=None, export_path=None, time_now=None,
project_id_auth_var='remote-organization', # FIXME move to clifun
disco=False, **kwargs):
from sparcur.paths import Path
from sparcur.config import auth
from sparcur.core import OntTerm
OntTerm._nofetch = True
if project_id is None:
from sparcur.config import auth
from sparcur.utils import PennsieveId
project_id = auth.get(project_id_auth_var)
project_id = PennsieveId(project_id) # FIXME abstract the hardcoded backend
if export_path is None: # XXX see issues mentioned above
export_path = Path(auth.get_path('export-path'))
dataset_export_path = export_path / 'datasets'
snapshots_path = export_path / 'snapshots'
snapshot_path = from_dataset_export_path_snapshot(
dataset_export_path, snapshots_path, time_now)
sum_all, sum_pub = from_snapshot_path_summary_json(
snapshot_path, project_id, time_now)
# write symlink LATEST_PARTIAL
ttl_all, ttl_pub, graphs_all, graphs_pub = from_snapshot_path_summary_ttl(
snapshot_path, project_id, time_now, sum_all)
# write symlink LATEST
maxdt = max_dataset_or_contents_updated(sum_all['datasets'])
dt_maxdt = dateparser.parse(maxdt)
dt_maxdt_local = dt_maxdt.astimezone(TZLOCAL())
friendly_maxdt_local = timeformat_friendly(dt_maxdt_local)
# FIXME there are some bad assumptions in here that should be refactored out
# at some point, but for now we implicitly assume that all datasets come from
# the same organization, which can easily be violated because we don't check
# however the existing internal schema requires an id for the summary which is
# currenty the organization id
# FIXME summary is a hardcoded path
# XXX WARNING it is possible to overwrite since maxdt might not change between runs
# this is desirable behavior for development, but could cause issues in other cases
pexpath = export_path / 'summary' / project_id.uuid
latest = pexpath / 'LATEST'
npath = pexpath / friendly_maxdt_local
snapshot_link = npath / 'snapshot'
if not npath.exists():
npath.mkdir(parents=True)
else:
# FIXME not sure if this makes sense?
if snapshot_link.is_symlink():
snapshot_link.unlink()
snapshot_link.symlink_to(snapshot_link.relative_path_to(snapshot_path))
npath_ce = npath / 'curation-export.json'
npath_cep = npath / 'curation-export-published.json'
for path, blob in ((npath_ce, sum_all),
(npath_cep, sum_pub)):
with open(path, 'wt') as f:
json.dump(blob, f, indent=2)
npath_ttl = npath / 'curation-export.ttl'
npath_ttlp = npath / 'curation-export-published.ttl'
ttl_all.write(npath_ttl)
ttl_pub.write(npath_ttlp)
if disco:
# export xml and tsv for disco
from sparcur.simple.disco import from_curation_export_json_path_datasets_json_xml_and_disco
from_curation_export_json_path_datasets_json_xml_and_disco(
npath_ce, sum_all['datasets'], graphs_all)
if latest.is_symlink():
latest.unlink()
latest.symlink_to(friendly_maxdt_local)
return npath_ce, sum_all, sum_pub, graphs_all, graphs_pub
if __name__ == '__main__':
#import pprint
from sparcur.simple.utils import pipe_main
# these are really big, don't print them
# pipe_main(main, after=pprint.pprint)
pipe_main(main)
Legacy export for disco.
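Example python usage (a sketch; with path=None it runs combine first and assumes the export path and remote organization are configured):
from sparcur.utils import GetTimeNow
from sparcur.simple.disco import main as disco
disco(path=None, time_now=GetTimeNow())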
import json
from pyontutils.core import OntGraph
from sparcur import export as ex
from sparcur.utils import PennsieveId
from sparcur.utils import fromJson, register_all_types # FIXME this should also go in sparcron
def from_curation_export_json_path_xml_and_disco(curation_export_json_path):
with open(curation_export_json_path, 'rt') as f:
summary = json.load(f)
datasets_json = summary['datasets']
from_curation_export_json_path_datasets_json_xml_and_disco(
curation_export_json_path, datasets_json)
def from_curation_export_json_path_datasets_json_xml_and_disco(
curation_export_json_path, datasets_json, graphs=None):
# FIXME need the snapshot linked somehow, export time started if we are using summary
# or summary prov timestamp_export_start will give us the snapshot path as well if we
# parse it back to a date
if not graphs:
snapshot_path = curation_export_json_path.parent / 'snapshot'
paths = [(snapshot_path /
PennsieveId(d['id']).uuid /
'curation-export.ttl')
for d in datasets_json]
graphs = [OntGraph().parse(path) for path in paths]
datasets_ir = fromJson(datasets_json)
ex.export_xml(curation_export_json_path, datasets_ir)
ex.export_disco(curation_export_json_path, datasets_ir, graphs)
# XXX not doing jsonld here, it will be combined
# from single dataset jsonld or similar
def main(path=None, **kwargs):
register_all_types()
# FIXME the problem with this approach is that we cannot run
# multiple downstream steps from the same upstream step, we would
# need a compositional way to tell each downstream which upstreams
# we wanted to run in any given situation, all to save additional
# reads from disk
if path is None: # assume the user wants to run combine first
from sparcur.simple.combine import main as combine
curation_export_json_path, summary_all, _, graphs_all, _ = combine(**kwargs)
datasets_json = summary_all['datasets']
from_curation_export_json_path_datasets_json_xml_and_disco(
curation_export_json_path, datasets_json, graphs_all)
else:
curation_export_json_path = path
from_curation_export_json_path_xml_and_disco(curation_export_json_path)
if __name__ == '__main__':
#import pprint
from sparcur.simple.utils import pipe_main
# these are really big, don't print them
# pipe_main(main, after=pprint.pprint)
pipe_main(main)
Find all export dataset folders that are older than the current snapshot and are not in a snapshot that is in a summary, then compact them.
"""
Examples:
# look at the second one and then run the first one since it is easier and safer to stop and resume
# XZ_OPT=-e9 python -m sparcur.simple.compact | xargs -P4 -r -I{} sh -c 'tar -cvJf "{}.tar.xz" "{}" && rm -r "{}"'
python -m sparcur.simple.compact | xargs -P12 -r -I{} echo tar -cvJf '{}.tar.xz' '{}'
# python -m sparcur.simple.compact | xargs -P6 -r -I{} echo rm -r '{}'
"""
from sparcur.paths import Path
from sparcur.config import auth
__dep_cache = {}
def latest_snapped(dataset_export_path, snapped):
if dataset_export_path not in __dep_cache:
cs = set(c for c in dataset_export_path.children if c.is_dir() and not c.is_symlink())
csnap = cs.intersection(snapped)
if not csnap: # no snap, pretend that latest is snapped
# this can happen if there is no LATEST because we
# were e.g. just exporting path metadata and nothing else
maxsnap = sorted(cs, key=lambda p: p.name)[-1]
else:
maxsnap = sorted(csnap, key=lambda p: p.name)[-1]
__dep_cache[dataset_export_path] = maxsnap.name
return __dep_cache[dataset_export_path]
def main():
export_path = Path(auth.get_path('export-path'))
summary_path = export_path / 'summary'
snapshots_path = export_path / 'snapshots'
datasets_path = export_path / 'datasets'
snap_shotted = [
dss.resolve()
for d in summary_path.rchildren_dirs
for l in d.rchildren if l.is_symlink() and l.name == 'snapshot'
for dss in l.resolve().children]
snapped = set(snap_shotted)
latest_sums = [
d.resolve()
for c in summary_path.children
for d in (c / 'LATEST',) if d.exists()]
all_builds = [
build
for date in datasets_path.children if date.is_dir() and not date.is_symlink()
for build in date.children if build.is_dir() and not build.is_symlink()]
older_not_snap = [
a for a in all_builds
if a not in snapped and a.name < latest_snapped(a.parent, snapped)]
assert not snapped.intersection(older_not_snap)
# newer = set(all_builds) - snapped - set(older_not_snap)
_ = [print(p) for p in older_not_snap]
if __name__ == '__main__':
main()
from sparcur.config import auth
__doc__ = f"""Common command line options for all sparcur.simple modules
Usage:
sparcur-simple manifest [options] [<path>...]
sparcur-simple get-uuid <remote-id>...
sparcur-simple datasets
sparcur-simple for-racket
sparcur-simple for-racket (meta|diff) [options] [<path> ...]
sparcur-simple for-racket make-push-manifest <dataset-id> <updated> <push-id> [options] [<path> ...]
sparcur-simple for-racket push <dataset-id> <updated> <push-id> [options] [<path> ...]
sparcur-simple check-names
sparcur-simple git-repos [update] [options]
sparcur-simple [options] [<path>...]
sparcur-simple [options] [--dataset-id=<ID>...]
sparcur-simple [options] [--extension=<EXT>...] [<path>...]
Commands:
manifest generate manifest files for path
for-racket print data for reading into Racket
Options:
-h --help show this
--hypothesis-group-name=NAME the hypothesis group name
--project-id=ID the project id
--dataset-id=<ID>... one or more dataset ids
--project-id-auth-var=VAR name of the auth variable holding the project-id
--all-projects run for all projects listed in config remote-organizations
--project-path=PATH the project path, will be path if <path>... is empty
--parent-path=PATH the parent path where the project will be cloned to
--parent-parent-path=PATH parent in which a random tempdir is generated
or the dataset uuid is used as the parent path, don't use this ...
--invariant-local-path=PATH path relative to parent path for dataset [default: dataset]
--export-local set export-parent-path to {{:parent-path}}/exports/
--export-path=PATH base export path containing the standard path structure [default: {auth.get_path('export-path')}]
--export-parent-path=PATH direct parent path into which exports will be placed
--for-template=TEMPLATE run regularization steps when generating templates
--cleaned-path=PATH base cleaned path containing the standard path structure [default: {auth.get_path('cleaned-path')}]
--cleaned-output-path=PATH directory path into which cleaned paths will be placed
--extension=<EXT>... one or more file extensions to fetch
-j --jobs=N number joblib jobs [default: 12]
--exclude-uploaded do not pull files from remote marked as uploaded
--sparse-limit=N package count that forces a sparse pull [default: {auth.get('sparse-limit')}]
--no-index do not generate index files e.g. on pull
--symlink-objects-to=PATH path to an existing objects directory
--log-level=LEVEL log level info [default: INFO]
--open=PROGRAM show file with PROGRAM
--show show file with xopen
--pretend show what would be done for update
"""
# XXX FIXME --dataset-id=<ID>... breaks the use of [options] ???
import os
import sys
from types import GeneratorType
from pyontutils import clifun as clif
from sparcur import exceptions as exc
from sparcur.utils import _find_command
from sparcur.utils import log, logd, loge, GetTimeNow, PennsieveId
from sparcur.paths import Path, PennsieveCache
from sparcur.backends import PennsieveRemote
def backend_pennsieve(project_id=None, Local=Path, Cache=PennsieveCache): # (ref:def_babf)
"""return a configured pennsieve backend
calling this is sufficient to get everything set up correctly
You must call RemotePath.init(project_id) before using
RemotePath. Passing the project_id argument to this function
will do that for you. It is not required because there are
cases where the project_id may not be known at the time that
this function is called. """
RemotePath = PennsieveRemote._new(Local, Cache)
if project_id is not None:
RemotePath.init(project_id)
return RemotePath
class Options(clif.Options):
@property
def id(self):
# dataset_id has priority since project_id can occur without a
# dataset_id, but dataset_id may sometimes come with a project_id
# in which case the dataset_id needs priority for functions that
# branch on the most granular identifier type provided
return (self.dataset_id[0]
if self.dataset_id else
(self.project_id
if self.project_id else
None))
@property
def project_id(self):
if not hasattr(self, '_cache_project_id'):
id = self._args['--project-id']
if id is not None:
identifier = PennsieveId(id, type='organization')
self._cache_project_id = identifier
else:
return
return self._cache_project_id
@property
def dataset_id(self):
if not hasattr(self, '_cache_dataset_id'):
ids = self._args['--dataset-id']
positional = self._args['<dataset-id>']
if ids:
identifiers = [PennsieveId(id, type='dataset') for id in ids]
self._cache_dataset_id = identifiers
elif positional:
return [PennsieveId(positional, type='dataset')]
else:
return ids
return self._cache_dataset_id
@property
def remote_id(self):
if not hasattr(self, '_cache_remote_id'):
ids = self._args['<remote-id>']
if ids:
identifiers = [PennsieveId(id) for id in ids]
self._cache_remote_id = identifiers
else:
return ids
return self._cache_remote_id
@property
def jobs(self):
return int(self._args['--jobs'])
n_jobs = jobs # match internal kwargs conventions
@property
def paths(self):
return [Path(p).expanduser().resolve() for p in self._args['<path>']]
@property
def path(self):
paths = self.paths
if paths:
return paths[0]
elif self.project_path:
return self.project_path
else:
# if no paths were listed default to cwd
# consistent with how the default kwargs
# are set on a number of mains
# this is preferable to allow path=None
# to be overwritten by the conventions of
# individual pipeline mains
return Path.cwd()
@property
def project_path(self):
pp = self._args['--project-path']
if pp:
return Path(pp).expanduser().resolve()
@property
def parent_parent_path(self):
ppp = self._args['--parent-parent-path']
if ppp:
return Path(ppp).expanduser().resolve()
else:
return Path.cwd()
@property
def parent_path(self):
pap = self._args['--parent-path']
did = self.dataset_id
if pap:
return Path(pap).expanduser().resolve()
elif did:
id = self.id # dataset_id is a list so use id which handles that
uuid = id.uuid
return (self.parent_parent_path / uuid).expanduser().resolve()
@property
def export_path(self):
ep = self._args['--export-path']
epp = self.export_parent_path
if ep and epp:
raise TypeError('Only one of --export-path and --export-parent-path may be set.')
elif ep:
return Path(ep).expanduser().resolve()
else:
raise Exception('should not happen')
@property
def export_parent_path(self):
epp = self._args['--export-parent-path']
el = self.export_local
pap = self.parent_path
if epp and el:
raise TypeError('Only one of --export-local and --export-parent-path may be set.')
elif epp:
return Path(epp).expanduser().resolve()
elif el and pap:
# it is ok to do this here because the TypeError above prevents
# the case where both epp and el are set, so even though epp
# is no longer always what was set on the command line, it is
# the case that there won't be conflicting sources
return pap / 'exports'
@property
def cleaned_path(self):
cp = self._args['--cleaned-path']
cop = self.cleaned_output_path
if cp and cop and cp != self._defaults['--cleaned-path']:
# XXX NOTE if the default path is passed as the cleaned path this will not error
# that is the cost of showing the user the default value in cli, we can't tell
# whether the user set the option in that case, probably a good tradeoff
# the alternative is to detect that the value is None and set it later, which
# is done routinely here, but is somewhat opaque
raise TypeError('Only one of --cleaned-path and --cleaned-output-path may be set.')
elif cp:
return Path(cp).expanduser().resolve()
else:
raise Exception('should not happen')
@property
def cleaned_output_path(self):
cop = self._args['--cleaned-output-path']
if cop:
return Path(cop).expanduser().resolve()
@property
def extensions(self):
return self.extension
@property
def symlink_objects_to(self):
slot = self._args['--symlink-objects-to']
if slot:
return Path(slot).expanduser()
@property
def sparse_limit(self): # FIXME not being pulled in by asKwargs ??
return int(self._args['--sparse-limit'])
@property
def time_now(self): # FIXME make it possible to pass this in?
if not hasattr(self, '_time_now'):
self._time_now = GetTimeNow()
return self._time_now
@property
def log_level(self): # FIXME not being pulled in by asKwargs ??
ll = self._args['--log-level']
if ll.isdigit() or ll[0] == '-' and ll[1:].isdigit():
return int(ll)
else:
return ll
def pipe_main(main, after=None, argv=None):
options, args, defaults = Options.setup(__doc__, argv=argv)
# _entry_point is used as a way to switch behavior when a
# pipeline main is run directly or actually in a pipeline
try:
log.setLevel(options.log_level)
logd.setLevel(options.log_level)
loge.setLevel(options.log_level)
out = main(_entry_point=True, **options.asKwargs())
except Exception as e:
log.exception(e)
log.error(options.path)
raise e
if after:
after(out)
return out
def rglob(path, pattern):
""" Hack around the absurd slowness of python's rglob """
if sys.platform == 'win32':
log.warning('no findutils on windows, watch out for unexpected files')
return list(path.rglob(pattern))
doig = (hasattr(path, 'cache') and
path.cache and
path.cache.cache_ignore)
exclude = ' '.join([f"-not -path './{p}*'" for p in path.cache.cache_ignore]) if doig else ''
command = f"""{_find_command} {exclude} -name {pattern!r}"""
with path:
with os.popen(command) as p:
string = p.read()
path_strings = string.split('\n') # XXX posix path names can contain newlines
paths = [path / s for s in path_strings if s]
return paths
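# for reference, a minimal usage sketch of rglob; the pattern matches the
# case-insensitive prefix form built in fetch_metadata_files and the path
# is hypothetical:
#
# from sparcur.paths import Path
# manifests = rglob(Path('path/to/some/dataset').resolve(), '[Mm]anifest*')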
def _fetch(cache): # sigh joblib multiprocessing pickling
# lambda functions are great right up until you have to handle an
# error function inside of them ... thanks python for yet another
# failure to be homogenous >_<
meta = cache.meta
try:
size_mb = meta.size.mb
except AttributeError as e:
if meta.errors:
logd.debug(f'remote errors {meta.errors} for {cache!r}')
return
else:
raise exc.StopTheWorld(cache) from e
return cache.fetch(size_limit_mb=None) # FIXME somehow this is not working !?
def _fetch_path(path): # sigh joblib multiprocessing pickling
path = Path(path)
if path.is_dir():
msg = f'trying to fetch a directory: {path}'
log.error(msg)
return
cache = path.cache
if cache is None:
raise exc.NoCachedMetadataError(path)
if path.exists() and path.size == cache.meta.size:
return # already got this one
# do not return to avoid cost of serialization back to the control process
_fetch(cache)
def fetch_paths_parallel(paths, n_jobs=12, use_async=True):
if n_jobs <= 1:
[_fetch_path(path) for path in paths]
elif use_async:
from pyontutils.utils import Async, deferred
Async()(deferred(_fetch_path)(path) for path in paths)
else:
import pathlib
from joblib import Parallel, delayed
backend = 'multiprocessing' if hasattr(sys, 'pypy_version_info') else None
# FIXME FIXME FIXME somehow this can result in samples.xlsx being fetched to subjects.xlsx !?!??!!
# XXX either a race condition on our end or something insane from the remote
Parallel(n_jobs=n_jobs, backend=backend)(delayed(_fetch_path)(pathlib.Path(path)) for path in paths)
#Parallel(n_jobs=n_jobs)(delayed(fetch_path)(path) for path in paths)
def combinate(*functions):
def combinator(*args, **kwargs):
for f in functions:
# NOTE no error handling is needed here
# in no cases should the construction of
# the python object version of a path fail
# all failures should happen _after_ construction
# the way we have implemented this they fail when
# the data attribute is accessed
obj = f(*args, **kwargs)
if isinstance(obj, GeneratorType):
yield from obj
# FIXME last one wins, vs yield tuple vs ...?
# FIXME manifests are completely broken for this
else:
yield obj
return combinator
def multiple(func, merge=None):
""" combine multiple results """
def express(*args, **kwargs):
vs = tuple(func(*args, **kwargs))
if merge is not None:
yield merge(vs)
else:
yield vs
return express
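# a minimal sketch of how combinate and multiple compose; the lambdas here
# are hypothetical toy functions:
#
# f = lambda path: ('f', path) # plain values are yielded as is
# m = multiple(lambda path: iter([('m', path)]), merge=list)
# comb = combinate(f, m)
# list(comb('some/path')) # -> [('f', 'some/path'), [('m', 'some/path')]]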
def early_failure(path, error, dataset_blob=None):
# these are the 9999 5555 and 4444 errors
# TODO match the minimal schema reqs as
# we did in pipelines
if dataset_blob is None:
cache = path.cache
return {'id': cache.id,
'meta': {'uri_api': cache.uri_api,
'uri_human': cache.uri_human,},
#'status': 'early_failure', # XXX note that status is not requried
# if we cannot compute it, but if there are inputs there should be
# a status
'errors': [error], # FIXME format error
}
else:
if 'errors' not in dataset_blob:
dataset_blob['errors'] = []
dataset_blob['errors'].append(error)
return dataset_blob
class DataWrapper:
# sigh patterns are stupid, move this to elsewhere so it doesn't taint everything
def __init__(self, data):
self.data = data
def main(id=None, **kwargs):
def ik(key):
return key in kwargs and kwargs[key]
if id is not None:
print(id.uuid)
if ik('get_uuid'):
for id in kwargs['remote_id']:
print(id.uuid)
return
if (ik('datasets') or
(ik('for_racket') and not (ik('diff') or ik('make_push_manifest') or ik('push'))) or
ik('check_names')):
log.setLevel(60) # logging.CRITICAL = 50
from sparcur.config import auth
from sparcur.simple.utils import backend_pennsieve
if ik('project_id'):
project_id = kwargs['project_id'] # project_id from command line
else:
project_id = auth.get('remote-organization')
project_ids = auth.get_list('remote-organizations')
if not project_ids:
project_ids = [project_id]
datasets = []
project_meta = []
for project_id in project_ids:
PennsieveRemote = backend_pennsieve(project_id)
root = PennsieveRemote(project_id)
project_meta.append((root.id, root.name))
datasets.extend(root.children)
if ik('datasets'):
print('\n'.join([d.id for d in datasets]))
if ik('for_racket'):
import json
from sxpyr import sxpyr
from augpathlib import meta as apmeta
from pyontutils.utils_fast import isoformat
from dateutil import parser as dateparser
if ik('meta'):
paths = kwargs['paths'] if kwargs['paths'] else [kwargs['path']]
for path in paths:
cmeta = Path(path).cache_meta
out = '('
rest = False
ebf = apmeta._EncodeByField()
for field in apmeta._PathMetaAsXattrs.fields:
v = getattr(cmeta, field)
if v or v == 0:
if rest:
out += '\n '
else:
rest = True
if field == 'checksum':
v = v.hex() # bytes are annoying to port
elif field in ('updated', 'created'):
v = ebf._datetime(v)
out += f':{field} {json.dumps(v)}'
out += ')'
print(out)
elif ik('diff'): # FIXME own file?
# FIXME TODO based on dataset-id
path = kwargs['path']
dataset_id, updated_transitive, diff = path.diff()
blob = {
'dataset-id': dataset_id.id,
'updated-transitive': isoformat(updated_transitive),
'diff': diff,
}
pl = sxpyr.python_to_sxpr(blob, str_as_string=True)
sxpr = pl._print(sxpyr.configure_print_plist(newline_keyword=False))
print(sxpr)
elif ik('make_push_manifest'): # FIXME own file?
# this is separate from push because checking confim is a separate action from pushing
dataset_id = id
updated = dateparser.parse(kwargs['updated'])
push_id = kwargs['push_id']
path = kwargs['path']
path.make_push_manifest(dataset_id, updated, push_id)
log.info(f'make push manifest finished for {dataset_id}')
# read list of files + expected ops to change
# generate the full change manifest
elif ik('push'): # FIXME own file?
dataset_id = id
updated = dateparser.parse(kwargs['updated'])
push_id = kwargs['push_id']
path = kwargs['path']
# TODO error handling and retry or resume
path.push_from_manifest(dataset_id, updated, push_id)
log.info(f'push from manifest finished for {dataset_id}')
# FIXME TODO consider pulling incremental index entries
# for only the paths in the manifest so that the diff
# doesn't go stale, also need to pull the latest remote
# metadata for the updated files
else:
key = lambda d: d.updated
_dsmeta = '\n'.join([("("
f"{json.dumps(d.id)} "
f"{json.dumps(d.name)} "
f"{json.dumps(d.updated)} "
f"{json.dumps(d.owner.name_or_email)} "
f"{json.dumps(d.parent_id)} "
f"{json.dumps(d.bfobject.publication_status)}"
")")
for d in sorted(datasets, key=key, reverse=True)])
dsmeta = f"({_dsmeta})"
_projmeta = '\n'.join([f"({json.dumps(id)} {json.dumps(name)})" for id, name in project_meta])
dsmeta += f"\n({_projmeta})"
print(dsmeta)
if ik('check_names'):
# you will want to run sparcur.simple.fetch_remote_metadata_all
from pathlib import PurePosixPath
def report(pid, exp, pub):
pname = pub['name']
name_mismatch = (
False if exp['basename'] == pname
else (exp['basename'], pname))
# [6:] to strip files/
ppname = PurePosixPath(pub['path']).name
pathname_mismatch = (
False if exp['basename'] == ppname
else (exp['basename'], ppname))
eppp = PurePosixPath(exp['dataset_relative_path']).parent.as_posix()
pppp = PurePosixPath(pub['path'][6:]).parent.as_posix()
parent_path_mismatch = (
False if eppp == pppp
else (eppp, pppp))
# once we fix things on our end names should match
# parent paths should match
# name and pathname might match but do not have to match
pid = f' {pid}'
nsp = '\n '
if name_mismatch:
if pathname_mismatch and pname != ppname:
return (f'{pid} name mismatch and pathname mismatch '
f'{nsp}{nsp.join(name_mismatch)}{nsp}{nsp.join(pathname_mismatch)}')
else:
return (f'{pid} name mismatch '
f'{nsp}{nsp.join(name_mismatch)}')
if parent_path_mismatch:
return (f'{pid} parent mismatch'
f'{nsp}{nsp.join(parent_path_mismatch)}')
return ''
#return (f'{pid} ok ')
import json
from sparcur.backends import PennsieveDatasetData
export_path = kwargs['export_path']
epd = export_path / 'datasets'
for dataset in datasets:
latest = epd / dataset.identifier.uuid / 'LATEST'
export_path_metadata = latest / 'path-metadata.json'
exported = export_path_metadata.exists()
# pass the remote not just the id so that bfobject is
# accessible to the RemoteDatasetData class
pdd = PennsieveDatasetData(dataset)
rmeta = pdd.fromCache()
published = 'id_published' in rmeta
if exported and published:
with open(export_path_metadata, 'rt') as f:
j = json.load(f)
epni = {'N:' + o['remote_id']:o for o in j['data']
if o['remote_id'].startswith('package:')}
ppni = pdd._published_package_name_index()
se, sp = set(epni), set(ppni)
e_missing = sp - se
p_missing = se - sp
s_common = sp & se
rep = [report(c, epni[c], ppni[c]) for c in s_common]
repstr = '\n'.join([r for r in rep if r])
if repstr:
print(dataset.id, 'bad')
print(repstr)
else:
print(dataset.id, 'ok')
elif exported:
print(dataset.id, 'unpublished')
elif published:
print(dataset.id, 'unexported')
else:
print(dataset.id, 'unpublished and unexported')
if ik('git_repos'):
import augpathlib as aug
import sparcur
from sparcur.config import auth
from importlib import metadata
d = metadata.distribution(sparcur.__package__)
rps = [p for p in [aug.RepoPath(dd._path) for dd in d.discover()] if p.working_dir]
setups = [s for s in [p.parent / 'setup.py' for p in rps] if s.exists()]
wds = sorted(set([p.working_dir for p in rps]))
never_update = auth.get('never-update')
pretend = ik('pretend') or never_update
if pretend:
if never_update:
print(f'never-update: true is set in {auth.user_config._path}')
print('These are the commands that would be run.')
def doupdate(rp):
if pretend:
print(f'git -C {rp.as_posix()} stash')
print(f'git -C {rp.as_posix()} pull')
return
print(f'Pulling {rp.as_posix()}')
print(rp.repo.git.stash())
# TODO checkout to a safety branch and tag for rollback
print(rp.repo.git.pull())
if ik('update'):
for wd in wds:
if 'homebrew' in wd.as_posix():
continue
doupdate(wd)
# indiscriminately run setup.py with --release set to trigger tangling
from importlib import util as imu
oldargv = sys.argv
try:
for s in setups:
if pretend:
print(f'pushd {s.parent.as_posix()}; python setup.py --release; popd')
continue
sys.argv = ['setup.py', '--release'] # reset every time since most will mutate
print(f'Maybe tangling via {s.as_posix()}')
spec = imu.spec_from_file_location(f'setup_{s.parent.name}', s)
mod = imu.module_from_spec(spec)
try:
with s.parent:  # chdir so relative paths in setup.py resolve
spec.loader.exec_module(mod)
except SystemExit:
pass
except Exception as e:
log.exception(e)
finally:
sys.argv = oldargv
if ik('manifest'):
from sparcur.utils import write_manifests
paths = kwargs['paths']
if not paths:
paths = [Path.cwd()]
manifests_rendered = write_manifests(parents=paths)
manifests, rendered = zip(*manifests_rendered)
nl = '\n'
print(f'generated manifests at:\n{nl.join([m.as_posix() for m in manifests])}')
if ik('open'):
cmd = kwargs['open']
[m.xopen(cmd) for m in manifests]
elif ik('show'):
[m.xopen() for m in manifests]
if __name__ == '__main__':
pipe_main(main)
from .. import common
from sparcur.simple.utils import pipe_main
def test_pipe_main():
def main(id=None, project_path=None, **kwargs):
print(id, project_path, kwargs)
pipe_main(main, argv=['sparcur-simple'])
dataset id -> dataset path -> dataset relative paths
cleaned file root -> metadata file objects -> cleaned objects -> write objects -> cleaned files at cleaned file paths
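A minimal usage sketch of this flow, assuming the implementation below is importable as ~sparcur.simple.clean_metadata_files~ (the module name is an assumption) and that the dataset folder has no cached remote metadata, so a cleaned output path must be passed explicitly; the paths are hypothetical.
from sparcur.paths import Path
from sparcur.simple.clean_metadata_files import main as clean
# hypothetical local dataset and output locations
dataset = Path('path/to/some/curation/dataset')
clean(dataset, cleaned_output_path=Path('/tmp/cleaned'))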
from types import MappingProxyType
from sparcur.paths import Path
from sparcur import datasets as dat
from sparcur.utils import symlink_latest, merge_template_stems
from sparcur.config import auth
from sparcur.simple.utils import rglob, log
_dmpt = MappingProxyType({})
def prepare_dataset_cleaned(dataset_path, cleaned_path=None, time_now=None):
if cleaned_path is None: # FIXME confusing and breaks w/ convention -> Options maybe?
cleaned_path = Path(auth.get_path('cleaned-path'))
from sparcur.utils import PennsieveId
identifier = PennsieveId(dataset_path.cache.id)
uuid = identifier.uuid
cleaned_dataset_folder = cleaned_path / uuid
cleaned_output_path = cleaned_dataset_folder / time_now.START_TIMESTAMP_LOCAL_FRIENDLY
if not cleaned_output_path.exists():
cleaned_output_path.mkdir(parents=True)
cleaned_latest = cleaned_dataset_folder / 'LATEST'
# FIXME do we symlink before we know things have succeeded ???
symlink_latest(cleaned_output_path, cleaned_latest)
return cleaned_output_path
def from_dataset_path_metadata_file_paths(dataset_path):
matches = []
for candidate in rglob(dataset_path, '*.xlsx'):
rp = candidate.relative_path_from(dataset_path)
if not rp.parent.name or 'anifest' in rp.name:  # top level metadata files or any manifest ('anifest' matches Manifest and manifest)
matches.append(candidate)
return matches
def from_path_cleaned_object(path, for_template=False, known_templates=_dmpt):
t = dat.Tabular(path)
sheet, wb, sparse_empty_rows = t._openpyxl_fixes(for_template=for_template, known_templates=known_templates)
return wb
def from_file_paths_objects(paths, for_template=False, known_templates=_dmpt):
if for_template:
template = known_templates[for_template] if for_template in known_templates else None
tmpl = merge_template_stems(template, known_templates) if template else None # FIXME this gets run again
stems = None if tmpl is None or 'stems' not in tmpl else tmpl['stems']
else:
stems = None
for path in paths:
if stems and path.stem not in stems:
yield None
continue
if path.suffix == '.xlsx':
cleaned = from_path_cleaned_object(path, for_template, known_templates)
yield cleaned
else:
yield None
def from_dataset_path_cleaned_files(dataset_path, cleaned_output_path, for_template=False, known_templates=_dmpt):
"NOTE this actually does the cleaning"
paths = from_dataset_path_metadata_file_paths(dataset_path)
for path, obj in zip(paths, from_file_paths_objects(paths, for_template, known_templates)):
if obj is not None:
drp = path.relative_path_from(dataset_path)
target = cleaned_output_path / drp
if not target.parent.exists():
target.parent.mkdir(parents=True)
obj.save(target)
def main(path=Path.cwd(), id=None, time_now=None,
parent_path=None, invariant_local_path='dataset',
parent_parent_path=Path.cwd(),
cleaned_path=None, cleaned_output_path=None,
for_template=False, **kwargs):
# note that cleaned_path is better thought of as cleaned_base_path
# setup taken from path_metadata.py::main so check the notes there
if path == Path.cwd() and (id is not None or parent_path is not None):
if parent_path is None:
uuid = id.uuid
parent_path = parent_parent_path / uuid
invariant_path = parent_path / invariant_local_path
path = invariant_path.expanduser().resolve()
else:
parent_parent_path = None
path = Path(path)
cache = path.cache
if cache is None:
if cleaned_output_path is None:
raise ValueError(
'No remote cached data and cleaned_output_path not set. Pass '
'--cleaned-output-path=path/to/cleaned-for-this-dataset')
if not for_template:
log.warning('running in the wild without cached metadata')
cleaned_output_path.mkdir(parents=True, exist_ok=True)
elif not cache.is_dataset():
raise TypeError('can only run on a single dataset')
else:
cleaned_output_path = prepare_dataset_cleaned(path, cleaned_path, time_now)
if for_template:
from sparcur.config import auth
from orthauth.utils import sxpr_to_python
resources = auth.get_path('resources')
with open(resources / 'templates.sxpr') as f:
known_templates = sxpr_to_python(f.read())
# TODO source externally
_known_templates = {
'clean': False,
'default': {
'stems-closed': False,
'stems': {
'dataset_description': {
'header': 'row',
'remove': ['device_.+'],
}}}}
else:
known_templates = None
from_dataset_path_cleaned_files(path, cleaned_output_path, for_template, known_templates)
if __name__ == '__main__':
import pprint
from sparcur.simple.utils import pipe_main
pipe_main(main)
See also https://docs.racket-lang.org/graphviz/index.html (~raco pkg install racket-graphviz~) for a more direct mapping of graphviz functionality, though one that is also considerably more verbose.
set -e
echo prepare for FAIL
echo ERROR 1>&2
false  # with set -e execution stops at the first failing command
echo this should not print
echo ERROR ERROR 1>&2
;; minimal reval
(unless (featurep 'reval)
(defvar reval-cache-directory (concat user-emacs-directory "reval/cache/"))
(defun reval-minimal (cypher checksum path-or-url &rest alternates)
"Simplified and compact implementation of reval."
(let* (done (o url-handler-mode) (csn (symbol-name checksum))
(cache-path (concat reval-cache-directory (substring csn 0 2) "/" csn
"-" (file-name-nondirectory path-or-url))))
(url-handler-mode)
(unwind-protect
(cl-loop for path-or-url in (cons cache-path (cons path-or-url alternates))
do (when (file-exists-p path-or-url)
(let* ((buffer (find-file-noselect path-or-url))
(buffer-checksum (intern (secure-hash cypher buffer))))
(if (eq buffer-checksum checksum)
(progn
(unless (string= path-or-url cache-path)
(let ((parent-path (file-name-directory cache-path))
make-backup-files)
(unless (file-directory-p parent-path)
(make-directory parent-path t))
(with-current-buffer buffer
(write-file cache-path))))
(eval-buffer buffer)
(setq done t))
(kill-buffer buffer) ; kill so it cannot accidentally be evaluated
(error "reval: checksum mismatch! %s" path-or-url))))
until done)
(unless o
(url-handler-mode 0)))))
(defalias 'reval #'reval-minimal)
(reval 'sha256 '3620321396c967395913ff19ce507555acb92335b0545e4bd05ec0e673a0b33b
"https://raw.githubusercontent.com/tgbugs/orgstrap/300b1d5518af53d76d950097bcbcd7046cfa2285/reval.el"))
(let ((ghost "https://raw.githubusercontent.com/tgbugs/orgstrap/"))
(unless (featurep 'ow)
(reval 'sha256 '670c68e5649987fb64a93a7b5610ace0f18a0b71f376faf7499de933247931f2
(concat ghost "021b66c8f1dd4bf55714a4de889f31741f8460f6" "/ow.el"))))