Module ldump3
Expand source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__all__ = ['Snap', 'ldump']
# ldump3.py
"""
ldump3.py
Module converted from Python 2.x to Python 3.x.
ldump tool
Read and process dump files with line segment information for visualization purposes.
## Usage
l = ldump("dump.one") # Read one or more dump files
l = ldump("dump.1 dump.2.gz") # Can handle gzipped files
l = ldump("dump.*") # Wildcard expands to multiple files
l = ldump("dump.*", 0) # Two arguments: store filenames but don't read
time = l.next() # Read next snapshot from dump files
l.map(1, "id", 3, "x") # Assign names to atom columns
time, box, atoms, bonds, tris, lines = l.viz(index) # Return list of viz objects
- `viz()` returns line info for the specified timestep index
- Can also call as `viz(time, 1)` to find index of preceding snapshot
- `atoms`, `bonds`, `tris`, `lines` contain respective information
l.owrap(...) # Wrap lines to the same image as their atoms
- `owrap()` is called by dump tool's `owrap()`
- Useful for wrapping all molecule's atoms/lines the same so they are contiguous
## Notes
- Incomplete and duplicate snapshots are deleted.
- No column name assignment is performed unless explicitly mapped.
- `viz()` is tailored for visualization tools that require line segment information.
## History
- 11/10, Steve Plimpton (SNL): original version
- 2025-01-17, INRAE\Olivier Vitrac conversion
## Dependencies
- Python 3.x
- `numpy` library (fallback to `Numeric` if `numpy` is not available)
"""
# History
# 11/10, Steve Plimpton (SNL): original version
# 2025-01-17, first conversion in connection with the update of pizza.dump3
# Imports and external programs
import sys
import glob
from os import popen
from math import sqrt
from copy import deepcopy
import numpy as np
# Define external dependency directly
PIZZA_GUNZIP = "gunzip"
# --------------------------------------------------------------------
# Helper Classes and Functions
class Snap:
"""
Represents a single snapshot from a dump file.
Attributes:
time (int): Time stamp of the snapshot.
natoms (int): Number of atoms (line segments) in the snapshot.
xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
atoms (numpy.ndarray or list): Array of atom (line segment) data.
"""
def __init__(self):
self.time = 0
self.natoms = 0
self.xlo = self.xhi = self.ylo = self.yhi = self.zlo = self.zhi = 0.0
self.atoms = None
# --------------------------------------------------------------------
# ldump Class Definition
class ldump:
"""
ldump class for reading and processing ChemCell dump files with line segment information.
"""
# --------------------------------------------------------------------
def __init__(self, *args):
"""
Initialize the ldump object.
Parameters:
*args: Variable length argument list.
- If one argument, it's the list of files to read.
- If two arguments, the second argument indicates incremental reading.
"""
self.snaps = []
self.nsnaps = 0
self.names = {}
# flist = list of all dump file names
if len(args) == 0:
raise Exception("No dump files specified for ldump.")
words = args[0].split()
self.flist = []
for word in words:
self.flist += glob.glob(word)
if len(self.flist) == 0 and len(args) == 1:
raise Exception("No ldump file specified.")
if len(args) == 1:
self.increment = 0
self.read_all()
else:
self.increment = 1
self.nextfile = 0
self.eof = 0
# --------------------------------------------------------------------
def read_all(self):
"""
Read all snapshots from the list of dump files.
"""
for file in self.flist:
# Test for gzipped file
if file.endswith(".gz"):
try:
f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
except Exception as e:
print(f"Error opening gzipped file {file}: {e}")
continue
else:
try:
f = open(file, 'r')
except Exception as e:
print(f"Error opening file {file}: {e}")
continue
# Read all snapshots in the current file
while True:
snap = self.read_snapshot(f)
if not snap:
break
self.snaps.append(snap)
print(snap.time, end=' ')
sys.stdout.flush()
f.close()
print()
# Sort entries by timestep and remove duplicates
self.snaps.sort(key=lambda x: x.time)
self.cull()
self.nsnaps = len(self.snaps)
print(f"read {self.nsnaps} snapshots")
# --------------------------------------------------------------------
# Read the next snapshot from the list of files (incremental reading)
def next(self):
"""
Read the next snapshot in incremental mode.
Returns:
int: Time stamp of the snapshot read, or -1 if no snapshots left.
"""
if not self.increment:
raise Exception("Cannot read incrementally with current ldump configuration.")
# Read next snapshot in current file using eof as pointer
while self.nextfile < len(self.flist):
file = self.flist[self.nextfile]
# Open the current file
if file.endswith(".gz"):
try:
f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
except Exception as e:
print(f"Error opening gzipped file {file}: {e}")
self.nextfile += 1
continue
else:
try:
f = open(file, 'r')
except Exception as e:
print(f"Error opening file {file}: {e}")
self.nextfile += 1
continue
# Seek to the last read position
f.seek(self.eof)
snap = self.read_snapshot(f)
if not snap:
# End of file reached
f.close()
self.nextfile += 1
self.eof = 0
continue
self.eof = f.tell()
f.close()
# Check for duplicate time stamp
if any(existing_snap.time == snap.time for existing_snap in self.snaps):
continue # Skip duplicate
self.snaps.append(snap)
self.nsnaps += 1
return snap.time
return -1 # No snapshots left
# --------------------------------------------------------------------
# Read a single snapshot from a file
def read_snapshot(self, f):
"""
Read a single snapshot from the file.
Parameters:
f (file object): Opened file object to read from.
Returns:
Snap: Snapshot object, or None if failed.
"""
try:
snap = Snap()
# Read and discard the first line (usually a comment or header)
item = f.readline()
if not item:
return None
# Read time stamp
line = f.readline()
if not line:
return None
snap.time = int(line.strip().split()[0]) # Read first field as time
# Read number of atoms (line segments)
line = f.readline()
if not line:
return None
snap.natoms = int(line.strip())
# Read box bounds
line = f.readline()
if not line:
return None
words = f.readline().split()
if len(words) < 2:
raise Exception("Incomplete box bounds data.")
snap.xlo, snap.xhi = float(words[0]), float(words[1])
words = f.readline().split()
if len(words) < 2:
raise Exception("Incomplete box bounds data.")
snap.ylo, snap.yhi = float(words[0]), float(words[1])
words = f.readline().split()
if len(words) < 2:
raise Exception("Incomplete box bounds data.")
snap.zlo, snap.zhi = float(words[0]), float(words[1])
# Read past another line (possibly a header)
item = f.readline()
if not item:
return None
if snap.natoms:
# Read atom (line segment) data
words = f.readline().split()
ncol = len(words)
atoms = []
atoms.append([float(value) for value in words])
for _ in range(1, snap.natoms):
words = f.readline().split()
if not words:
break
atoms.append([float(value) for value in words])
snap.atoms = np.array(atoms, dtype=float)
else:
snap.atoms = None
return snap
except Exception as e:
print(f"Error reading snapshot: {e}")
return None
# --------------------------------------------------------------------
# Map atom column names
def map(self, *pairs):
"""
Assign names to atom columns.
Parameters:
*pairs: Pairs of (column_number, "column_name")
"""
if len(pairs) % 2 != 0:
raise Exception("ldump map() requires pairs of mappings.")
for i in range(0, len(pairs), 2):
col_num = pairs[i]
col_name = pairs[i + 1]
self.names[col_name] = col_num - 1 # Zero-based indexing
# --------------------------------------------------------------------
# Return list of snapshot time stamps
def time(self):
"""
Get a list of all snapshot time stamps.
Returns:
list: List of time stamps.
"""
return [snap.time for snap in self.snaps]
# --------------------------------------------------------------------
# Delete successive snapshots with duplicate time stamps
def cull(self):
"""
Remove duplicate snapshots based on time stamps.
"""
unique_snaps = []
seen_times = set()
for snap in self.snaps:
if snap.time not in seen_times:
unique_snaps.append(snap)
seen_times.add(snap.time)
self.snaps = unique_snaps
# --------------------------------------------------------------------
# Find the index of a given timestep
def findtime(self, n):
"""
Find the index of a given timestep.
Parameters:
n (int): The timestep to find.
Returns:
int: The index of the timestep.
Raises:
Exception: If the timestep does not exist.
"""
for i in range(self.nsnaps):
if self.snaps[i].time == n:
return i
raise Exception(f"No step {n} exists.")
# --------------------------------------------------------------------
# Return list of lines to viz for snapshot isnap
# If called with flag, then index is timestep, so convert to snapshot index
def viz(self, index, flag=0):
"""
Return line segment information for visualization.
Parameters:
index (int): Snapshot index or time stamp.
flag (int): If 1, interpret index as time stamp.
Returns:
tuple: (time, box, atoms, bonds, tris, lines)
atoms, bonds, tris are None.
lines contain [id, type, x1, y1, z1, x2, y2, z2] for each line.
"""
if not flag:
isnap = index
else:
times = self.time()
n = len(times)
i = 0
while i < n:
if times[i] > index:
break
i += 1
isnap = i - 1
if isnap < 0 or isnap >= self.nsnaps:
return (-1, None, None, None, None, None)
snap = self.snaps[isnap]
time = snap.time
box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
# Retrieve column indices
try:
id_col = self.names["id"]
type_col = self.names["type"]
end1x_col = self.names["end1x"]
end1y_col = self.names["end1y"]
end2x_col = self.names["end2x"]
end2y_col = self.names["end2y"]
except KeyError as e:
print(f"Column name {e} not mapped. Use l.map() to assign column names.")
return (time, box, None, None, None, None)
# Create line list
lines = []
for i in range(snap.natoms):
atom = snap.atoms[i]
try:
bond_id = int(atom[id_col])
bond_type = int(atom[type_col])
end1x = float(atom[end1x_col])
end1y = float(atom[end1y_col])
end2x = float(atom[end2x_col])
end2y = float(atom[end2y_col])
except (IndexError, ValueError) as e:
print(f"Error processing atom data: {e}")
continue
# Assuming z1 and z2 are zero since original code sets them to 0.0
if end1x == 0.0 and end1y == 0.0 and end2x == 0.0 and end2y == 0.0:
continue # Skip lines with all zeros
lines.append([bond_id, bond_type, end1x, end1y, 0.0, end2x, end2y, 0.0])
return (time, box, None, None, None, lines)
# --------------------------------------------------------------------
# Wrap line end points associated with atoms through periodic boundaries
# Invoked by dump() when it does an owrap() on its atoms
def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz):
"""
Wrap line end points associated with atoms through periodic boundaries.
Parameters:
time (int): Time stamp of the snapshot.
xprd, yprd, zprd (float): Periodicity in x, y, z directions.
idsdump (dict): Dictionary mapping atom IDs to dump indices.
atomsdump (list): List of atoms from the dump.
iother (int): Other index for wrapping.
ix, iy, iz (int): Column indices for x, y, z coordinates.
"""
try:
id_col = self.names["id"]
end1x_col = self.names["end1x"]
end1y_col = self.names["end1y"]
end2x_col = self.names["end2x"]
end2y_col = self.names["end2y"]
except KeyError as e:
print(f"Column name {e} not mapped. Use l.map() to assign column names.")
return
try:
isnap = self.findtime(time)
except Exception as e:
print(e)
return
snap = self.snaps[isnap]
atoms = snap.atoms
# idump = index of my line I in dump's atoms
# jdump = atom J in dump's atoms that atom I was owrapped on
# delx, dely = offset applied to atom I and thus to line I
for i in range(snap.natoms):
try:
tag = int(atoms[i][id_col])
idump = idsdump[tag]
jdump = idsdump[int(atomsdump[idump][iother])]
delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd
dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd
delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd
atoms[i][end1x_col] += delx
atoms[i][end1y_col] += dely
atoms[i][end2x_col] += delx
atoms[i][end2y_col] += dely
# Assuming z-coordinates need to be handled if present
# If not, the z-component remains unchanged
except (KeyError, IndexError, ValueError) as e:
print(f"Error in owrap: {e}")
continue
# --------------------------------------------------------------------
# Example Usage (for testing purposes)
# Uncomment the following lines to test the ldump3.py module
if __name__ == "__main__":
l = ldump("dump.one")
l.map(1, "id", 3, "x", 4, "y", 5, "z", 6, "end1x", 7, "end1y", 8, "end2x", 9, "end2y")
while True:
time = l.next()
if time == -1:
break
print(f"Snapshot Time: {time}")
_, box, _, _, _, lines = l.viz(time, 1)
print(f"Lines: {lines}")
Classes
class Snap
-
Represents a single snapshot from a dump file.
Attributes
time
:int
- Time stamp of the snapshot.
natoms
:int
- Number of atoms (line segments) in the snapshot.
- xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
atoms
:numpy.ndarray
orlist
- Array of atom (line segment) data.
Expand source code
class Snap: """ Represents a single snapshot from a dump file. Attributes: time (int): Time stamp of the snapshot. natoms (int): Number of atoms (line segments) in the snapshot. xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds. atoms (numpy.ndarray or list): Array of atom (line segment) data. """ def __init__(self): self.time = 0 self.natoms = 0 self.xlo = self.xhi = self.ylo = self.yhi = self.zlo = self.zhi = 0.0 self.atoms = None
class ldump (*args)
-
ldump class for reading and processing ChemCell dump files with line segment information.
Initialize the ldump object.
Parameters
*args: Variable length argument list. - If one argument, it's the list of files to read. - If two arguments, the second argument indicates incremental reading.
Expand source code
class ldump: """ ldump class for reading and processing ChemCell dump files with line segment information. """ # -------------------------------------------------------------------- def __init__(self, *args): """ Initialize the ldump object. Parameters: *args: Variable length argument list. - If one argument, it's the list of files to read. - If two arguments, the second argument indicates incremental reading. """ self.snaps = [] self.nsnaps = 0 self.names = {} # flist = list of all dump file names if len(args) == 0: raise Exception("No dump files specified for ldump.") words = args[0].split() self.flist = [] for word in words: self.flist += glob.glob(word) if len(self.flist) == 0 and len(args) == 1: raise Exception("No ldump file specified.") if len(args) == 1: self.increment = 0 self.read_all() else: self.increment = 1 self.nextfile = 0 self.eof = 0 # -------------------------------------------------------------------- def read_all(self): """ Read all snapshots from the list of dump files. """ for file in self.flist: # Test for gzipped file if file.endswith(".gz"): try: f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r') except Exception as e: print(f"Error opening gzipped file {file}: {e}") continue else: try: f = open(file, 'r') except Exception as e: print(f"Error opening file {file}: {e}") continue # Read all snapshots in the current file while True: snap = self.read_snapshot(f) if not snap: break self.snaps.append(snap) print(snap.time, end=' ') sys.stdout.flush() f.close() print() # Sort entries by timestep and remove duplicates self.snaps.sort(key=lambda x: x.time) self.cull() self.nsnaps = len(self.snaps) print(f"read {self.nsnaps} snapshots") # -------------------------------------------------------------------- # Read the next snapshot from the list of files (incremental reading) def next(self): """ Read the next snapshot in incremental mode. Returns: int: Time stamp of the snapshot read, or -1 if no snapshots left. """ if not self.increment: raise Exception("Cannot read incrementally with current ldump configuration.") # Read next snapshot in current file using eof as pointer while self.nextfile < len(self.flist): file = self.flist[self.nextfile] # Open the current file if file.endswith(".gz"): try: f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r') except Exception as e: print(f"Error opening gzipped file {file}: {e}") self.nextfile += 1 continue else: try: f = open(file, 'r') except Exception as e: print(f"Error opening file {file}: {e}") self.nextfile += 1 continue # Seek to the last read position f.seek(self.eof) snap = self.read_snapshot(f) if not snap: # End of file reached f.close() self.nextfile += 1 self.eof = 0 continue self.eof = f.tell() f.close() # Check for duplicate time stamp if any(existing_snap.time == snap.time for existing_snap in self.snaps): continue # Skip duplicate self.snaps.append(snap) self.nsnaps += 1 return snap.time return -1 # No snapshots left # -------------------------------------------------------------------- # Read a single snapshot from a file def read_snapshot(self, f): """ Read a single snapshot from the file. Parameters: f (file object): Opened file object to read from. Returns: Snap: Snapshot object, or None if failed. """ try: snap = Snap() # Read and discard the first line (usually a comment or header) item = f.readline() if not item: return None # Read time stamp line = f.readline() if not line: return None snap.time = int(line.strip().split()[0]) # Read first field as time # Read number of atoms (line segments) line = f.readline() if not line: return None snap.natoms = int(line.strip()) # Read box bounds line = f.readline() if not line: return None words = f.readline().split() if len(words) < 2: raise Exception("Incomplete box bounds data.") snap.xlo, snap.xhi = float(words[0]), float(words[1]) words = f.readline().split() if len(words) < 2: raise Exception("Incomplete box bounds data.") snap.ylo, snap.yhi = float(words[0]), float(words[1]) words = f.readline().split() if len(words) < 2: raise Exception("Incomplete box bounds data.") snap.zlo, snap.zhi = float(words[0]), float(words[1]) # Read past another line (possibly a header) item = f.readline() if not item: return None if snap.natoms: # Read atom (line segment) data words = f.readline().split() ncol = len(words) atoms = [] atoms.append([float(value) for value in words]) for _ in range(1, snap.natoms): words = f.readline().split() if not words: break atoms.append([float(value) for value in words]) snap.atoms = np.array(atoms, dtype=float) else: snap.atoms = None return snap except Exception as e: print(f"Error reading snapshot: {e}") return None # -------------------------------------------------------------------- # Map atom column names def map(self, *pairs): """ Assign names to atom columns. Parameters: *pairs: Pairs of (column_number, "column_name") """ if len(pairs) % 2 != 0: raise Exception("ldump map() requires pairs of mappings.") for i in range(0, len(pairs), 2): col_num = pairs[i] col_name = pairs[i + 1] self.names[col_name] = col_num - 1 # Zero-based indexing # -------------------------------------------------------------------- # Return list of snapshot time stamps def time(self): """ Get a list of all snapshot time stamps. Returns: list: List of time stamps. """ return [snap.time for snap in self.snaps] # -------------------------------------------------------------------- # Delete successive snapshots with duplicate time stamps def cull(self): """ Remove duplicate snapshots based on time stamps. """ unique_snaps = [] seen_times = set() for snap in self.snaps: if snap.time not in seen_times: unique_snaps.append(snap) seen_times.add(snap.time) self.snaps = unique_snaps # -------------------------------------------------------------------- # Find the index of a given timestep def findtime(self, n): """ Find the index of a given timestep. Parameters: n (int): The timestep to find. Returns: int: The index of the timestep. Raises: Exception: If the timestep does not exist. """ for i in range(self.nsnaps): if self.snaps[i].time == n: return i raise Exception(f"No step {n} exists.") # -------------------------------------------------------------------- # Return list of lines to viz for snapshot isnap # If called with flag, then index is timestep, so convert to snapshot index def viz(self, index, flag=0): """ Return line segment information for visualization. Parameters: index (int): Snapshot index or time stamp. flag (int): If 1, interpret index as time stamp. Returns: tuple: (time, box, atoms, bonds, tris, lines) atoms, bonds, tris are None. lines contain [id, type, x1, y1, z1, x2, y2, z2] for each line. """ if not flag: isnap = index else: times = self.time() n = len(times) i = 0 while i < n: if times[i] > index: break i += 1 isnap = i - 1 if isnap < 0 or isnap >= self.nsnaps: return (-1, None, None, None, None, None) snap = self.snaps[isnap] time = snap.time box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi] # Retrieve column indices try: id_col = self.names["id"] type_col = self.names["type"] end1x_col = self.names["end1x"] end1y_col = self.names["end1y"] end2x_col = self.names["end2x"] end2y_col = self.names["end2y"] except KeyError as e: print(f"Column name {e} not mapped. Use l.map() to assign column names.") return (time, box, None, None, None, None) # Create line list lines = [] for i in range(snap.natoms): atom = snap.atoms[i] try: bond_id = int(atom[id_col]) bond_type = int(atom[type_col]) end1x = float(atom[end1x_col]) end1y = float(atom[end1y_col]) end2x = float(atom[end2x_col]) end2y = float(atom[end2y_col]) except (IndexError, ValueError) as e: print(f"Error processing atom data: {e}") continue # Assuming z1 and z2 are zero since original code sets them to 0.0 if end1x == 0.0 and end1y == 0.0 and end2x == 0.0 and end2y == 0.0: continue # Skip lines with all zeros lines.append([bond_id, bond_type, end1x, end1y, 0.0, end2x, end2y, 0.0]) return (time, box, None, None, None, lines) # -------------------------------------------------------------------- # Wrap line end points associated with atoms through periodic boundaries # Invoked by dump() when it does an owrap() on its atoms def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz): """ Wrap line end points associated with atoms through periodic boundaries. Parameters: time (int): Time stamp of the snapshot. xprd, yprd, zprd (float): Periodicity in x, y, z directions. idsdump (dict): Dictionary mapping atom IDs to dump indices. atomsdump (list): List of atoms from the dump. iother (int): Other index for wrapping. ix, iy, iz (int): Column indices for x, y, z coordinates. """ try: id_col = self.names["id"] end1x_col = self.names["end1x"] end1y_col = self.names["end1y"] end2x_col = self.names["end2x"] end2y_col = self.names["end2y"] except KeyError as e: print(f"Column name {e} not mapped. Use l.map() to assign column names.") return try: isnap = self.findtime(time) except Exception as e: print(e) return snap = self.snaps[isnap] atoms = snap.atoms # idump = index of my line I in dump's atoms # jdump = atom J in dump's atoms that atom I was owrapped on # delx, dely = offset applied to atom I and thus to line I for i in range(snap.natoms): try: tag = int(atoms[i][id_col]) idump = idsdump[tag] jdump = idsdump[int(atomsdump[idump][iother])] delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd atoms[i][end1x_col] += delx atoms[i][end1y_col] += dely atoms[i][end2x_col] += delx atoms[i][end2y_col] += dely # Assuming z-coordinates need to be handled if present # If not, the z-component remains unchanged except (KeyError, IndexError, ValueError) as e: print(f"Error in owrap: {e}") continue
Methods
def cull(self)
-
Remove duplicate snapshots based on time stamps.
Expand source code
def cull(self): """ Remove duplicate snapshots based on time stamps. """ unique_snaps = [] seen_times = set() for snap in self.snaps: if snap.time not in seen_times: unique_snaps.append(snap) seen_times.add(snap.time) self.snaps = unique_snaps
def findtime(self, n)
-
Find the index of a given timestep.
Parameters
n (int): The timestep to find.
Returns
int
- The index of the timestep.
Raises
Exception
- If the timestep does not exist.
Expand source code
def findtime(self, n): """ Find the index of a given timestep. Parameters: n (int): The timestep to find. Returns: int: The index of the timestep. Raises: Exception: If the timestep does not exist. """ for i in range(self.nsnaps): if self.snaps[i].time == n: return i raise Exception(f"No step {n} exists.")
def map(self, *pairs)
-
Assign names to atom columns.
Parameters
*pairs: Pairs of (column_number, "column_name")
Expand source code
def map(self, *pairs): """ Assign names to atom columns. Parameters: *pairs: Pairs of (column_number, "column_name") """ if len(pairs) % 2 != 0: raise Exception("ldump map() requires pairs of mappings.") for i in range(0, len(pairs), 2): col_num = pairs[i] col_name = pairs[i + 1] self.names[col_name] = col_num - 1 # Zero-based indexing
def next(self)
-
Read the next snapshot in incremental mode.
Returns
int
- Time stamp of the snapshot read, or -1 if no snapshots left.
Expand source code
def next(self): """ Read the next snapshot in incremental mode. Returns: int: Time stamp of the snapshot read, or -1 if no snapshots left. """ if not self.increment: raise Exception("Cannot read incrementally with current ldump configuration.") # Read next snapshot in current file using eof as pointer while self.nextfile < len(self.flist): file = self.flist[self.nextfile] # Open the current file if file.endswith(".gz"): try: f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r') except Exception as e: print(f"Error opening gzipped file {file}: {e}") self.nextfile += 1 continue else: try: f = open(file, 'r') except Exception as e: print(f"Error opening file {file}: {e}") self.nextfile += 1 continue # Seek to the last read position f.seek(self.eof) snap = self.read_snapshot(f) if not snap: # End of file reached f.close() self.nextfile += 1 self.eof = 0 continue self.eof = f.tell() f.close() # Check for duplicate time stamp if any(existing_snap.time == snap.time for existing_snap in self.snaps): continue # Skip duplicate self.snaps.append(snap) self.nsnaps += 1 return snap.time return -1 # No snapshots left
def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz)
-
Wrap line end points associated with atoms through periodic boundaries.
Parameters
time (int): Time stamp of the snapshot. xprd, yprd, zprd (float): Periodicity in x, y, z directions. idsdump (dict): Dictionary mapping atom IDs to dump indices. atomsdump (list): List of atoms from the dump. iother (int): Other index for wrapping. ix, iy, iz (int): Column indices for x, y, z coordinates.
Expand source code
def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz): """ Wrap line end points associated with atoms through periodic boundaries. Parameters: time (int): Time stamp of the snapshot. xprd, yprd, zprd (float): Periodicity in x, y, z directions. idsdump (dict): Dictionary mapping atom IDs to dump indices. atomsdump (list): List of atoms from the dump. iother (int): Other index for wrapping. ix, iy, iz (int): Column indices for x, y, z coordinates. """ try: id_col = self.names["id"] end1x_col = self.names["end1x"] end1y_col = self.names["end1y"] end2x_col = self.names["end2x"] end2y_col = self.names["end2y"] except KeyError as e: print(f"Column name {e} not mapped. Use l.map() to assign column names.") return try: isnap = self.findtime(time) except Exception as e: print(e) return snap = self.snaps[isnap] atoms = snap.atoms # idump = index of my line I in dump's atoms # jdump = atom J in dump's atoms that atom I was owrapped on # delx, dely = offset applied to atom I and thus to line I for i in range(snap.natoms): try: tag = int(atoms[i][id_col]) idump = idsdump[tag] jdump = idsdump[int(atomsdump[idump][iother])] delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd atoms[i][end1x_col] += delx atoms[i][end1y_col] += dely atoms[i][end2x_col] += delx atoms[i][end2y_col] += dely # Assuming z-coordinates need to be handled if present # If not, the z-component remains unchanged except (KeyError, IndexError, ValueError) as e: print(f"Error in owrap: {e}") continue
def read_all(self)
-
Read all snapshots from the list of dump files.
Expand source code
def read_all(self): """ Read all snapshots from the list of dump files. """ for file in self.flist: # Test for gzipped file if file.endswith(".gz"): try: f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r') except Exception as e: print(f"Error opening gzipped file {file}: {e}") continue else: try: f = open(file, 'r') except Exception as e: print(f"Error opening file {file}: {e}") continue # Read all snapshots in the current file while True: snap = self.read_snapshot(f) if not snap: break self.snaps.append(snap) print(snap.time, end=' ') sys.stdout.flush() f.close() print() # Sort entries by timestep and remove duplicates self.snaps.sort(key=lambda x: x.time) self.cull() self.nsnaps = len(self.snaps) print(f"read {self.nsnaps} snapshots")
def read_snapshot(self, f)
-
Read a single snapshot from the file.
Parameters
f (file object): Opened file object to read from.
Returns
Snap
- Snapshot object, or None if failed.
Expand source code
def read_snapshot(self, f): """ Read a single snapshot from the file. Parameters: f (file object): Opened file object to read from. Returns: Snap: Snapshot object, or None if failed. """ try: snap = Snap() # Read and discard the first line (usually a comment or header) item = f.readline() if not item: return None # Read time stamp line = f.readline() if not line: return None snap.time = int(line.strip().split()[0]) # Read first field as time # Read number of atoms (line segments) line = f.readline() if not line: return None snap.natoms = int(line.strip()) # Read box bounds line = f.readline() if not line: return None words = f.readline().split() if len(words) < 2: raise Exception("Incomplete box bounds data.") snap.xlo, snap.xhi = float(words[0]), float(words[1]) words = f.readline().split() if len(words) < 2: raise Exception("Incomplete box bounds data.") snap.ylo, snap.yhi = float(words[0]), float(words[1]) words = f.readline().split() if len(words) < 2: raise Exception("Incomplete box bounds data.") snap.zlo, snap.zhi = float(words[0]), float(words[1]) # Read past another line (possibly a header) item = f.readline() if not item: return None if snap.natoms: # Read atom (line segment) data words = f.readline().split() ncol = len(words) atoms = [] atoms.append([float(value) for value in words]) for _ in range(1, snap.natoms): words = f.readline().split() if not words: break atoms.append([float(value) for value in words]) snap.atoms = np.array(atoms, dtype=float) else: snap.atoms = None return snap except Exception as e: print(f"Error reading snapshot: {e}") return None
def time(self)
-
Get a list of all snapshot time stamps.
Returns
list
- List of time stamps.
Expand source code
def time(self): """ Get a list of all snapshot time stamps. Returns: list: List of time stamps. """ return [snap.time for snap in self.snaps]
def viz(self, index, flag=0)
-
Return line segment information for visualization.
Parameters
index (int): Snapshot index or time stamp. flag (int): If 1, interpret index as time stamp.
Returns
tuple
- (time, box, atoms, bonds, tris, lines) atoms, bonds, tris are None. lines contain [id, type, x1, y1, z1, x2, y2, z2] for each line.
Expand source code
def viz(self, index, flag=0): """ Return line segment information for visualization. Parameters: index (int): Snapshot index or time stamp. flag (int): If 1, interpret index as time stamp. Returns: tuple: (time, box, atoms, bonds, tris, lines) atoms, bonds, tris are None. lines contain [id, type, x1, y1, z1, x2, y2, z2] for each line. """ if not flag: isnap = index else: times = self.time() n = len(times) i = 0 while i < n: if times[i] > index: break i += 1 isnap = i - 1 if isnap < 0 or isnap >= self.nsnaps: return (-1, None, None, None, None, None) snap = self.snaps[isnap] time = snap.time box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi] # Retrieve column indices try: id_col = self.names["id"] type_col = self.names["type"] end1x_col = self.names["end1x"] end1y_col = self.names["end1y"] end2x_col = self.names["end2x"] end2y_col = self.names["end2y"] except KeyError as e: print(f"Column name {e} not mapped. Use l.map() to assign column names.") return (time, box, None, None, None, None) # Create line list lines = [] for i in range(snap.natoms): atom = snap.atoms[i] try: bond_id = int(atom[id_col]) bond_type = int(atom[type_col]) end1x = float(atom[end1x_col]) end1y = float(atom[end1y_col]) end2x = float(atom[end2x_col]) end2y = float(atom[end2y_col]) except (IndexError, ValueError) as e: print(f"Error processing atom data: {e}") continue # Assuming z1 and z2 are zero since original code sets them to 0.0 if end1x == 0.0 and end1y == 0.0 and end2x == 0.0 and end2y == 0.0: continue # Skip lines with all zeros lines.append([bond_id, bond_type, end1x, end1y, 0.0, end2x, end2y, 0.0]) return (time, box, None, None, None, lines)