FairShip
Loading...
Searching...
No Matches
genie_interface.py
Go to the documentation of this file.
1# SPDX-License-Identifier: LGPL-3.0-or-later
2# SPDX-FileCopyrightText: Copyright CERN for the benefit of the SHiP Collaboration
3
4"""Utilities to run GENIE tools with per-call environment overrides.
5
6This module wraps common GENIE command-line tools (``gmkspl``, ``gevgen``,
7``gntpc``) and a ROOT helper to copy flux histograms into simulation files.
8It provides:
9
10- Safe subprocess execution (no ``shell=True``)
11- Per-call environment overrides (set/unset variables like ``GXMLPATH``)
12- Helpful printing/logging of the exact command line
13
14Notes:
15-----
16* A Python process cannot modify its parent shell environment. Use the
17 ``env_vars=...`` argument to set/unset variables **for the child process**
18 only. If you need to persist environment variables, put them in your shell
19 rc (e.g. ``~/.bashrc``) or wrap execution in ``source/env`` scripts.
20
21* The path to ``Messenger_laconic.xml`` is auto-detected from ``$GENIE`` if
22 defined and readable; otherwise the ``--message-thresholds`` option is
23 omitted.
24
25Example:
26-------
27>>> from genie_interface import generate_genie_events
28>>> generate_genie_events(
29... nevents=1000, nupdg=14, emin=1, emax=100, targetcode="1000080160",
30... inputflux="/path/flux.root", spline="/path/xsec.xml", outputfile="out",
31... env_vars={"GXMLPATH": "/opt/genie/config", "GENIE": "/cvmfs/.../genie"}
32... )
33
34"""
35
36from __future__ import annotations
37
38import logging
39import os
40import shlex
41import subprocess
42from collections.abc import Mapping, Sequence
43
44import ROOT # type: ignore
45
46__all__ = [
47 "_run",
48 "add_hists",
49 "generate_genie_events",
50 "get_1D_flux_name",
51 "get_2D_flux_name",
52 "main",
53 "make_ntuples",
54 "make_splines",
55]
56
57__version__ = "0.1.0"
58
59PathLike = str | os.PathLike
60
61logger = logging.getLogger(__name__)
62
63
64def _merge_env(env_vars: Mapping[str, str | None] | None) -> dict[str, str]:
65 """Merge ``env_vars`` with the current environment.
66
67 Parameters
68 ----------
69 env_vars:
70 Mapping of KEY -> VALUE to set for the child process. Use ``None`` to
71 **unset** a variable (remove it) for the child environment.
72
73 Returns
74 -------
75 dict
76 The environment dict to pass into ``subprocess.run``.
77
78 """
79 env: dict[str, str] = dict(os.environ)
80 if env_vars:
81 for k, v in env_vars.items():
82 if v is None:
83 env.pop(k, None)
84 else:
85 env[k] = str(v)
86 return env
87
88
89def _run(
90 args: Sequence[str],
91 env_vars: Mapping[str, str | None] | None = None,
92 *,
93 check: bool = True,
94) -> subprocess.CompletedProcess:
95 """Run a command with optional environment overrides.
96
97 Parameters
98 ----------
99 args:
100 Command and arguments, each as a separate list element. (No shell.)
101 env_vars:
102 Optional mapping of KEY -> VALUE to inject into the child environment.
103 Use ``None`` value to **unset** a variable.
104 check:
105 If True (default), raise ``CalledProcessError`` on non-zero exit.
106
107 Returns
108 -------
109 subprocess.CompletedProcess
110 The completed process object.
111
112 Raises
113 ------
114 subprocess.CalledProcessError
115 If the command exits with non-zero status and ``check=True``.
116
117 """
118 env = _merge_env(env_vars)
119 cmd_str = shlex.join(args)
120 logger.info("Executing: %s", cmd_str)
121 print(">>", cmd_str)
122 return subprocess.run(args, env=env, check=check)
123
124
125def get_1D_flux_name(nupdg: int) -> str:
126 """Return the TH1D flux histogram name for a given neutrino PDG.
127
128 Naming convention (as used by input flux ROOT files):
129 - Neutrino (PDG > 0): ``'10' + |pdg|``
130 - Anti-neutrino (PDG < 0): ``'20' + |pdg|``
131
132 Examples
133 --------
134 >>> get_1D_flux_name(12) # nu_e
135 '1012'
136 >>> get_1D_flux_name(-12) # anti-nu_e
137 '2012'
138
139 """
140 x = ROOT.TMath.Abs(nupdg)
141 return ("10" if nupdg > 0 else "20") + str(x)
142
143
144def get_2D_flux_name(nupdg: int) -> str:
145 """Return the TH2D p–pT flux histogram name for a given neutrino PDG.
146
147 Naming convention (as used by input flux ROOT files):
148 - Neutrino (PDG > 0): ``'12' + |pdg|``
149 - Anti-neutrino (PDG < 0): ``'22' + |pdg|``
150
151 Examples
152 --------
153 >>> get_2D_flux_name(12) # nu_e
154 '1212'
155 >>> get_2D_flux_name(-12) # anti-nu_e
156 '2212'
157
158 """
159 x = ROOT.TMath.Abs(nupdg)
160 return ("12" if nupdg > 0 else "22") + str(x)
161
162
163def make_splines(
164 nupdglist: Sequence[int],
165 targetcode: str,
166 emax: int | float,
167 nknots: int,
168 outputfile: PathLike,
169 env_vars: Mapping[str, str | None] | None = None,
170) -> subprocess.CompletedProcess:
171 """Generate GENIE cross-section splines via ``gmkspl``.
172
173 Parameters
174 ----------
175 nupdglist:
176 Iterable of neutrino PDG codes (e.g. ``[12, 14, 16, -12, -14, -16]``).
177 targetcode:
178 GENIE nuclear target code (e.g. ``'1000080160'`` for O-16).
179 emax:
180 Maximum neutrino energy (GeV).
181 nknots:
182 Number of knots for the spline generation.
183 outputfile:
184 Path to output XML spline file.
185 env_vars:
186 Per-call environment overrides (e.g. set ``GXMLPATH`` or ``GENIE``).
187
188 Returns
189 -------
190 subprocess.CompletedProcess
191
192 """
193 inputnupdg = ",".join(str(p) for p in nupdglist)
194 args = [
195 "gmkspl",
196 "-p",
197 inputnupdg,
198 "-t",
199 targetcode,
200 "-n",
201 str(nknots),
202 "-e",
203 str(emax),
204 "-o",
205 str(outputfile),
206 ]
207 return _run(args, env_vars=env_vars)
208
209
210def generate_genie_events(
211 nevents: int,
212 nupdg: int,
213 emin: int | float,
214 emax: int | float,
215 targetcode: str,
216 inputflux: PathLike,
217 spline: PathLike,
218 outputfile: PathLike,
219 *,
220 process: str | None = None,
221 seed: int | None = None,
222 irun: int | None = None,
223 env_vars: Mapping[str, str | None] | None = None,
224) -> subprocess.CompletedProcess:
225 """Run GENIE ``gevgen`` to generate events.
226
227 Parameters
228 ----------
229 nevents:
230 Number of events to generate.
231 nupdg:
232 Neutrino PDG code (e.g. 12, 14, 16 or negatives for anti-neutrinos).
233 emin, emax:
234 Energy range in GeV (inclusive, inclusive as accepted by GENIE).
235 targetcode:
236 GENIE nuclear target code.
237 inputflux:
238 Path to ROOT file with flux histograms.
239 spline:
240 Path to GENIE cross-section spline XML file.
241 outputfile:
242 Output prefix/file for GENIE (as accepted by ``-o``).
243 process:
244 Optional GENIE generator list (e.g. ``'CCDIS'``, ``'NC'``).
245 seed:
246 Optional RNG seed for reproducibility.
247 irun:
248 Optional integer run number (``--run``).
249 env_vars:
250 Per-call environment overrides (e.g. set/unset ``GXMLPATH``, ``GENIE``).
251
252 Returns
253 -------
254 subprocess.CompletedProcess
255
256 Notes
257 -----
258 If ``GENIE`` is defined in the child environment, this function will try
259 to pass ``--message-thresholds $GENIE/config/Messenger_laconic.xml`` if
260 the file exists; otherwise that option is omitted.
261
262 """
263 env = _merge_env(env_vars)
264
265 msg_xml = None
266 genie_root = env.get("GENIE")
267 if genie_root:
268 candidate = os.path.join(genie_root, "config", "Messenger_laconic.xml")
269 if os.path.isfile(candidate):
270 msg_xml = candidate
271
272 args = [
273 "gevgen",
274 "-n",
275 str(nevents),
276 "-p",
277 str(nupdg),
278 "-t",
279 targetcode,
280 "-e",
281 f"{emin},{emax}",
282 ]
283 if msg_xml:
284 args += ["--message-thresholds", msg_xml]
285
286 args += [
287 "-f",
288 f"{inputflux},{get_1D_flux_name(nupdg)}",
289 "--cross-sections",
290 str(spline),
291 "-o",
292 str(outputfile),
293 ]
294
295 if process is not None:
296 args += ["--event-generator-list", process]
297 if seed is not None:
298 args += ["--seed", str(seed)]
299 if irun is not None:
300 args += ["--run", str(irun)]
301
302 return _run(args, env_vars=env_vars)
303
304
305def make_ntuples(
306 inputfile: PathLike,
307 outputfile: PathLike,
308 env_vars: Mapping[str, str | None] | None = None,
309) -> subprocess.CompletedProcess:
310 """Convert a GENIE ``.ghep.root`` file to GST via ``gntpc``.
311
312 Parameters
313 ----------
314 inputfile:
315 Input GHEP file (e.g. ``gntp.0.ghep.root``).
316 outputfile:
317 Output GST file path.
318 env_vars:
319 Per-call environment overrides.
320
321 Returns
322 -------
323 subprocess.CompletedProcess
324
325 """
326 args = ["gntpc", "-i", str(inputfile), "-f", "gst", "-o", str(outputfile)]
327 return _run(args, env_vars=env_vars)
328
329
330def add_hists(inputflux: PathLike, simfile: PathLike, nupdg: int) -> None:
331 """Copy the 2D p–pT flux histogram from ``inputflux`` into ``simfile``.
332
333 Parameters
334 ----------
335 inputflux:
336 Path to ROOT file containing TH2D flux histograms.
337 simfile:
338 Path to the simulation ROOT file to **update** (opened in 'UPDATE' mode).
339 nupdg:
340 Neutrino PDG code used to select the histogram (see ``get_2D_flux_name``).
341
342 Raises
343 ------
344 FileNotFoundError
345 If the requested histogram is not found in ``inputflux``.
346 RuntimeError
347 If ROOT fails to open either file.
348
349 """
350 infile = ROOT.TFile(str(inputflux), "READ")
351 if not infile or infile.IsZombie():
352 raise RuntimeError(f"Failed to open input flux file: {inputflux}")
353 sfile = ROOT.TFile(str(simfile), "UPDATE")
354 if not sfile or sfile.IsZombie():
355 infile.Close()
356 raise RuntimeError(f"Failed to open simulation file for update: {simfile}")
357
358 try:
359 hname = get_2D_flux_name(nupdg)
360 obj = infile.Get(hname)
361 if not obj:
362 raise FileNotFoundError(f"Histogram '{hname}' not found in {inputflux}")
363 # Write into the current directory of sfile
364 sfile.cd()
365 obj.Write()
366 finally:
367 infile.Close()
368 sfile.Close()
369
370
371# --------------------------- CLI ---------------------------------------------
372
373
374def _parse_env_kv(pairs: Sequence[str]) -> dict[str, str | None]:
375 """Parse ``KEY=VAL`` pairs for ``--env`` CLI flag."""
376 out: dict[str, str | None] = {}
377 for p in pairs:
378 if "=" not in p:
379 raise ValueError(f"Expected KEY=VALUE, got '{p}'")
380 k, v = p.split("=", 1)
381 out[k] = v
382 return out
383
384
385def main(argv: Sequence[str] | None = None) -> int:
386 """Entry point for the command-line interface."""
387 import argparse
388
389 parser = argparse.ArgumentParser(prog="genie-utils", description="GENIE helpers with per-call env overrides")
390 parser.add_argument(
391 "--env",
392 metavar="KEY=VALUE",
393 action="append",
394 default=[],
395 help="Set environment variable(s) for the child process (repeatable).",
396 )
397 parser.add_argument(
398 "--unset",
399 metavar="KEY",
400 action="append",
401 default=[],
402 help="Unset environment variable(s) for the child process (repeatable).",
403 )
404 parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase verbosity.")
405 sub = parser.add_subparsers(dest="cmd", required=True)
406
407 p_spl = sub.add_parser("splines", help="Run gmkspl")
408 p_spl.add_argument("-p", "--pdg", nargs="+", type=int, required=True)
409 p_spl.add_argument("-t", "--target", required=True)
410 p_spl.add_argument("-e", "--emax", type=float, required=True)
411 p_spl.add_argument("-n", "--nknots", type=int, required=True)
412 p_spl.add_argument("-o", "--output", required=True)
413
414 p_gen = sub.add_parser("generate", help="Run gevgen")
415 p_gen.add_argument("-n", "--nevents", type=int, required=True)
416 p_gen.add_argument("-p", "--pdg", type=int, required=True)
417 p_gen.add_argument("--emin", type=float, required=True)
418 p_gen.add_argument("--emax", type=float, required=True)
419 p_gen.add_argument("-t", "--target", required=True)
420 p_gen.add_argument("-f", "--flux", required=True)
421 p_gen.add_argument("-s", "--spline", required=True)
422 p_gen.add_argument("-o", "--output", required=True)
423 p_gen.add_argument("--process", default=None)
424 p_gen.add_argument("--seed", type=int, default=None)
425 p_gen.add_argument("--run", type=int, dest="irun", default=None)
426
427 p_nt = sub.add_parser("ntuples", help="Run gntpc (ghep -> gst)")
428 p_nt.add_argument("-i", "--input", required=True)
429 p_nt.add_argument("-o", "--output", required=True)
430
431 p_add = sub.add_parser("add-hists", help="Copy 2D flux hist to sim file")
432 p_add.add_argument("-f", "--flux", required=True)
433 p_add.add_argument("-s", "--sim", required=True)
434 p_add.add_argument("-p", "--pdg", type=int, required=True)
435
436 args = parser.parse_args(argv)
437
438 # logging
439 level = logging.WARNING
440 if args.verbose == 1:
441 level = logging.INFO
442 elif args.verbose >= 2:
443 level = logging.DEBUG
444 logging.basicConfig(level=level, format="%(levelname)s %(name)s: %(message)s")
445
446 # env merging for CLI
447 env_vars: dict[str, str | None] = {}
448 if args.env:
449 env_vars.update(_parse_env_kv(args.env))
450 for key in args.unset:
451 env_vars[key] = None
452
453 if args.cmd == "splines":
454 make_splines(
455 nupdglist=args.pdg,
456 targetcode=args.target,
457 emax=args.emax,
458 nknots=args.nknots,
459 outputfile=args.output,
460 env_vars=env_vars or None,
461 )
462 elif args.cmd == "generate":
463 generate_genie_events(
464 nevents=args.nevents,
465 nupdg=args.pdg,
466 emin=args.emin,
467 emax=args.emax,
468 targetcode=args.target,
469 inputflux=args.flux,
470 spline=args.spline,
471 outputfile=args.output,
472 process=args.process,
473 seed=args.seed,
474 irun=args.irun,
475 env_vars=env_vars or None,
476 )
477 elif args.cmd == "ntuples":
478 make_ntuples(
479 inputfile=args.input,
480 outputfile=args.output,
481 env_vars=env_vars or None,
482 )
483 elif args.cmd == "add-hists":
484 add_hists(inputflux=args.flux, simfile=args.sim, nupdg=args.pdg)
485 else:
486 parser.error("Unknown command")
487
488 return 0
489
490
491if __name__ == "__main__":
492 main()
int main(Sequence[str]|None argv=None)
dict[str, str|None] _parse_env_kv(Sequence[str] pairs)
subprocess.CompletedProcess _run(Sequence[str] args, Mapping[str, str|None]|None env_vars=None, *bool check=True)
dict[str, str] _merge_env(Mapping[str, str|None]|None env_vars)
str get_2D_flux_name(int nupdg)