FairShip
Loading...
Searching...
No Matches
analysis_toolkit.py
Go to the documentation of this file.
1#!/usr/bin/env python
2# SPDX-License-Identifier: LGPL-3.0-or-later
3# SPDX-FileCopyrightText: Copyright CERN for the benefit of the SHiP Collaboration
4
5"""Toolkit for Analysis."""
6
7import numpy as np
8import pythia8_conf
9import ROOT
10import shipunit as u
11import yaml
12from ShipGeoConfig import AttrDict, load_from_root_file
13from tabulate import tabulate
14
15
17 """Class to perform various selection checks on the candidate."""
18
19 def __init__(self, geo_file) -> None:
20 """Initialize the selection_check class with geometry and configuration."""
21 self.geometry_manager = geo_file.Get("FAIRGeom")
22 self.ship_geo = load_from_root_file(geo_file, "ShipGeo")
23
24 fairship = ROOT.gSystem.Getenv("FAIRSHIP")
25
26 if self.ship_geo.DecayVolumeMedium == "helium":
27 with open(fairship + "/geometry/veto_config_helium.yaml") as file:
28 config = yaml.safe_load(file)
29 self.veto_geo = AttrDict(config)
30 _ = self.veto_geo.z0
31 if self.ship_geo.DecayVolumeMedium == "vacuums":
32 with open(fairship + "/geometry/veto_config_vacuums.yaml") as file:
33 config = yaml.safe_load(file)
34 self.veto_geo = AttrDict(config)
35
36 def access_event(self, tree) -> None:
37 """Access event data."""
38 self.tree = tree
39
40 def define_candidate_time(self, candidate):
41 """Calculate time associated with the candidate decay vertex using strawtubes MCPoint info."""
42 t0 = self.tree.ShipEventHeader.GetEventTime()
43 candidate_pos = ROOT.TVector3()
44 candidate.GetVertex(candidate_pos)
45
46 d1, d2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
47 d1_mc, d2_mc = self.tree.fitTrack2MC[d1], self.tree.fitTrack2MC[d2]
48
49 time_vtx_from_strawhits = []
50
51 for hit in self.tree.strawtubesPoint:
52 if not (int(str(hit.GetDetectorID())[:1]) == 1 or int(str(hit.GetDetectorID())[:1]) == 2):
53 continue
54
55 if not (hit.GetTrackID() == d1_mc or hit.GetTrackID() == d2_mc):
56 continue
57
58 t_straw = hit.GetTime()
59
60 dist = np.sqrt(
61 (candidate_pos.X() - hit.GetX()) ** 2
62 + (candidate_pos.Y() - hit.GetY()) ** 2
63 + (candidate_pos.Z() - hit.GetZ()) ** 2
64 ) # distance to the vertex in cm
65
66 d_mom = self.tree.MCTrack[hit.GetTrackID()].GetP() / u.GeV
67 mass = self.tree.MCTrack[hit.GetTrackID()].GetMass()
68 v = u.c_light * d_mom / np.sqrt(d_mom**2 + (mass) ** 2)
69
70 t_vertex = t_straw - (dist / v)
71
72 time_vtx_from_strawhits.append(t_vertex)
73
74 t_vtx = np.average(time_vtx_from_strawhits) + t0
75
76 return t_vtx # units in ns
77
78 def impact_parameter(self, candidate):
79 """Calculate the impact parameter of the candidate relative to (0,0,target.z0)."""
80 candidate_pos = ROOT.TVector3()
81 candidate.GetVertex(candidate_pos)
82
83 candidate_mom = ROOT.TLorentzVector()
84 candidate.Momentum(candidate_mom)
85 target_point = ROOT.TVector3(0, 0, self.ship_geo.target.z0)
86
87 projection_factor = 0
88 if hasattr(candidate_mom, "P"):
89 P = candidate_mom.P()
90 else:
91 P = candidate_mom.Mag()
92 for i in range(3):
93 projection_factor += candidate_mom(i) / P * (target_point(i) - candidate_pos(i))
94
95 dist = 0
96 for i in range(3):
97 dist += (target_point(i) - candidate_pos(i) - projection_factor * candidate_mom(i) / P) ** 2
98 dist = ROOT.TMath.Sqrt(dist)
99
100 return dist # in cm
101
102 def dist_to_innerwall(self, candidate) -> float | int:
103 """Calculate the minimum distance(in XY plane) of the candidate decay vertex to the inner wall of the decay vessel. If outside the decay volume, or if distance > 100cm,Return 0."""
104 candidate_pos = ROOT.TVector3()
105 candidate.GetVertex(candidate_pos)
106 position = (candidate_pos.X(), candidate_pos.Y(), candidate_pos.Z())
107
108 nsteps = 8
109 dalpha = 2 * ROOT.TMath.Pi() / nsteps
110 min_distance = float("inf")
111
112 node = self.geometry_manager.FindNode(*position)
113 if not node:
114 return 0 # is outside the decay volume
115
116 # Loop over directions in the XY plane
117 for n in range(nsteps):
118 alpha = n * dalpha
119 direction = (
120 ROOT.TMath.Sin(alpha),
121 ROOT.TMath.Cos(alpha),
122 0.0,
123 ) # Direction vector in XY plane
124 self.geometry_manager.InitTrack(*position, *direction)
125 if not self.geometry_manager.FindNextBoundary():
126 continue
127 # Get the distance to the boundary and update the minimum distance
128 distance = self.geometry_manager.GetStep()
129 min_distance = min(min_distance, distance)
130
131 return min_distance if min_distance < 100 * u.m else 0
132
133 def dist_to_vesselentrance(self, candidate):
134 """Calculate the distance of the candidate decay vertex to the entrance of the decay vessel."""
135 candidate_pos = ROOT.TVector3()
136 candidate.GetVertex(candidate_pos)
137 return candidate_pos.Z() - self.veto_geo.z0
138
139 def nDOF(self, candidate):
140 """Return the number of degrees of freedom (nDOF) for the particle's daughter tracks."""
141 nmeas = []
142 t1, t2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
143
144 for tr in [t1, t2]:
145 fit_status = self.tree.FitTracks[tr].getFitStatus()
146 nmeas.append(round(fit_status.getNdf())) # nmeas.append(fit_status.getNdf())
147
148 return np.array(nmeas)
149
150 def daughtermomentum(self, candidate):
151 """Return the momentum(Mag) of the particle's daughter tracks."""
152 daughter_mom = []
153 t1, t2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
154 for trD in [t1, t2]:
155 x = self.tree.FitTracks[trD]
156 xx = x.getFittedState()
157 daughter_mom.append(xx.getMom().Mag())
158
159 return np.array(daughter_mom)
160
161 def invariant_mass(self, candidate):
162 """Invariant mass of the candidate."""
163 return candidate.GetMass()
164
165 def DOCA(self, candidate):
166 """Distance of Closest Approach."""
167 return candidate.GetDoca()
168
169 def is_in_fiducial(self, candidate) -> bool:
170 """Check if the candidate decay vertex is within the Fiducial Volume."""
171 candidate_pos = ROOT.TVector3()
172 candidate.GetVertex(candidate_pos)
173
174 if candidate_pos.Z() > self.ship_geo.TrackStation1.z:
175 return False
176 if candidate_pos.Z() < self.veto_geo.z0:
177 return False
178
179 # if self.dist2InnerWall(candidate)<=5*u.cm: return False
180
181 vertex_node = ROOT.gGeoManager.FindNode(candidate_pos.X(), candidate_pos.Y(), candidate_pos.Z())
182 vertex_elem = vertex_node.GetVolume().GetName()
183 return vertex_elem.startswith("DecayVacuum_")
184
185 def chi2nDOF(self, candidate):
186 """Return the reduced chi^2 of the particle's daughter tracks."""
187 t1, t2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
188
189 chi2ndf = []
190 for tr in [t1, t2]:
191 fit_status = self.tree.FitTracks[tr].getFitStatus()
192 chi2ndf.append(fit_status.getChi2() / fit_status.getNdf())
193
194 return np.array(chi2ndf)
195
196 def preselection_cut(self, candidate, IP_cut: float = 250, show_table: bool = False) -> bool:
197 """
198 Umbrella method to apply the pre-selection cuts on the candidate.
199
200 show_table=True tabulates the pre-selection parameters.
201 """
202 flag = True
203
204 if len(self.tree.Particles) != 1:
205 flag = False
206 if not (self.is_in_fiducial(candidate)):
207 flag = False
208 if self.dist_to_innerwall(candidate) <= 5 * u.cm:
209 flag = False
210 if self.dist_to_vesselentrance(candidate) <= 100 * u.cm:
211 flag = False
212 if self.impact_parameter(candidate) >= IP_cut * u.cm:
213 flag = False
214 if self.DOCA(candidate) >= 1 * u.cm:
215 flag = False
216 if np.any(self.nDOF(candidate) <= 25):
217 flag = False
218 if np.any(self.chi2nDOF(candidate) >= 5):
219 flag = False
220 if np.any(self.daughtermomentum(candidate) <= 1 * u.GeV):
221 flag = False
222
223 if show_table:
224 table = [
225 [
226 "Number of candidates in event",
227 len(self.tree.Particles),
228 "==1",
229 len(self.tree.Particles) == 1,
230 ],
231 [
232 "Time @ decay vertex (ns)",
233 self.define_candidate_time(candidate),
234 "",
235 "",
236 ],
237 [
238 "Impact Parameter (cm)",
239 self.impact_parameter(candidate),
240 f"IP < {IP_cut * u.cm} cm",
241 self.impact_parameter(candidate) < IP_cut * u.cm,
242 ],
243 [
244 "DOCA (cm)",
245 self.DOCA(candidate),
246 "DOCA < 1 cm",
247 self.DOCA(candidate) < 1 * u.cm,
248 ],
249 [
250 "Is within Fiducial Volume?",
251 self.is_in_fiducial(candidate),
252 "True",
253 self.is_in_fiducial(candidate),
254 ],
255 [
256 "Dist2InnerWall (cm)",
257 self.dist_to_innerwall(candidate),
258 "> 5 cm",
259 self.dist_to_innerwall(candidate) > 5 * u.cm,
260 ],
261 [
262 "Dist2VesselEntrance (cm)",
263 self.dist_to_vesselentrance(candidate),
264 "> 100 cm",
265 self.dist_to_vesselentrance(candidate) > 100 * u.cm,
266 ],
267 ["Invariant Mass (GeV)", self.invariant_mass(candidate), "", ""],
268 [
269 "Daughter Momentum [d1, d2] (GeV)",
270 self.daughtermomentum(candidate),
271 "> 1 GeV",
272 np.all(self.daughtermomentum(candidate) > 1 * u.GeV),
273 ],
274 [
275 "Degrees of Freedom [d1, d2]",
276 self.nDOF(candidate),
277 "> 25",
278 np.all(self.nDOF(candidate) > 25),
279 ],
280 [
281 "Reduced Chi^2 [d1, d2]",
282 self.chi2nDOF(candidate),
283 "< 5",
284 np.all(self.chi2nDOF(candidate) < 5),
285 ],
286 ["\033[1mPre-selection passed:\033[0m", "", "", flag],
287 ]
288
289 for row in table:
290 row[3] = (
291 f"\033[1;32m{row[3]}\033[0m" if row[3] else f"\033[1;31m{row[3]}\033[0m"
292 ) # Green for True, Red for False
293
294 print(
295 tabulate(
296 table,
297 headers=[
298 "Parameter",
299 "Value",
300 "Pre-selection cut",
301 "Pre-selection Check",
302 ],
303 tablefmt="grid",
304 )
305 )
306 return flag
307
308
310 """Class to inspect MCtruth of an Event."""
311
312 def __init__(self) -> None:
313 """Initialize ROOT PDG database."""
314 self.pdg = ROOT.TDatabasePDG.Instance()
315 pythia8_conf.addHNLtoROOT()
316
317 def dump_event(self, event, mom_threshold: float = 0) -> None:
318 """Dump the MCtruth of the event."""
319 headers = [
320 "#",
321 "particle",
322 "pdgcode",
323 "mother_id",
324 "Momentum [Px,Py,Pz] (GeV/c)",
325 "StartVertex[x,y,z] (m)",
326 "Process",
327 "GetWeight()",
328 ]
329
330 event_table = []
331 for trackNr, track in enumerate(event.MCTrack):
332 if track.GetP() / u.GeV < mom_threshold:
333 continue
334
335 particle = self.pdg.GetParticle(track.GetPdgCode())
336 particlename = particle.GetName() if particle else "----"
337
338 event_table.append(
339 [
340 trackNr,
341 particlename,
342 track.GetPdgCode(),
343 track.GetMotherId(),
344 f"[{track.GetPx() / u.GeV:7.3f},{track.GetPy() / u.GeV:7.3f},{track.GetPz() / u.GeV:7.3f}]",
345 f"[{track.GetStartX() / u.m:7.3f},{track.GetStartY() / u.m:7.3f},{track.GetStartZ() / u.m:7.3f}]",
346 track.GetProcName().Data(),
347 track.GetWeight(),
348 ]
349 )
350
351 print(tabulate(event_table, headers=headers, floatfmt=".3f", tablefmt="simple_outline"))
None dump_event(self, event, float mom_threshold=0)
bool preselection_cut(self, candidate, float IP_cut=250, bool show_table=False)
int open(const char *, int)
Opens a file descriptor.