Module tdump3

Expand source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


__all__ = ['Snap', 'normal', 'tdump']
# tdump3.py

"""
tdump3.py

Module converted from Python 2.x to Python 3.x.

tdump tool

Read dump files with triangle information for visualization purposes.

## Usage

    t = tdump("dump.one")                             # Read one or more dump files
    t = tdump("dump.1 dump.2.gz")                     # Can handle gzipped files
    t = tdump("dump.*")                                # Wildcard expands to multiple files
    t = tdump("dump.*", 0)                             # Two arguments: store filenames but don't read

    time = t.next()                                    # Read next snapshot from dump files

    t.map(1, "id", 3, "x")                             # Assign names to atom columns

      - Must assign id, type, corner1x, corner1y, corner1z,
        corner2x, corner2y, corner2z, corner3x, corner3y, corner3z

    time, box, atoms, bonds, tris, lines = t.viz(index)  # Return list of viz objects

      - `viz()` returns triangle info for the specified timestep index
      - Can also call as `viz(time, 1)` to find index of preceding snapshot
      - `atoms`, `bonds`, `lines` are NULL
      - `tris` contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle

    t.owrap(...)                                       # Wrap triangles to the same image as their atoms

      - `owrap()` is called by dump tool's `owrap()`
      - Useful for wrapping all molecule's atoms/triangles the same so they are contiguous

## Notes

- Incomplete and duplicate snapshots are deleted.
- No column name assignment is performed unless explicitly mapped.
- `viz()` is tailored for visualization tools that require triangle information.
- `owrap()` ensures periodic boundary conditions are maintained for triangles.

## History

- 4/11, Steve Plimpton (SNL): original version
- 2025-01-17, INRAE\Olivier Vitrac conversion

## Dependencies

- Python 3.x
- `numpy` library (fallback to `Numeric` if `numpy` is not available)
"""

# History
#   4/11, Steve Plimpton (SNL): original version
# 2025-01-17, first conversion in connection with the update of pizza.dump3

# Imports and external programs

import sys
import glob
import re
from os import popen
from math import sqrt
from copy import deepcopy
import numpy as np


# Define external dependency directly
PIZZA_GUNZIP = "gunzip"

# --------------------------------------------------------------------
# Helper Classes and Functions

class Snap:
    """
    Represents a single snapshot from a dump file.
    
    Attributes:
        time (int): Time stamp of the snapshot.
        natoms (int): Number of atoms (triangles) in the snapshot.
        xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
        atoms (numpy.ndarray or list): Array of atom (triangle) data.
    """
    def __init__(self):
        self.time = 0
        self.natoms = 0
        self.xlo = self.xhi = self.ylo = self.yhi = self.zlo = self.zhi = 0.0
        self.atoms = None

# --------------------------------------------------------------------
# Function to compute the normal vector for a triangle with 3 vertices

def normal(x, y, z):
    """
    Compute the normal vector for a triangle defined by vertices x, y, z.
    
    Parameters:
        x, y, z (list or tuple): Coordinates of the three vertices.
    
    Returns:
        list: Normal vector [nx, ny, nz].
    """
    v1 = [y[0] - x[0], y[1] - x[1], y[2] - x[2]]
    v2 = [z[0] - y[0], z[1] - y[1], z[2] - y[2]]
    
    n = [
        v1[1]*v2[2] - v1[2]*v2[1],
        v1[2]*v2[0] - v1[0]*v2[2],
        v1[0]*v2[1] - v1[1]*v2[0]
    ]
    
    length = sqrt(n[0]**2 + n[1]**2 + n[2]**2)
    if length == 0:
        return [0.0, 0.0, 0.0]
    n = [component / length for component in n]
    
    return n

# --------------------------------------------------------------------
# tdump Class Definition

class tdump:
    """
    tdump class for reading and processing dump files with triangle information.
    """
    
    # --------------------------------------------------------------------
    
    def __init__(self, *args):
        """
        Initialize the tdump object.
        
        Parameters:
            *args: Variable length argument list.
                   - If one argument, it's the list of files to read.
                   - If two arguments, the second argument indicates incremental reading.
        """
        self.snaps = []
        self.nsnaps = 0
        self.names = {}
        
        # flist = list of all dump file names
        if len(args) == 0:
            raise Exception("No dump files specified for tdump.")
        
        words = args[0].split()
        self.flist = []
        for word in words:
            self.flist += glob.glob(word)
        
        if len(self.flist) == 0 and len(args) == 1:
            raise Exception("No tdump file specified.")
        
        if len(args) == 1:
            self.increment = 0
            self.read_all()
        else:
            self.increment = 1
            self.nextfile = 0
            self.eof = 0
    
    # --------------------------------------------------------------------
    
    def read_all(self):
        """
        Read all snapshots from the list of dump files.
        """
        for file in self.flist:
            # Test for gzipped file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    continue
    
            # Read all snapshots in the current file
            while True:
                snap = self.read_snapshot(f)
                if not snap:
                    break
                self.snaps.append(snap)
                print(snap.time, end=' ')
                sys.stdout.flush()
    
            f.close()
        print()
    
        # Sort entries by timestep and remove duplicates
        self.snaps.sort(key=lambda x: x.time)
        self.cull()
        self.nsnaps = len(self.snaps)
        print(f"read {self.nsnaps} snapshots")
    
    # --------------------------------------------------------------------
    # Read the next snapshot from the list of files (incremental reading)
    
    def next(self):
        """
        Read the next snapshot in incremental mode.
        
        Returns:
            int: Time stamp of the snapshot read, or -1 if no snapshots left.
        """
        if not self.increment:
            raise Exception("Cannot read incrementally with current tdump configuration.")
    
        # Read next snapshot in current file using eof as pointer
        while self.nextfile < len(self.flist):
            file = self.flist[self.nextfile]
            # Open the current file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    self.nextfile += 1
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    self.nextfile += 1
                    continue
    
            # Seek to the last read position
            f.seek(self.eof)
            snap = self.read_snapshot(f)
            if not snap:
                # End of file reached
                f.close()
                self.nextfile += 1
                self.eof = 0
                continue
            self.eof = f.tell()
            f.close()
            
            # Check for duplicate time stamp
            if any(existing_snap.time == snap.time for existing_snap in self.snaps):
                continue  # Skip duplicate
            self.snaps.append(snap)
            self.nsnaps += 1
            return snap.time
    
        return -1  # No snapshots left
    
    # --------------------------------------------------------------------
    # Read a single snapshot from a file
    
    def read_snapshot(self, f):
        """
        Read a single snapshot from the file.
        
        Parameters:
            f (file object): Opened file object to read from.
        
        Returns:
            Snap: Snapshot object, or None if failed.
        """
        try:
            snap = Snap()
            # Read and discard the first line (usually a comment or header)
            item = f.readline()
            if not item:
                return None
            # Read time stamp
            line = f.readline()
            if not line:
                return None
            snap.time = int(line.strip().split()[0])  # Read first field as time
            # Read number of atoms (triangles)
            line = f.readline()
            if not line:
                return None
            snap.natoms = int(line.strip())
    
            # Read box bounds
            line = f.readline()
            if not line:
                return None
            words = f.readline().split()
            if len(words) < 2:
                raise Exception("Incomplete box bounds data.")
            snap.xlo, snap.xhi = float(words[0]), float(words[1])
            words = f.readline().split()
            if len(words) < 2:
                raise Exception("Incomplete box bounds data.")
            snap.ylo, snap.yhi = float(words[0]), float(words[1])
            words = f.readline().split()
            if len(words) < 2:
                raise Exception("Incomplete box bounds data.")
            snap.zlo, snap.zhi = float(words[0]), float(words[1])
    
            # Read past another line (possibly a header)
            item = f.readline()
            if not item:
                return None
    
            if snap.natoms:
                # Read atom (triangle) data
                words = f.readline().split()
                ncol = len(words)
                atoms = []
                atoms.append([float(value) for value in words])
                for _ in range(1, snap.natoms):
                    words = f.readline().split()
                    if not words:
                        break
                    atoms.append([float(value) for value in words])
                snap.atoms = np.array(atoms, dtype=float)
            else:
                snap.atoms = None
            return snap
        except Exception as e:
            print(f"Error reading snapshot: {e}")
            return None
    
    # --------------------------------------------------------------------
    # Map atom column names
    
    def map(self, *pairs):
        """
        Assign names to atom columns.
        
        Parameters:
            *pairs: Pairs of (column_number, "column_name")
                   Example: map(1, "id", 3, "x")
        """
        if len(pairs) % 2 != 0:
            raise Exception("tdump map() requires pairs of mappings.")
        for i in range(0, len(pairs), 2):
            col_num = pairs[i]
            col_name = pairs[i + 1]
            self.names[col_name] = col_num - 1  # Zero-based indexing
    
    # --------------------------------------------------------------------
    # Return list of snapshot time stamps
    
    def time(self):
        """
        Get a list of all snapshot time stamps.
        
        Returns:
            list: List of time stamps.
        """
        return [snap.time for snap in self.snaps]
    
    # --------------------------------------------------------------------
    # Sort snapshots on time stamp
    
    def compare_time(self, a, b):
        """
        Comparator for sorting snapshots by time.
        
        Parameters:
            a (Snap): First snapshot.
            b (Snap): Second snapshot.
        
        Returns:
            int: -1 if a < b, 1 if a > b, 0 if equal.
        """
        if a.time < b.time:
            return -1
        elif a.time > b.time:
            return 1
        else:
            return 0
    
    # --------------------------------------------------------------------
    # Find the index of a given timestep
    
    def findtime(self, n):
        """
        Find the index of a given timestep.
        
        Parameters:
            n (int): The timestep to find.
        
        Returns:
            int: The index of the timestep.
        
        Raises:
            Exception: If the timestep does not exist.
        """
        for i in range(self.nsnaps):
            if self.snaps[i].time == n:
                return i
        raise Exception(f"No step {n} exists.")
    
    # --------------------------------------------------------------------
    # Delete successive snapshots with duplicate time stamp
    
    def cull(self):
        """
        Remove duplicate snapshots based on time stamps.
        """
        i = 1
        while i < len(self.snaps):
            if self.snaps[i].time == self.snaps[i - 1].time:
                del self.snaps[i]
            else:
                i += 1
    
    # --------------------------------------------------------------------
    # Return list of triangles to viz for snapshot isnap
    # If called with flag, then index is timestep, so convert to snapshot index
    
    def viz(self, index, flag=0):
        """
        Return triangle information for visualization.
        
        Parameters:
            index (int): Snapshot index or time stamp.
            flag (int): If 1, interpret index as time stamp.
        
        Returns:
            tuple: (time, box, atoms, bonds, tris, lines)
                   atoms, bonds, lines are None.
                   tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
        """
        if not flag:
            isnap = index
        else:
            times = self.time()
            n = len(times)
            i = 0
            while i < n:
                if times[i] > index:
                    break
                i += 1
            isnap = i - 1
        if isnap < 0 or isnap >= self.nsnaps:
            return (-1, None, None, None, None, None)
        snap = self.snaps[isnap]
    
        time = snap.time
        box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
        try:
            id_col = self.names["id"]
            type_col = self.names["type"]
            corner1x_col = self.names["corner1x"]
            corner1y_col = self.names["corner1y"]
            corner1z_col = self.names["corner1z"]
            corner2x_col = self.names["corner2x"]
            corner2y_col = self.names["corner2y"]
            corner2z_col = self.names["corner2z"]
            corner3x_col = self.names["corner3x"]
            corner3y_col = self.names["corner3y"]
            corner3z_col = self.names["corner3z"]
        except KeyError as e:
            print(f"Column name {e} not mapped. Use t.map() to assign column names.")
            return (time, box, None, None, None, None)
    
        tris = []
        for i in range(snap.natoms):
            atom = snap.atoms[i]
            try:
                c1 = [atom[corner1x_col], atom[corner1y_col], atom[corner1z_col]]
                c2 = [atom[corner2x_col], atom[corner2y_col], atom[corner2z_col]]
                c3 = [atom[corner3x_col], atom[corner3y_col], atom[corner3z_col]]
            except IndexError:
                print(f"Error accessing corners for triangle {i} in timestep {time}.")
                continue
            n = normal(c1, c2, c3)
            # Skip triangle if all corners are zero
            if (c1[0] == 0.0 and c1[1] == 0.0 and c1[2] == 0.0 and
                c2[0] == 0.0 and c2[1] == 0.0 and c2[2] == 0.0):
                continue
            tris.append([int(atom[id_col]), int(atom[type_col])] + c1 + c2 + c3 + n)
    
        atoms = None
        bonds = None
        lines = None
    
        return (time, box, atoms, bonds, tris, lines)
    
    # --------------------------------------------------------------------
    # Wrap triangle corner points associated with atoms through periodic boundaries
    # Invoked by dump() when it does an owrap() on its atoms
    
    def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz):
        """
        Wrap triangle corner points associated with atoms through periodic boundaries.
        
        Parameters:
            time (int): Time stamp of the snapshot.
            xprd, yprd, zprd (float): Periodicity in x, y, z directions.
            idsdump (dict): Dictionary mapping atom IDs to dump indices.
            atomsdump (list): List of atoms from the dump.
            iother (int): Other index for wrapping.
            ix, iy, iz (int): Column indices for x, y, z coordinates.
        """
        try:
            id_col = self.names["id"]
            corner1x_col = self.names["corner1x"]
            corner1y_col = self.names["corner1y"]
            corner1z_col = self.names["corner1z"]
            corner2x_col = self.names["corner2x"]
            corner2y_col = self.names["corner2y"]
            corner2z_col = self.names["corner2z"]
            corner3x_col = self.names["corner3x"]
            corner3y_col = self.names["corner3y"]
            corner3z_col = self.names["corner3z"]
        except KeyError as e:
            print(f"Column name {e} not mapped. Use t.map() to assign column names.")
            return
    
        try:
            isnap = self.findtime(time)
        except Exception as e:
            print(e)
            return
        snap = self.snaps[isnap]
        atoms = snap.atoms
    
        # idump = index of my triangle I in dump's atoms
        # jdump = atom J in dump's atoms that triangle I was owrapped on
        # delx, dely, delz = offset applied to triangle I
        
        for i in range(snap.natoms):
            try:
                tag = int(atoms[i][id_col])
                idump = idsdump[tag]
                jdump = idsdump[int(atomsdump[idump][iother])]
                delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd
                dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd
                delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd
                # Apply offsets to all corners
                atoms[i][corner1x_col] += delx
                atoms[i][corner1y_col] += dely
                atoms[i][corner1z_col] += delz
                atoms[i][corner2x_col] += delx
                atoms[i][corner2y_col] += dely
                atoms[i][corner2z_col] += delz
                atoms[i][corner3x_col] += delx
                atoms[i][corner3y_col] += dely
                atoms[i][corner3z_col] += delz
            except (KeyError, IndexError, ValueError) as e:
                print(f"Error in owrap: {e}")
                continue

# --------------------------------------------------------------------
# Example Usage (for testing purposes)
# Uncomment the following lines to test the tdump3.py module

if __name__ == "__main__":
    t = tdump("dump.one")
    t.map(1, "id", 3, "x", 4, "y", 5, "z",
          6, "corner1x", 7, "corner1y", 8, "corner1z",
          9, "corner2x", 10, "corner2y", 11, "corner2z",
          12, "corner3x", 13, "corner3y", 14, "corner3z")
    while True:
        time = t.next()
        if time == -1:
            break
        print(f"Snapshot Time: {time}")
        _, box, atoms, bonds, tris, lines = t.viz(time, 1)
        print(f"Triangles: {tris}")

Functions

def normal(x, y, z)

Compute the normal vector for a triangle defined by vertices x, y, z.

Parameters

x, y, z (list or tuple): Coordinates of the three vertices.

Returns

list
Normal vector [nx, ny, nz].
Expand source code
def normal(x, y, z):
    """
    Compute the normal vector for a triangle defined by vertices x, y, z.
    
    Parameters:
        x, y, z (list or tuple): Coordinates of the three vertices.
    
    Returns:
        list: Normal vector [nx, ny, nz].
    """
    v1 = [y[0] - x[0], y[1] - x[1], y[2] - x[2]]
    v2 = [z[0] - y[0], z[1] - y[1], z[2] - y[2]]
    
    n = [
        v1[1]*v2[2] - v1[2]*v2[1],
        v1[2]*v2[0] - v1[0]*v2[2],
        v1[0]*v2[1] - v1[1]*v2[0]
    ]
    
    length = sqrt(n[0]**2 + n[1]**2 + n[2]**2)
    if length == 0:
        return [0.0, 0.0, 0.0]
    n = [component / length for component in n]
    
    return n

Classes

class Snap

Represents a single snapshot from a dump file.

Attributes

time : int
Time stamp of the snapshot.
natoms : int
Number of atoms (triangles) in the snapshot.
xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
atoms : numpy.ndarray or list
Array of atom (triangle) data.
Expand source code
class Snap:
    """
    Represents a single snapshot from a dump file.
    
    Attributes:
        time (int): Time stamp of the snapshot.
        natoms (int): Number of atoms (triangles) in the snapshot.
        xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
        atoms (numpy.ndarray or list): Array of atom (triangle) data.
    """
    def __init__(self):
        self.time = 0
        self.natoms = 0
        self.xlo = self.xhi = self.ylo = self.yhi = self.zlo = self.zhi = 0.0
        self.atoms = None
class tdump (*args)

tdump class for reading and processing dump files with triangle information.

Initialize the tdump object.

Parameters

*args: Variable length argument list. - If one argument, it's the list of files to read. - If two arguments, the second argument indicates incremental reading.

Expand source code
class tdump:
    """
    tdump class for reading and processing dump files with triangle information.
    """
    
    # --------------------------------------------------------------------
    
    def __init__(self, *args):
        """
        Initialize the tdump object.
        
        Parameters:
            *args: Variable length argument list.
                   - If one argument, it's the list of files to read.
                   - If two arguments, the second argument indicates incremental reading.
        """
        self.snaps = []
        self.nsnaps = 0
        self.names = {}
        
        # flist = list of all dump file names
        if len(args) == 0:
            raise Exception("No dump files specified for tdump.")
        
        words = args[0].split()
        self.flist = []
        for word in words:
            self.flist += glob.glob(word)
        
        if len(self.flist) == 0 and len(args) == 1:
            raise Exception("No tdump file specified.")
        
        if len(args) == 1:
            self.increment = 0
            self.read_all()
        else:
            self.increment = 1
            self.nextfile = 0
            self.eof = 0
    
    # --------------------------------------------------------------------
    
    def read_all(self):
        """
        Read all snapshots from the list of dump files.
        """
        for file in self.flist:
            # Test for gzipped file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    continue
    
            # Read all snapshots in the current file
            while True:
                snap = self.read_snapshot(f)
                if not snap:
                    break
                self.snaps.append(snap)
                print(snap.time, end=' ')
                sys.stdout.flush()
    
            f.close()
        print()
    
        # Sort entries by timestep and remove duplicates
        self.snaps.sort(key=lambda x: x.time)
        self.cull()
        self.nsnaps = len(self.snaps)
        print(f"read {self.nsnaps} snapshots")
    
    # --------------------------------------------------------------------
    # Read the next snapshot from the list of files (incremental reading)
    
    def next(self):
        """
        Read the next snapshot in incremental mode.
        
        Returns:
            int: Time stamp of the snapshot read, or -1 if no snapshots left.
        """
        if not self.increment:
            raise Exception("Cannot read incrementally with current tdump configuration.")
    
        # Read next snapshot in current file using eof as pointer
        while self.nextfile < len(self.flist):
            file = self.flist[self.nextfile]
            # Open the current file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    self.nextfile += 1
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    self.nextfile += 1
                    continue
    
            # Seek to the last read position
            f.seek(self.eof)
            snap = self.read_snapshot(f)
            if not snap:
                # End of file reached
                f.close()
                self.nextfile += 1
                self.eof = 0
                continue
            self.eof = f.tell()
            f.close()
            
            # Check for duplicate time stamp
            if any(existing_snap.time == snap.time for existing_snap in self.snaps):
                continue  # Skip duplicate
            self.snaps.append(snap)
            self.nsnaps += 1
            return snap.time
    
        return -1  # No snapshots left
    
    # --------------------------------------------------------------------
    # Read a single snapshot from a file
    
    def read_snapshot(self, f):
        """
        Read a single snapshot from the file.
        
        Parameters:
            f (file object): Opened file object to read from.
        
        Returns:
            Snap: Snapshot object, or None if failed.
        """
        try:
            snap = Snap()
            # Read and discard the first line (usually a comment or header)
            item = f.readline()
            if not item:
                return None
            # Read time stamp
            line = f.readline()
            if not line:
                return None
            snap.time = int(line.strip().split()[0])  # Read first field as time
            # Read number of atoms (triangles)
            line = f.readline()
            if not line:
                return None
            snap.natoms = int(line.strip())
    
            # Read box bounds
            line = f.readline()
            if not line:
                return None
            words = f.readline().split()
            if len(words) < 2:
                raise Exception("Incomplete box bounds data.")
            snap.xlo, snap.xhi = float(words[0]), float(words[1])
            words = f.readline().split()
            if len(words) < 2:
                raise Exception("Incomplete box bounds data.")
            snap.ylo, snap.yhi = float(words[0]), float(words[1])
            words = f.readline().split()
            if len(words) < 2:
                raise Exception("Incomplete box bounds data.")
            snap.zlo, snap.zhi = float(words[0]), float(words[1])
    
            # Read past another line (possibly a header)
            item = f.readline()
            if not item:
                return None
    
            if snap.natoms:
                # Read atom (triangle) data
                words = f.readline().split()
                ncol = len(words)
                atoms = []
                atoms.append([float(value) for value in words])
                for _ in range(1, snap.natoms):
                    words = f.readline().split()
                    if not words:
                        break
                    atoms.append([float(value) for value in words])
                snap.atoms = np.array(atoms, dtype=float)
            else:
                snap.atoms = None
            return snap
        except Exception as e:
            print(f"Error reading snapshot: {e}")
            return None
    
    # --------------------------------------------------------------------
    # Map atom column names
    
    def map(self, *pairs):
        """
        Assign names to atom columns.
        
        Parameters:
            *pairs: Pairs of (column_number, "column_name")
                   Example: map(1, "id", 3, "x")
        """
        if len(pairs) % 2 != 0:
            raise Exception("tdump map() requires pairs of mappings.")
        for i in range(0, len(pairs), 2):
            col_num = pairs[i]
            col_name = pairs[i + 1]
            self.names[col_name] = col_num - 1  # Zero-based indexing
    
    # --------------------------------------------------------------------
    # Return list of snapshot time stamps
    
    def time(self):
        """
        Get a list of all snapshot time stamps.
        
        Returns:
            list: List of time stamps.
        """
        return [snap.time for snap in self.snaps]
    
    # --------------------------------------------------------------------
    # Sort snapshots on time stamp
    
    def compare_time(self, a, b):
        """
        Comparator for sorting snapshots by time.
        
        Parameters:
            a (Snap): First snapshot.
            b (Snap): Second snapshot.
        
        Returns:
            int: -1 if a < b, 1 if a > b, 0 if equal.
        """
        if a.time < b.time:
            return -1
        elif a.time > b.time:
            return 1
        else:
            return 0
    
    # --------------------------------------------------------------------
    # Find the index of a given timestep
    
    def findtime(self, n):
        """
        Find the index of a given timestep.
        
        Parameters:
            n (int): The timestep to find.
        
        Returns:
            int: The index of the timestep.
        
        Raises:
            Exception: If the timestep does not exist.
        """
        for i in range(self.nsnaps):
            if self.snaps[i].time == n:
                return i
        raise Exception(f"No step {n} exists.")
    
    # --------------------------------------------------------------------
    # Delete successive snapshots with duplicate time stamp
    
    def cull(self):
        """
        Remove duplicate snapshots based on time stamps.
        """
        i = 1
        while i < len(self.snaps):
            if self.snaps[i].time == self.snaps[i - 1].time:
                del self.snaps[i]
            else:
                i += 1
    
    # --------------------------------------------------------------------
    # Return list of triangles to viz for snapshot isnap
    # If called with flag, then index is timestep, so convert to snapshot index
    
    def viz(self, index, flag=0):
        """
        Return triangle information for visualization.
        
        Parameters:
            index (int): Snapshot index or time stamp.
            flag (int): If 1, interpret index as time stamp.
        
        Returns:
            tuple: (time, box, atoms, bonds, tris, lines)
                   atoms, bonds, lines are None.
                   tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
        """
        if not flag:
            isnap = index
        else:
            times = self.time()
            n = len(times)
            i = 0
            while i < n:
                if times[i] > index:
                    break
                i += 1
            isnap = i - 1
        if isnap < 0 or isnap >= self.nsnaps:
            return (-1, None, None, None, None, None)
        snap = self.snaps[isnap]
    
        time = snap.time
        box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
        try:
            id_col = self.names["id"]
            type_col = self.names["type"]
            corner1x_col = self.names["corner1x"]
            corner1y_col = self.names["corner1y"]
            corner1z_col = self.names["corner1z"]
            corner2x_col = self.names["corner2x"]
            corner2y_col = self.names["corner2y"]
            corner2z_col = self.names["corner2z"]
            corner3x_col = self.names["corner3x"]
            corner3y_col = self.names["corner3y"]
            corner3z_col = self.names["corner3z"]
        except KeyError as e:
            print(f"Column name {e} not mapped. Use t.map() to assign column names.")
            return (time, box, None, None, None, None)
    
        tris = []
        for i in range(snap.natoms):
            atom = snap.atoms[i]
            try:
                c1 = [atom[corner1x_col], atom[corner1y_col], atom[corner1z_col]]
                c2 = [atom[corner2x_col], atom[corner2y_col], atom[corner2z_col]]
                c3 = [atom[corner3x_col], atom[corner3y_col], atom[corner3z_col]]
            except IndexError:
                print(f"Error accessing corners for triangle {i} in timestep {time}.")
                continue
            n = normal(c1, c2, c3)
            # Skip triangle if all corners are zero
            if (c1[0] == 0.0 and c1[1] == 0.0 and c1[2] == 0.0 and
                c2[0] == 0.0 and c2[1] == 0.0 and c2[2] == 0.0):
                continue
            tris.append([int(atom[id_col]), int(atom[type_col])] + c1 + c2 + c3 + n)
    
        atoms = None
        bonds = None
        lines = None
    
        return (time, box, atoms, bonds, tris, lines)
    
    # --------------------------------------------------------------------
    # Wrap triangle corner points associated with atoms through periodic boundaries
    # Invoked by dump() when it does an owrap() on its atoms
    
    def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz):
        """
        Wrap triangle corner points associated with atoms through periodic boundaries.
        
        Parameters:
            time (int): Time stamp of the snapshot.
            xprd, yprd, zprd (float): Periodicity in x, y, z directions.
            idsdump (dict): Dictionary mapping atom IDs to dump indices.
            atomsdump (list): List of atoms from the dump.
            iother (int): Other index for wrapping.
            ix, iy, iz (int): Column indices for x, y, z coordinates.
        """
        try:
            id_col = self.names["id"]
            corner1x_col = self.names["corner1x"]
            corner1y_col = self.names["corner1y"]
            corner1z_col = self.names["corner1z"]
            corner2x_col = self.names["corner2x"]
            corner2y_col = self.names["corner2y"]
            corner2z_col = self.names["corner2z"]
            corner3x_col = self.names["corner3x"]
            corner3y_col = self.names["corner3y"]
            corner3z_col = self.names["corner3z"]
        except KeyError as e:
            print(f"Column name {e} not mapped. Use t.map() to assign column names.")
            return
    
        try:
            isnap = self.findtime(time)
        except Exception as e:
            print(e)
            return
        snap = self.snaps[isnap]
        atoms = snap.atoms
    
        # idump = index of my triangle I in dump's atoms
        # jdump = atom J in dump's atoms that triangle I was owrapped on
        # delx, dely, delz = offset applied to triangle I
        
        for i in range(snap.natoms):
            try:
                tag = int(atoms[i][id_col])
                idump = idsdump[tag]
                jdump = idsdump[int(atomsdump[idump][iother])]
                delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd
                dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd
                delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd
                # Apply offsets to all corners
                atoms[i][corner1x_col] += delx
                atoms[i][corner1y_col] += dely
                atoms[i][corner1z_col] += delz
                atoms[i][corner2x_col] += delx
                atoms[i][corner2y_col] += dely
                atoms[i][corner2z_col] += delz
                atoms[i][corner3x_col] += delx
                atoms[i][corner3y_col] += dely
                atoms[i][corner3z_col] += delz
            except (KeyError, IndexError, ValueError) as e:
                print(f"Error in owrap: {e}")
                continue

Methods

def compare_time(self, a, b)

Comparator for sorting snapshots by time.

Parameters

a (Snap): First snapshot. b (Snap): Second snapshot.

Returns

int
-1 if a < b, 1 if a > b, 0 if equal.
Expand source code
def compare_time(self, a, b):
    """
    Comparator for sorting snapshots by time.
    
    Parameters:
        a (Snap): First snapshot.
        b (Snap): Second snapshot.
    
    Returns:
        int: -1 if a < b, 1 if a > b, 0 if equal.
    """
    if a.time < b.time:
        return -1
    elif a.time > b.time:
        return 1
    else:
        return 0
def cull(self)

Remove duplicate snapshots based on time stamps.

Expand source code
def cull(self):
    """
    Remove duplicate snapshots based on time stamps.
    """
    i = 1
    while i < len(self.snaps):
        if self.snaps[i].time == self.snaps[i - 1].time:
            del self.snaps[i]
        else:
            i += 1
def findtime(self, n)

Find the index of a given timestep.

Parameters

n (int): The timestep to find.

Returns

int
The index of the timestep.

Raises

Exception
If the timestep does not exist.
Expand source code
def findtime(self, n):
    """
    Find the index of a given timestep.
    
    Parameters:
        n (int): The timestep to find.
    
    Returns:
        int: The index of the timestep.
    
    Raises:
        Exception: If the timestep does not exist.
    """
    for i in range(self.nsnaps):
        if self.snaps[i].time == n:
            return i
    raise Exception(f"No step {n} exists.")
def map(self, *pairs)

Assign names to atom columns.

Parameters

*pairs: Pairs of (column_number, "column_name") Example: map(1, "id", 3, "x")

Expand source code
def map(self, *pairs):
    """
    Assign names to atom columns.
    
    Parameters:
        *pairs: Pairs of (column_number, "column_name")
               Example: map(1, "id", 3, "x")
    """
    if len(pairs) % 2 != 0:
        raise Exception("tdump map() requires pairs of mappings.")
    for i in range(0, len(pairs), 2):
        col_num = pairs[i]
        col_name = pairs[i + 1]
        self.names[col_name] = col_num - 1  # Zero-based indexing
def next(self)

Read the next snapshot in incremental mode.

Returns

int
Time stamp of the snapshot read, or -1 if no snapshots left.
Expand source code
def next(self):
    """
    Read the next snapshot in incremental mode.
    
    Returns:
        int: Time stamp of the snapshot read, or -1 if no snapshots left.
    """
    if not self.increment:
        raise Exception("Cannot read incrementally with current tdump configuration.")

    # Read next snapshot in current file using eof as pointer
    while self.nextfile < len(self.flist):
        file = self.flist[self.nextfile]
        # Open the current file
        if file.endswith(".gz"):
            try:
                f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
            except Exception as e:
                print(f"Error opening gzipped file {file}: {e}")
                self.nextfile += 1
                continue
        else:
            try:
                f = open(file, 'r')
            except Exception as e:
                print(f"Error opening file {file}: {e}")
                self.nextfile += 1
                continue

        # Seek to the last read position
        f.seek(self.eof)
        snap = self.read_snapshot(f)
        if not snap:
            # End of file reached
            f.close()
            self.nextfile += 1
            self.eof = 0
            continue
        self.eof = f.tell()
        f.close()
        
        # Check for duplicate time stamp
        if any(existing_snap.time == snap.time for existing_snap in self.snaps):
            continue  # Skip duplicate
        self.snaps.append(snap)
        self.nsnaps += 1
        return snap.time

    return -1  # No snapshots left
def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz)

Wrap triangle corner points associated with atoms through periodic boundaries.

Parameters

time (int): Time stamp of the snapshot. xprd, yprd, zprd (float): Periodicity in x, y, z directions. idsdump (dict): Dictionary mapping atom IDs to dump indices. atomsdump (list): List of atoms from the dump. iother (int): Other index for wrapping. ix, iy, iz (int): Column indices for x, y, z coordinates.

Expand source code
def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz):
    """
    Wrap triangle corner points associated with atoms through periodic boundaries.
    
    Parameters:
        time (int): Time stamp of the snapshot.
        xprd, yprd, zprd (float): Periodicity in x, y, z directions.
        idsdump (dict): Dictionary mapping atom IDs to dump indices.
        atomsdump (list): List of atoms from the dump.
        iother (int): Other index for wrapping.
        ix, iy, iz (int): Column indices for x, y, z coordinates.
    """
    try:
        id_col = self.names["id"]
        corner1x_col = self.names["corner1x"]
        corner1y_col = self.names["corner1y"]
        corner1z_col = self.names["corner1z"]
        corner2x_col = self.names["corner2x"]
        corner2y_col = self.names["corner2y"]
        corner2z_col = self.names["corner2z"]
        corner3x_col = self.names["corner3x"]
        corner3y_col = self.names["corner3y"]
        corner3z_col = self.names["corner3z"]
    except KeyError as e:
        print(f"Column name {e} not mapped. Use t.map() to assign column names.")
        return

    try:
        isnap = self.findtime(time)
    except Exception as e:
        print(e)
        return
    snap = self.snaps[isnap]
    atoms = snap.atoms

    # idump = index of my triangle I in dump's atoms
    # jdump = atom J in dump's atoms that triangle I was owrapped on
    # delx, dely, delz = offset applied to triangle I
    
    for i in range(snap.natoms):
        try:
            tag = int(atoms[i][id_col])
            idump = idsdump[tag]
            jdump = idsdump[int(atomsdump[idump][iother])]
            delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd
            dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd
            delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd
            # Apply offsets to all corners
            atoms[i][corner1x_col] += delx
            atoms[i][corner1y_col] += dely
            atoms[i][corner1z_col] += delz
            atoms[i][corner2x_col] += delx
            atoms[i][corner2y_col] += dely
            atoms[i][corner2z_col] += delz
            atoms[i][corner3x_col] += delx
            atoms[i][corner3y_col] += dely
            atoms[i][corner3z_col] += delz
        except (KeyError, IndexError, ValueError) as e:
            print(f"Error in owrap: {e}")
            continue
def read_all(self)

Read all snapshots from the list of dump files.

Expand source code
def read_all(self):
    """
    Read all snapshots from the list of dump files.
    """
    for file in self.flist:
        # Test for gzipped file
        if file.endswith(".gz"):
            try:
                f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
            except Exception as e:
                print(f"Error opening gzipped file {file}: {e}")
                continue
        else:
            try:
                f = open(file, 'r')
            except Exception as e:
                print(f"Error opening file {file}: {e}")
                continue

        # Read all snapshots in the current file
        while True:
            snap = self.read_snapshot(f)
            if not snap:
                break
            self.snaps.append(snap)
            print(snap.time, end=' ')
            sys.stdout.flush()

        f.close()
    print()

    # Sort entries by timestep and remove duplicates
    self.snaps.sort(key=lambda x: x.time)
    self.cull()
    self.nsnaps = len(self.snaps)
    print(f"read {self.nsnaps} snapshots")
def read_snapshot(self, f)

Read a single snapshot from the file.

Parameters

f (file object): Opened file object to read from.

Returns

Snap
Snapshot object, or None if failed.
Expand source code
def read_snapshot(self, f):
    """
    Read a single snapshot from the file.
    
    Parameters:
        f (file object): Opened file object to read from.
    
    Returns:
        Snap: Snapshot object, or None if failed.
    """
    try:
        snap = Snap()
        # Read and discard the first line (usually a comment or header)
        item = f.readline()
        if not item:
            return None
        # Read time stamp
        line = f.readline()
        if not line:
            return None
        snap.time = int(line.strip().split()[0])  # Read first field as time
        # Read number of atoms (triangles)
        line = f.readline()
        if not line:
            return None
        snap.natoms = int(line.strip())

        # Read box bounds
        line = f.readline()
        if not line:
            return None
        words = f.readline().split()
        if len(words) < 2:
            raise Exception("Incomplete box bounds data.")
        snap.xlo, snap.xhi = float(words[0]), float(words[1])
        words = f.readline().split()
        if len(words) < 2:
            raise Exception("Incomplete box bounds data.")
        snap.ylo, snap.yhi = float(words[0]), float(words[1])
        words = f.readline().split()
        if len(words) < 2:
            raise Exception("Incomplete box bounds data.")
        snap.zlo, snap.zhi = float(words[0]), float(words[1])

        # Read past another line (possibly a header)
        item = f.readline()
        if not item:
            return None

        if snap.natoms:
            # Read atom (triangle) data
            words = f.readline().split()
            ncol = len(words)
            atoms = []
            atoms.append([float(value) for value in words])
            for _ in range(1, snap.natoms):
                words = f.readline().split()
                if not words:
                    break
                atoms.append([float(value) for value in words])
            snap.atoms = np.array(atoms, dtype=float)
        else:
            snap.atoms = None
        return snap
    except Exception as e:
        print(f"Error reading snapshot: {e}")
        return None
def time(self)

Get a list of all snapshot time stamps.

Returns

list
List of time stamps.
Expand source code
def time(self):
    """
    Get a list of all snapshot time stamps.
    
    Returns:
        list: List of time stamps.
    """
    return [snap.time for snap in self.snaps]
def viz(self, index, flag=0)

Return triangle information for visualization.

Parameters

index (int): Snapshot index or time stamp. flag (int): If 1, interpret index as time stamp.

Returns

tuple
(time, box, atoms, bonds, tris, lines) atoms, bonds, lines are None. tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
Expand source code
def viz(self, index, flag=0):
    """
    Return triangle information for visualization.
    
    Parameters:
        index (int): Snapshot index or time stamp.
        flag (int): If 1, interpret index as time stamp.
    
    Returns:
        tuple: (time, box, atoms, bonds, tris, lines)
               atoms, bonds, lines are None.
               tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
    """
    if not flag:
        isnap = index
    else:
        times = self.time()
        n = len(times)
        i = 0
        while i < n:
            if times[i] > index:
                break
            i += 1
        isnap = i - 1
    if isnap < 0 or isnap >= self.nsnaps:
        return (-1, None, None, None, None, None)
    snap = self.snaps[isnap]

    time = snap.time
    box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
    try:
        id_col = self.names["id"]
        type_col = self.names["type"]
        corner1x_col = self.names["corner1x"]
        corner1y_col = self.names["corner1y"]
        corner1z_col = self.names["corner1z"]
        corner2x_col = self.names["corner2x"]
        corner2y_col = self.names["corner2y"]
        corner2z_col = self.names["corner2z"]
        corner3x_col = self.names["corner3x"]
        corner3y_col = self.names["corner3y"]
        corner3z_col = self.names["corner3z"]
    except KeyError as e:
        print(f"Column name {e} not mapped. Use t.map() to assign column names.")
        return (time, box, None, None, None, None)

    tris = []
    for i in range(snap.natoms):
        atom = snap.atoms[i]
        try:
            c1 = [atom[corner1x_col], atom[corner1y_col], atom[corner1z_col]]
            c2 = [atom[corner2x_col], atom[corner2y_col], atom[corner2z_col]]
            c3 = [atom[corner3x_col], atom[corner3y_col], atom[corner3z_col]]
        except IndexError:
            print(f"Error accessing corners for triangle {i} in timestep {time}.")
            continue
        n = normal(c1, c2, c3)
        # Skip triangle if all corners are zero
        if (c1[0] == 0.0 and c1[1] == 0.0 and c1[2] == 0.0 and
            c2[0] == 0.0 and c2[1] == 0.0 and c2[2] == 0.0):
            continue
        tris.append([int(atom[id_col]), int(atom[type_col])] + c1 + c2 + c3 + n)

    atoms = None
    bonds = None
    lines = None

    return (time, box, atoms, bonds, tris, lines)