Skip to content

Commit 42ebf73

Browse files
feat: add small tests (#73)
* add minimal ServiceX + coffea test * add servicex-databinder example * query examples for ServiceX
1 parent 5832fe7 commit 42ebf73

File tree

5 files changed

+744
-1
lines changed

5 files changed

+744
-1
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ venv/
77
.ipynb_checkpoints
88

99
servicex.yml
10+
servicex.yaml
1011

1112
analyses/**/*.root
1213
analyses/**/*.pdf
@@ -24,4 +25,6 @@ workshops/agctools2022/statistical-inference/input
2425

2526
# CMS ttbar
2627
analyses/cms-open-data-ttbar/workspace.json
27-
analyses/cms-open-data-ttbar/dask-worker-space
28+
29+
# dask
30+
dask-worker-space/

tests/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# tests
2+
3+
This is a space to collect miscellaneous scripts / notebooks etc. that do not fit anywhere else but are useful for testing.
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": 1,
6+
"id": "c8dc945e-0f70-4542-8725-7fbfa7b47865",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
"import asyncio\n",
11+
"import time\n",
12+
"\n",
13+
"import awkward as ak\n",
14+
"from coffea.processor import servicex\n",
15+
"from func_adl import ObjectStream\n",
16+
"from func_adl_servicex import ServiceXSourceUpROOT\n",
17+
"import hist\n",
18+
"import matplotlib.pyplot as plt\n",
19+
"from servicex import ServiceXDataset"
20+
]
21+
},
22+
{
23+
"cell_type": "markdown",
24+
"id": "8d8fd413-6c3e-46a0-9945-75a4c1bc6725",
25+
"metadata": {},
26+
"source": [
27+
"Configuration options: enable / disable `dask` and the use of caching with `ServiceX` (to force re-running transforms)."
28+
]
29+
},
30+
{
31+
"cell_type": "code",
32+
"execution_count": 2,
33+
"id": "9900ed24-64ac-4661-aa31-0f4714c7db4e",
34+
"metadata": {},
35+
"outputs": [],
36+
"source": [
37+
"# enable Dask\n",
38+
"USE_DASK = False\n",
39+
"\n",
40+
"# ServiceX behavior: ignore cache with repeated queries\n",
41+
"SERVICEX_IGNORE_CACHE = True"
42+
]
43+
},
44+
{
45+
"cell_type": "markdown",
46+
"id": "545eacca-c610-4069-b96d-6c44d8083abf",
47+
"metadata": {},
48+
"source": [
49+
"The processor used here: select jets with $p_T > 25$ GeV and calculate $\\textrm{H}_\\textrm{T}^{\\textrm{had}}$ (scalar sum of jet $p_T$) as observable."
50+
]
51+
},
52+
{
53+
"cell_type": "code",
54+
"execution_count": 3,
55+
"id": "d4e60d23-795c-4417-86c6-1696be3b65ec",
56+
"metadata": {},
57+
"outputs": [],
58+
"source": [
59+
"class TtbarAnalysis(servicex.Analysis):\n",
60+
" def __init__(self):\n",
61+
" self.hist = hist.Hist.new.Reg(50, 0, 1000, name=\"ht\", label=\"HT\").Weight()\n",
62+
"\n",
63+
" def process(self, events):\n",
64+
" histogram = self.hist.copy()\n",
65+
"\n",
66+
" # select jets with pT > 25 GeV\n",
67+
" selected_jets = events.jet[events.jet.pt > 25]\n",
68+
"\n",
69+
" # use HT (scalar sum of jet pT) as observable\n",
70+
" ht = ak.sum(selected_jets.pt, axis=-1)\n",
71+
" histogram.fill(ht=ht, weight=1.0)\n",
72+
"\n",
73+
" return histogram\n",
74+
"\n",
75+
" def postprocess(self, accumulator):\n",
76+
" return accumulator"
77+
]
78+
},
79+
{
80+
"cell_type": "markdown",
81+
"id": "6702d39b-70b2-4252-a569-d9db01444469",
82+
"metadata": {},
83+
"source": [
84+
"Specify which data to process, using a small public file here taken from 2015 CMS Open Data."
85+
]
86+
},
87+
{
88+
"cell_type": "code",
89+
"execution_count": 4,
90+
"id": "34588fe7-67dd-4b87-9306-b05334fc86d4",
91+
"metadata": {},
92+
"outputs": [],
93+
"source": [
94+
"# input data to process\n",
95+
"fileset = {\n",
96+
" \"ttbar\": {\n",
97+
" \"files\": [\"https://xrootd-local.unl.edu:1094//store/user/AGC/datasets/RunIIFall15MiniAODv2/TT_TuneCUETP8M1_13TeV-powheg-pythia8/MINIAODSIM//PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1/00000/00DF0A73-17C2-E511-B086-E41D2D08DE30.root\"],\n",
98+
" \"metadata\": {\"process\": \"ttbar\"}\n",
99+
" }\n",
100+
"}"
101+
]
102+
},
103+
{
104+
"cell_type": "markdown",
105+
"id": "1bbe9b7b-19dd-4945-92e8-1757bb1b6d73",
106+
"metadata": {},
107+
"source": [
108+
"Set up the query: only requesting specific columns here without any filtering applied."
109+
]
110+
},
111+
{
112+
"cell_type": "code",
113+
"execution_count": 5,
114+
"id": "13b93c01-24ae-49a3-a91b-3a432c7d2f2b",
115+
"metadata": {},
116+
"outputs": [],
117+
"source": [
118+
"def get_query(source: ObjectStream) -> ObjectStream:\n",
119+
" \"\"\"Query for event / column selection: no filter, select single jet column\n",
120+
" \"\"\"\n",
121+
" return source.Select(lambda e: {\"jet_pt\": e.jet_pt})"
122+
]
123+
},
124+
{
125+
"cell_type": "markdown",
126+
"id": "2b55e5ac-885f-455c-a742-b30b364559c3",
127+
"metadata": {},
128+
"source": [
129+
"The following cell is mostly boilerplate, which can hopefully be improved in the future."
130+
]
131+
},
132+
{
133+
"cell_type": "code",
134+
"execution_count": 6,
135+
"id": "4022b82b-1aee-43d8-8958-b6947e5ed975",
136+
"metadata": {},
137+
"outputs": [],
138+
"source": [
139+
"def make_datasource(fileset:dict, name: str, query: ObjectStream, ignore_cache: bool):\n",
140+
" \"\"\"Creates a ServiceX datasource for a particular Open Data file.\"\"\"\n",
141+
" datasets = [ServiceXDataset(fileset[name][\"files\"], backend_name=\"uproot\", ignore_cache=ignore_cache)]\n",
142+
" return servicex.DataSource(\n",
143+
" query=query, metadata=fileset[name][\"metadata\"], datasets=datasets\n",
144+
" )\n",
145+
"\n",
146+
"\n",
147+
"async def produce_all_histograms(fileset, query, procesor_class, use_dask=False, ignore_cache=False):\n",
148+
" \"\"\"Runs the histogram production, processing input files with ServiceX and\n",
149+
" producing histograms with coffea.\n",
150+
" \"\"\"\n",
151+
" # create the query\n",
152+
" ds = ServiceXSourceUpROOT(\"cernopendata://dummy\", \"events\", backend_name=\"uproot\")\n",
153+
" ds.return_qastle = True\n",
154+
" data_query = query(ds)\n",
155+
"\n",
156+
" # executor: local or Dask\n",
157+
" if not use_dask:\n",
158+
" executor = servicex.LocalExecutor()\n",
159+
" else:\n",
160+
" # for coffea-casa\n",
161+
" executor = servicex.DaskExecutor(client_addr=\"tls://localhost:8786\")\n",
162+
" # locally\n",
163+
" # executor = servicex.DaskExecutor()\n",
164+
"\n",
165+
" datasources = [\n",
166+
" make_datasource(fileset, ds_name, data_query, ignore_cache=ignore_cache)\n",
167+
" for ds_name in fileset.keys()\n",
168+
" ]\n",
169+
"\n",
170+
" # create the analysis processor\n",
171+
" analysis_processor = procesor_class()\n",
172+
"\n",
173+
" async def run_updates_stream(accumulator_stream):\n",
174+
" \"\"\"Run to get the last item in the stream\"\"\"\n",
175+
" coffea_info = None\n",
176+
" try:\n",
177+
" async for coffea_info in accumulator_stream:\n",
178+
" pass\n",
179+
" except Exception as e:\n",
180+
" raise Exception(f\"Failure while processing\") from e\n",
181+
" return coffea_info\n",
182+
"\n",
183+
" output = await asyncio.gather(\n",
184+
" *[\n",
185+
" run_updates_stream(executor.execute(analysis_processor, source, title=source.metadata['process']))\n",
186+
" for source in datasources\n",
187+
" ]\n",
188+
" )\n",
189+
"\n",
190+
" return output"
191+
]
192+
},
193+
{
194+
"cell_type": "markdown",
195+
"id": "908d64f0-bbfa-4f75-9da4-9b5ee3e1745e",
196+
"metadata": {},
197+
"source": [
198+
"Execute everything: query `ServiceX`, which sends columns back to `coffea` processors asynchronously, collect the aggregated histogram built by `coffea`."
199+
]
200+
},
201+
{
202+
"cell_type": "code",
203+
"execution_count": 7,
204+
"id": "5b1d63a1-d9e5-4f23-a709-a73eeb0460ab",
205+
"metadata": {},
206+
"outputs": [
207+
{
208+
"data": {
209+
"application/vnd.jupyter.widget-view+json": {
210+
"model_id": "",
211+
"version_major": 2,
212+
"version_minor": 0
213+
},
214+
"text/plain": [
215+
"ttbar: 0%| | 0/9000000000.0 [00:00]"
216+
]
217+
},
218+
"metadata": {},
219+
"output_type": "display_data"
220+
},
221+
{
222+
"name": "stdout",
223+
"output_type": "stream",
224+
"text": [
225+
"execution took 13.32 seconds\n"
226+
]
227+
}
228+
],
229+
"source": [
230+
"t0 = time.time()\n",
231+
"\n",
232+
"# in a notebook\n",
233+
"output = await produce_all_histograms(\n",
234+
" fileset, get_query, TtbarAnalysis, use_dask=USE_DASK, ignore_cache=SERVICEX_IGNORE_CACHE\n",
235+
")\n",
236+
"\n",
237+
"# as a script:\n",
238+
"# async def produce_all_the_histograms():\n",
239+
"# return await produce_all_histograms(\n",
240+
"# fileset, get_query, TtbarAnalysis, use_dask=USE_DASK, ignore_cache=SERVICEX_IGNORE_CACHE\n",
241+
"# )\n",
242+
"# output = asyncio.run(produce_all_the_histograms())\n",
243+
"\n",
244+
"print(f\"execution took {time.time()-t0:.2f} seconds\")"
245+
]
246+
},
247+
{
248+
"cell_type": "code",
249+
"execution_count": 8,
250+
"id": "f9e969b9-9925-4d43-9d0f-201d1547681f",
251+
"metadata": {},
252+
"outputs": [
253+
{
254+
"data": {
255+
"image/png": "",
256+
"text/plain": [
257+
"<Figure size 432x288 with 1 Axes>"
258+
]
259+
},
260+
"metadata": {
261+
"needs_background": "light"
262+
},
263+
"output_type": "display_data"
264+
}
265+
],
266+
"source": [
267+
"output[0].plot(label=\"ttbar\")\n",
268+
"plt.legend();"
269+
]
270+
}
271+
],
272+
"metadata": {
273+
"kernelspec": {
274+
"display_name": "Python 3 (ipykernel)",
275+
"language": "python",
276+
"name": "python3"
277+
},
278+
"language_info": {
279+
"codemirror_mode": {
280+
"name": "ipython",
281+
"version": 3
282+
},
283+
"file_extension": ".py",
284+
"mimetype": "text/x-python",
285+
"name": "python",
286+
"nbconvert_exporter": "python",
287+
"pygments_lexer": "ipython3",
288+
"version": "3.8.13"
289+
}
290+
},
291+
"nbformat": 4,
292+
"nbformat_minor": 5
293+
}

0 commit comments

Comments
 (0)