FairShip
Loading...
Searching...
No Matches
run_reco.py
Go to the documentation of this file.
1# SPDX-License-Identifier: LGPL-3.0-or-later
2# SPDX-FileCopyrightText: Copyright CERN for the benefit of the SHiP Collaboration
3
4import contextlib
5import getpass
6import multiprocessing
7import os
8import subprocess
9import time
10
11import ROOT
12import rootUtils as ut
13
14ncores = min(multiprocessing.cpu_count(), 4)
15user = getpass.getuser()
16# support for eos, assume: eosmount $HOME/eos
17
18h = {}
19
20
21def fitSingleGauss(x: str, ba: float | None = None, be: float | None = None) -> None:
22 name = "myGauss_" + x
23 myGauss = h[x].GetListOfFunctions().FindObject(name)
24 if not myGauss:
25 if not ba:
26 ba = h[x].GetBinCenter(1)
27 if not be:
28 be = h[x].GetBinCenter(h[x].GetNbinsX())
29 bw = h[x].GetBinWidth(1)
30 mean = h[x].GetMean()
31 sigma = h[x].GetRMS()
32 norm = h[x].GetEntries() * 0.3
33 myGauss = ROOT.TF1(name, "[0]*" + str(bw) + "/([2]*sqrt(2*pi))*exp(-0.5*((x-[1])/[2])**2)+[3]", 4)
34 myGauss.SetParameter(0, norm)
35 myGauss.SetParameter(1, mean)
36 myGauss.SetParameter(2, sigma)
37 myGauss.SetParameter(3, 1.0)
38 myGauss.SetParName(0, "Signal")
39 myGauss.SetParName(1, "Mean")
40 myGauss.SetParName(2, "Sigma")
41 myGauss.SetParName(3, "bckgr")
42 h[x].Fit(myGauss, "", "", ba, be)
43
44
45cmd = os.environ["FAIRSHIP"] + "/macro/ShipReco.py"
46cmdAna = os.environ["FAIRSHIP"] + "/macro/ShipAna.py"
47
48
49def execute_parallel(prefix, ncpu: int = 4):
50 cpus = {}
51 log = {}
52 for i in range(ncpu):
53 cpus[i] = None
54 jobs = []
55 for x in os.listdir("."):
56 if not x.find(prefix) < 0:
57 if os.path.isdir(x):
58 jobs.append(x)
59 k = 0
60 print(jobs)
61 for x in jobs:
62 if k == ncpu:
63 k = 0
64 if cpus[k] is not None:
65 cpus[k].communicate()
66 log[k].close()
67 print("change to directory ", k, x)
68 os.chdir("./" + x)
69 for f in os.listdir("."):
70 if not f.find("geofile_full") < 0:
71 inputfile = f.replace("geofile_full", "ship")
72 log[k] = open("logRec", "w") # noqa: SIM115
73 cpus[k] = subprocess.Popen(
74 ["python", cmd, "-n 9999999 -f " + inputfile],
75 stdout=log[k],
76 )
77 k += 1
78 os.chdir("../")
79 break
80 return cpus, log
81
82
83def getJobs(prefix) -> list[str]:
84 jobs = []
85 for x in os.listdir("."):
86 if not x.find(prefix) < 0:
87 if os.path.isdir(x):
88 jobs.append(x)
89 return jobs
90
91
93 processoutput = os.popen("ps -u " + user).read()
94 nproc = 0
95 for x in processoutput.split("\n"):
96 if not x.find("python") < 0 and x.find("defunct") < 0:
97 nproc += 1
98 return nproc
99
100
101def killAll() -> None:
102 processoutput = os.popen("ps -u " + user).read()
103 for x in processoutput.split("\n"):
104 if not x.find("python") < 0:
105 pid = int(x[:5])
106 print("kill " + str(pid))
107 os.system("kill " + str(pid))
108
109
110def executeSimple(prefixes: list[str], reset=False) -> None:
111 proc = {}
112 for prefix in prefixes:
113 jobs = getJobs(prefix)
114 for x in jobs:
115 print("change to directory ", x)
116 os.chdir("./" + x)
117 geofile = None
118 for f in os.listdir("."):
119 if not f.find("geofile_full") < 0:
120 geofile = f
121 break
122 if not geofile:
123 print("ERROR: no geofile found", x)
124 os.chdir("../")
125 continue
126 else:
127 inputfile = geofile.replace("geofile_full", "ship")
128 nproc = 100
129 while nproc > ncores:
130 nproc = checkRunningProcesses()
131 if nproc > ncores:
132 print("wait a minute")
133 time.sleep(100)
134 print("launch reco", x)
135 proc[x] = 1
136 with contextlib.suppress(Exception):
137 os.system("rm logRec")
138 if reset:
139 os.system("python " + cmd + " -n 9999999 -f " + inputfile + " --saveDisk >> logRec &")
140 else:
141 os.system("python " + cmd + " -n 9999999 -f " + inputfile + " >> logRec &")
142 os.chdir("../")
143 time.sleep(10)
144 nJobs = len(proc)
145 while nJobs > 0:
146 nJobs = len(proc)
147 print("debug ", nJobs)
148 for p in sorted(proc.keys()):
149 os.chdir("./" + p)
150 nproc = 100
151 while nproc > ncores:
152 nproc = checkRunningProcesses()
153 if nproc > ncores:
154 print("wait a minute")
155 time.sleep(100)
156 with open("logRec") as log:
157 completed = False
158 rl = log.readlines()
159 if "finishing" in rl[len(rl) - 1]:
160 completed = True
161 if completed:
162 print("analyze ", p, nproc)
163 with contextlib.suppress(Exception):
164 os.system("rm logAna")
165 os.system(
166 "python " + cmdAna + " -n 9999999 -f " + inputfile.replace(".root", "_rec.root") + " >> logAna &"
167 )
168 proc.pop(p)
169 time.sleep(10)
170 else:
171 print("Rec job not finished yet", p)
172 nproc = checkRunningProcesses()
173 if nproc == 1:
174 print("rec job probably failed, only when python process running")
175 time.sleep(100)
176 os.chdir("../")
177
178
179def executeAna(prefixes) -> None:
180 log = {}
181 for prefix in prefixes:
182 jobs = getJobs(prefix)
183 for x in jobs:
184 print("change to directory ", x)
185 os.chdir("./" + x)
186 for f in os.listdir("."):
187 if not f.find("geofile_full") < 0:
188 inputfile = f.replace("geofile_full", "ship")
189 log[x] = open("logAna", "w") # noqa: SIM115
190 process = subprocess.Popen(
191 ["python", cmdAna, "-n 9999999", "-f " + inputfile.replace(".root", "_rec.root")], stdout=log[x]
192 )
193 process.wait()
194 print("finished ", process.returncode)
195 log[x].close()
196 os.chdir("../")
197 break
198
199
200h = {}
201
202
203def mergeHistosMakePlots(p: list[str]) -> None:
204 if not isinstance(p, list):
205 pl = [p]
206 else:
207 pl = p
208 hlist = ""
209 for p in pl:
210 prefix = str(p)
211 for x in os.listdir("."):
212 if not x.find(prefix) < 0:
213 if os.path.isdir(x):
214 hlist += x + "/ShipAna.root "
215 print("-->", hlist)
216 os.system("hadd -f ShipAna.root " + hlist)
217 ut.readHists(h, "ShipAna.root")
218 print(h["meanhits"].GetEntries())
219 if 1 > 0:
220 ut.bookCanvas(h, key="strawanalysis", title="Distance to wire and mean nr of hits", nx=1200, ny=600, cx=2, cy=1)
221 #
222 cv = h["strawanalysis"].cd(1)
223 h["disty"].DrawCopy()
224 h["distu"].DrawCopy("same")
225 h["distv"].DrawCopy("same")
226 cv = h["strawanalysis"].cd(2)
227 h["meanhits"].DrawCopy()
228 print(h["meanhits"].GetEntries())
229
230 ut.bookCanvas(h, key="fitresults", title="Fit Results", nx=1600, ny=1200, cx=2, cy=2)
231 cv = h["fitresults"].cd(1)
232 h["delPOverP"].Draw("box")
233 cv = h["fitresults"].cd(2)
234 cv.SetLogy(1)
235 h["chi2"].Draw()
236 cv = h["fitresults"].cd(3)
237 h["delPOverP_proj"] = h["delPOverP"].ProjectionY()
238 ROOT.gStyle.SetOptFit(11111)
239 h["delPOverP_proj"].Draw()
240 h["delPOverP_proj"].Fit("gaus")
241 cv = h["fitresults"].cd(4)
242 h["delPOverP2_proj"] = h["delPOverP2"].ProjectionY()
243 h["delPOverP2_proj"].Draw()
244 fitSingleGauss("delPOverP2_proj")
245 h["fitresults"].Print("fitresults.gif")
246 ut.bookCanvas(h, key="fitresults2", title="Fit Results", nx=1600, ny=1200, cx=2, cy=2)
247 print("finished with first canvas")
248 cv = h["fitresults2"].cd(1)
249 h["Doca"].Draw()
250 cv = h["fitresults2"].cd(2)
251 h["IP0"].Draw()
252 cv = h["fitresults2"].cd(3)
253 h["HNL"].Draw()
254 fitSingleGauss("HNL", 0.0, 2.0)
255 cv = h["fitresults2"].cd(4)
256 h["IP0/mass"].Draw("box")
257 h["fitresults2"].Print("fitresults2.gif")
258 h["strawanalysis"].Print("strawanalysis.gif")
259 print("finished making plots")
260
261
262def mergeNtuples(prefixes: list[str]) -> None:
263 for prefix in prefixes:
264 jobs = getJobs(prefix)
265 haddCommand = ""
266 for x in jobs:
267 for f in os.listdir(x):
268 if not f.find("geofile_full") < 0:
269 inputfile = (f.replace("geofile_full", "ship")).replace(".root", "_rec.root")
270 haddCommand += " " + x + "/" + inputfile
271 break
272 cmd = "hadd -f " + inputfile.replace(".root", "_" + prefix + ".root") + haddCommand
273 os.system(cmd)
274
275
276def checkProd(prefixes, quiet=False):
277 summary = {"Sim": {}, "Rec": {}, "Ana": {}}
278 for prefix in prefixes:
279 jobs = getJobs(prefix)
280 for x in jobs:
281 try:
282 log = open(x + "/log") # noqa: SIM115
283 except OSError:
284 if not quiet:
285 print("no log file for ", x)
286 summary["Sim"][x] = -1
287 continue
288 rl = log.readlines()
289 log.close()
290 if len(rl) < 1:
291 if not quiet:
292 print("simulation failed log file 0", x)
293 summary["Sim"][x] = 0
294 continue
295 elif "Real time" in rl[len(rl) - 1]:
296 if not quiet:
297 print("simulation step OK ", x)
298 summary["Sim"][x] = 1
299 else:
300 if not quiet:
301 print("simulation failed ", x)
302 summary["Sim"][x] = 0
303 continue
304 try:
305 log = open(x + "/logRec") # noqa: SIM115
306 except OSError:
307 if not quiet:
308 print("no logRec file for ", x)
309 summary["Rec"][x] = -1
310 continue
311 rl = log.readlines()
312 log.close()
313 if "finishing" in rl[len(rl) - 1]:
314 if not quiet:
315 print("reconstruction step OK ", x)
316 summary["Rec"][x] = 1
317 else:
318 if not quiet:
319 print("reconstruction failed ", x)
320 summary["Rec"][x] = 0
321 continue
322 try:
323 log = open(x + "/logAna") # noqa: SIM115
324 except OSError:
325 if not quiet:
326 print("no logAna file for ", x)
327 summary["Ana"][x] = -1
328 continue
329 rl = log.readlines()
330 log.close()
331 if "finished" in rl[len(rl) - 1]:
332 if not quiet:
333 print("analysis step OK ", x)
334 summary["Ana"][x] = 1
335 else:
336 if not quiet:
337 print("analysis failed ", x)
338 summary["Ana"][x] = 0
339 continue
340 return summary
341
342
343def printFailedJobs(pl) -> None:
344 result = checkProd(pl, quiet=True)
345 for p in result:
346 for x in result[p]:
347 if result[p][x] < 1:
348 print(p, x, result[p][x])
349
350
351def execute() -> None:
352 executeSimple(pl, reset=True)
354 mergeNtuples(pl)
355
356
357def removeIntermediateFiles(prefixes) -> None:
358 for prefix in prefixes:
359 jobs = getJobs(prefix)
360 for x in jobs:
361 for f in os.listdir(x):
362 if not f.find("geofile_full") < 0:
363 inputfile = (f.replace("geofile_full", "ship")).replace(".root", "_rec.root")
364 os.system("rm " + x + "/" + inputfile)
365
366
367def copyRecoToEos(pl) -> None:
368 result = checkProd(pl, quiet=True)
369 eos = "/afs/cern.ch/project/eos/installation/0.3.15/bin/eos.select"
370 for x in result["Rec"]:
371 if result["Rec"][x] < 1:
372 print("Reco failed !", x, result["Rec"][x])
373 else:
374 cmd = (
375 eos
376 + " cp -r "
377 + os.path.abspath(".")
378 + "/"
379 + x
380 + "/ /eos/experiment/ship/data/DAFreco/muonBackground/"
381 + x
382 + "/"
383 )
384 os.system(cmd)
385 print("copied to eos", x)
386
387
388pl = []
389for p in os.sys.argv[1].split(","):
390 pref = "muon"
391 xx = os.path.abspath(".").lower()
392 if not xx.find("neutrino") < 0:
393 pref = "neutrino"
394 if not xx.find("vdis") < 0 or not xx.find("vetodis") < 0:
395 pref = "disV"
396 elif not xx.find("clby") < 0:
397 pref = "disCLBY"
398 elif not xx.find("dis") < 0:
399 pref = "dis"
400 pl.append(pref + p)
401print(" execute() input comma separated production nr, performs Simple/mergeHistos/mergeNtuples ")
402print(" executeSimple(pl,reset=True) ")
403print(" checkProd(pl)")
404print(" executeAna(pl) ")
405print(" mergeNtuples(pl) ")
406print(" removeIntermediateFiles(pl) only _rec ")
407print(" checkRunningProcesses() ")
408# 61,611,612,613,614,615,616,62,621,622,623,624,625,626
None killAll()
Definition: run_reco.py:101
def checkProd(prefixes, quiet=False)
Definition: run_reco.py:276
def execute_parallel(prefix, int ncpu=4)
Definition: run_reco.py:49
None execute()
Definition: run_reco.py:351
None printFailedJobs(pl)
Definition: run_reco.py:343
None copyRecoToEos(pl)
Definition: run_reco.py:367
None removeIntermediateFiles(prefixes)
Definition: run_reco.py:357
None mergeNtuples(list[str] prefixes)
Definition: run_reco.py:262
list[str] getJobs(prefix)
Definition: run_reco.py:83
None mergeHistosMakePlots(list[str] p)
Definition: run_reco.py:203
None executeAna(prefixes)
Definition: run_reco.py:179
int checkRunningProcesses()
Definition: run_reco.py:92
None executeSimple(list[str] prefixes, reset=False)
Definition: run_reco.py:110
None fitSingleGauss(str x, float|None ba=None, float|None be=None)
Definition: run_reco.py:21
int open(const char *, int)
Opens a file descriptor.
int read(int, char *, size_t)
Read bytes from a file descriptor.
int close(int)
Closes the file descriptor fd.