Module mdump3

Expand source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


__all__ = ['Snap', 'eselect', 'mdump', 'normal', 'tselect']
# mdump3.py

"""
mdump3.py

Module converted from Python 2.x to Python 3.x.

mdump tool

Read, write, and manipulate mesh dump files for visualization purposes.

## Usage

    m = mdump("mesh.one")                          # Read one or more mesh dump files
    m = mdump("mesh.1 mesh.2.gz")                  # Can handle gzipped files
    m = mdump("mesh.*")                             # Wildcard expands to multiple files
    m = mdump("mesh.*", 0)                          # Two arguments: store filenames but don't read

    time = m.next()                                 # Read next snapshot from dump files

    m.map(2, "temperature")                         # Assign names to element value columns

    m.tselect.all()                                 # Select all timesteps
    m.tselect.one(N)                                # Select only timestep N
    m.tselect.none()                                # Deselect all timesteps
    m.tselect.skip(M)                               # Select every Mth step
    m.tselect.test("$t >= 100 and $t < 10000")      # Select matching timesteps
    m.delete()                                      # Delete non-selected timesteps

    m.eselect.all()                                 # Select all elements in all steps
    m.eselect.all(N)                                # Select all elements in one step
    m.eselect.test("$id > 100 and $type == 2")      # Select matching elements in all steps
    m.eselect.test("$id > 100 and $type == 2", N)  # Select matching elements in one step

    t = m.time()                                     # Return vector of selected timestep values
    fx, fy, ... = m.vecs(1000, "fx", "fy", ...)     # Return vector(s) for timestep N

    index, time, flag = m.iterator(0/1)             # Loop over mesh dump snapshots
    time, box, atoms, bonds, tris, lines = m.viz(index)  # Return list of viz objects
    nodes, elements, nvalues, evalues = m.mviz(index)    # Return list of mesh viz objects
    m.etype = "color"                               # Set column returned as "type" by viz

    m.owrap(...)                                    # Wrap lines to the same image as their atoms

      - `owrap()` is called by dump tool's `owrap()`
      - Useful for wrapping all molecule's atoms/lines the same so they are contiguous

## Notes

- Incomplete and duplicate snapshots are deleted.
- No column name assignment or unscaling is performed unless explicitly mapped.
- `viz()` and `mviz()` are tailored for visualization tools that require mesh and element information.
- `owrap()` ensures periodic boundary conditions are maintained for line segments.

## History

- 11/06, Steve Plimpton (SNL): original version
- 12/09, David Hart (SNL): allow use of NumPy or Numeric
- 2025-01-17, INRAE\Olivier Vitrac conversion 

## Dependencies

- Python 3.x
- `numpy` library (fallback to `Numeric` if `numpy` is not available)
"""

# History
#   11/06, Steve Plimpton (SNL): original version
#   12/09, David Hart (SNL): allow use of NumPy or Numeric
# 2025-01-17, first conversion in connection with the update of pizza.dump3

# Imports and external programs

import sys
import glob
import re
from os import popen
from math import sqrt
from copy import deepcopy
import numpy as np

# Define external dependency directly
PIZZA_GUNZIP = "gunzip"

# --------------------------------------------------------------------
# Helper Classes and Functions

class Snap:
    """
    Represents a single snapshot from a mesh dump file.
    
    Attributes:
        time (int): Time stamp of the snapshot.
        tselect (int): 0 or 1 indicating if this snapshot is selected.
        nselect (int): Number of selected elements in this snapshot.
        nflag (int): 0 or 1 indicating if this snapshot has nodal coordinates.
        eflag (int): 0 or 1-4 indicating the type of elements (tri, tet, square, cube).
        nvalueflag (int): 0 or 1 indicating if this snapshot has nodal values.
        evalueflag (int): 0 or 1 indicating if this snapshot has element values.
        xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
        nodes (numpy.ndarray): Array of node data.
        elements (numpy.ndarray): Array of element data.
        nvalues (numpy.ndarray): Array of node values.
        evalues (numpy.ndarray): Array of element values.
        eselect (numpy.ndarray): Array indicating selected elements.
    """
    def __init__(self):
        self.time = 0
        self.tselect = 0
        self.nselect = 0
        self.nflag = 0
        self.eflag = 0
        self.nvalueflag = 0
        self.evalueflag = 0
        self.xlo = self.xhi = self.ylo = self.yhi = self.zlo = self.zhi = 0.0
        self.nodes = None
        self.elements = None
        self.nvalues = None
        self.evalues = None
        self.eselect = None

# --------------------------------------------------------------------
# Function to compute the normal vector for a triangle with 3 vertices

def normal(x, y, z):
    """
    Compute the normal vector for a triangle defined by vertices x, y, z.
    
    Parameters:
        x, y, z (list or tuple): Coordinates of the three vertices.
    
    Returns:
        list: Normal vector [nx, ny, nz].
    """
    v1 = [y[0] - x[0], y[1] - x[1], y[2] - x[2]]
    v2 = [z[0] - y[0], z[1] - y[1], z[2] - y[2]]
    
    n = [
        v1[1]*v2[2] - v1[2]*v2[1],
        v1[2]*v2[0] - v1[0]*v2[2],
        v1[0]*v2[1] - v1[1]*v2[0]
    ]
    
    length = sqrt(n[0]**2 + n[1]**2 + n[2]**2)
    if length == 0:
        return [0.0, 0.0, 0.0]
    n = [component / length for component in n]
    
    return n

# --------------------------------------------------------------------
# Time selection class

class tselect:
    """
    Time selection class for selecting timesteps in mdump.
    """
    
    def __init__(self, data):
        self.data = data
    
    # --------------------------------------------------------------------
    
    def all(self):
        """
        Select all timesteps.
        """
        data = self.data
        for snap in data.snaps:
            snap.tselect = 1
        data.nselect = len(data.snaps)
        data.eselect.all()
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
    
    # --------------------------------------------------------------------
    
    def one(self, n):
        """
        Select only timestep N.
        
        Parameters:
            n (int): Timestep to select.
        """
        data = self.data
        for snap in data.snaps:
            snap.tselect = 0
        try:
            i = data.findtime(n)
            data.snaps[i].tselect = 1
            data.nselect = 1
            data.eselect.all()
            print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
        except Exception as e:
            print(e)
    
    # --------------------------------------------------------------------
    
    def none(self):
        """
        Deselect all timesteps.
        """
        data = self.data
        for snap in data.snaps:
            snap.tselect = 0
        data.nselect = 0
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
    
    # --------------------------------------------------------------------
    
    def skip(self, M):
        """
        Select every Mth step.
        
        Parameters:
            M (int): Interval for selecting timesteps.
        """
        data = self.data
        count = M - 1
        for snap in data.snaps:
            if not snap.tselect:
                continue
            count += 1
            if count == M:
                count = 0
                continue
            snap.tselect = 0
            data.nselect -= 1
        data.eselect.all()
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
    
    # --------------------------------------------------------------------
    
    def test(self, teststr):
        """
        Select timesteps based on a Python Boolean expression.
        
        Parameters:
            teststr (str): Boolean expression using $t for timestep value.
                           Example: "$t >= 100 and $t < 10000"
        """
        data = self.data
        snaps = data.snaps
        # Replace $t with snaps[i].time and compile the command
        teststr_replaced = teststr.replace("$t", "snap.time")
        cmd = f"flag = {teststr_replaced}"
        try:
            ccmd = compile(cmd, '', 'single')
        except Exception as e:
            print(f"Error compiling test string: {e}")
            return
        for snap in snaps:
            if not snap.tselect:
                continue
            try:
                exec(ccmd, {"snap": snap})
                if not flag:
                    snap.tselect = 0
                    data.nselect -= 1
            except Exception as e:
                print(f"Error executing test string on timestep {snap.time}: {e}")
        data.eselect.all()
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")

# --------------------------------------------------------------------
# Element selection class

class eselect:
    """
    Element selection class for selecting elements within timesteps in mdump.
    """
    
    def __init__(self, data):
        self.data = data
    
    # --------------------------------------------------------------------
    
    def all(self, *args):
        """
        Select all elements in all steps or in one specified step.
        
        Parameters:
            *args: If provided, selects all elements in the specified timestep.
                   Example: all(N) selects all elements in timestep N.
        """
        data = self.data
        if len(args) == 0:
            # Select all elements in all selected timesteps
            for snap in data.snaps:
                if not snap.tselect:
                    continue
                if snap.eflag:
                    snap.eselect[:] = 1
                    snap.nselect = snap.nelements
        else:
            # Select all elements in one specified timestep
            try:
                n = args[0]
                i = data.findtime(n)
                snap = data.snaps[i]
                if snap.eflag:
                    snap.eselect[:] = 1
                    snap.nselect = snap.nelements
            except Exception as e:
                print(e)
        print(f"Elements selected. Total selected elements: {data.eselect.count_selected()}")
    
    # --------------------------------------------------------------------
    
    def test(self, teststr, *args):
        """
        Select elements based on a Python Boolean expression.
        
        Parameters:
            teststr (str): Boolean expression using $ for element attributes.
                           Example: "$id > 100 and $type == 2"
            *args: If provided, applies the test to elements in the specified timestep.
        """
        data = self.data
        snaps = data.snaps
        # Replace $var with snap.elements[i][var_index]
        pattern = r"\$\w+"
        matches = re.findall(pattern, teststr)
        for match in matches:
            name = match[1:]
            if name not in data.names:
                print(f"Unknown column name: {name}")
                return
            column = data.names[name]
            teststr = teststr.replace(match, f"snap.elements[i][{column}]")
        cmd = f"flag = {teststr}"
        try:
            ccmd = compile(cmd, '', 'single')
        except Exception as e:
            print(f"Error compiling test string: {e}")
            return
        
        if len(args) == 0:
            # Apply test to all selected timesteps
            for snap in snaps:
                if not snap.tselect:
                    continue
                for i in range(snap.nelements):
                    if not snap.eselect[i]:
                        continue
                    try:
                        exec(ccmd, {"snap": snap, "i": i})
                        if not flag:
                            snap.eselect[i] = 0
                            snap.nselect -= 1
                    except Exception as e:
                        print(f"Error executing test string on element {i} in timestep {snap.time}: {e}")
            print(f"Elements selected. Total selected elements: {data.eselect.count_selected()}")
        else:
            # Apply test to a specific timestep
            try:
                n = args[0]
                i = data.findtime(n)
                snap = snaps[i]
                if not snap.tselect:
                    print(f"Timestep {n} is not selected.")
                    return
                for j in range(snap.nelements):
                    if not snap.eselect[j]:
                        continue
                    try:
                        exec(ccmd, {"snap": snap, "i": j})
                        if not flag:
                            snap.eselect[j] = 0
                            snap.nselect -= 1
                    except Exception as e:
                        print(f"Error executing test string on element {j} in timestep {snap.time}: {e}")
                print(f"Elements selected. Total selected elements in timestep {n}: {snap.nselect}")
            except Exception as e:
                print(e)

# --------------------------------------------------------------------
# ldump Class Definition

class mdump:
    """
    mdump class for reading, writing, and manipulating mesh dump files.
    """
    
    # --------------------------------------------------------------------
    
    def __init__(self, *args):
        """
        Initialize the mdump object.
        
        Parameters:
            *args: Variable length argument list.
                   - If one argument, it's the list of files to read.
                   - If two arguments, the second argument indicates incremental reading.
        """
        self.snaps = []
        self.nsnaps = 0
        self.nselect = 0
        self.names = {}
        self.tselect = tselect(self)
        self.eselect = eselect(self)
        self.etype = ""
        
        # flist = list of all dump file names
        if len(args) == 0:
            raise Exception("No dump files specified for mdump.")
        
        words = args[0].split()
        self.flist = []
        for word in words:
            self.flist += glob.glob(word)
        
        if len(self.flist) == 0 and len(args) == 1:
            raise Exception("No dump file specified.")
        
        if len(args) == 1:
            self.increment = 0
            self.read_all()
        else:
            self.increment = 1
            self.nextfile = 0
            self.eof = 0
    
    # --------------------------------------------------------------------
    
    def read_all(self):
        """
        Read all snapshots from the list of dump files.
        """
        for file in self.flist:
            # Test for gzipped file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    continue
    
            # Read all snapshots in the current file
            while True:
                snap = self.read_snapshot(f)
                if not snap:
                    break
                self.snaps.append(snap)
                print(snap.time, end=' ')
                sys.stdout.flush()
    
            f.close()
        print()
    
        # Sort entries by timestep and remove duplicates
        self.snaps.sort(key=lambda x: x.time)
        self.cull()
    
        # Sort all node, element, nvalue, evalue arrays by ID
        for snap in self.snaps:
            if snap.nflag and snap.nodes is not None:
                array = snap.nodes
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
            if snap.eflag and snap.elements is not None:
                array = snap.elements
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
            if snap.nvalueflag and snap.nvalues is not None:
                array = snap.nvalues
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
            if snap.evalueflag and snap.evalues is not None:
                array = snap.evalues
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
    
        # Reference definitions of nodes and elements in previous timesteps
        self.reference()
        
        self.nsnaps = len(self.snaps)
        print(f"read {self.nsnaps} snapshots")
    
        # Select all timesteps and elements
        self.tselect.all()
    
    # --------------------------------------------------------------------
    # Read the next snapshot from the list of files (incremental reading)
    
    def next(self):
        """
        Read the next snapshot in incremental mode.
        
        Returns:
            int: Time stamp of the snapshot read, or -1 if no snapshots left.
        """
        if not self.increment:
            raise Exception("Cannot read incrementally with current mdump configuration.")
    
        # Read next snapshot in current file using eof as pointer
        while self.nextfile < len(self.flist):
            file = self.flist[self.nextfile]
            # Open the current file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    self.nextfile += 1
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    self.nextfile += 1
                    continue
    
            # Seek to the last read position
            f.seek(self.eof)
            snap = self.read_snapshot(f)
            if not snap:
                # End of file reached
                f.close()
                self.nextfile += 1
                self.eof = 0
                continue
            self.eof = f.tell()
            f.close()
                
            # Check for duplicate time stamp
            if any(existing_snap.time == snap.time for existing_snap in self.snaps):
                continue  # Skip duplicate
            self.snaps.append(snap)
            snap.tselect = 1
            snap.nselect = snap.nelements
            if snap.eflag:
                snap.eselect = np.ones(snap.nelements, dtype=int)
            self.nsnaps += 1
            self.nselect += 1
            return snap.time
    
        return -1  # No snapshots left
    
    # --------------------------------------------------------------------
    # Read a single snapshot from a file
    
    def read_snapshot(self, f):
        """
        Read a single snapshot from the file.
        
        Parameters:
            f (file object): Opened file object to read from.
        
        Returns:
            Snap: Snapshot object, or None if failed.
        """
        try:
            snap = Snap()
            # Read and discard the first line (usually a comment or header)
            item = f.readline()
            if not item:
                return None
            # Read time stamp
            line = f.readline()
            if not line:
                return None
            snap.time = int(line.strip())
            # Read snapshot type
            line = f.readline()
            if not line:
                return None
            if "NUMBER OF NODES" in line:
                snap.nflag = 1
            elif "NUMBER OF TRIANGLES" in line:
                snap.eflag = 1
            elif "NUMBER OF TETS" in line:
                snap.eflag = 2
            elif "NUMBER OF SQUARES" in line:
                snap.eflag = 3
            elif "NUMBER OF CUBES" in line:
                snap.eflag = 4
            elif "NUMBER OF NODE VALUES" in line:
                snap.nvalueflag = 1
            elif "NUMBER OF ELEMENT VALUES" in line:
                snap.evalueflag = 1
            else:
                raise Exception("Unrecognized snapshot type in dump file.")
            # Read number of nodes/elements/values
            line = f.readline()
            if not line:
                return None
            n = int(line.strip())
    
            if snap.eflag:
                snap.eselect = np.zeros(n, dtype=int)
    
            if snap.nflag:
                # Read box bounds
                f.readline()  # Read past header
                words = f.readline().split()
                snap.xlo, snap.xhi = float(words[0]), float(words[1])
                words = f.readline().split()
                snap.ylo, snap.yhi = float(words[0]), float(words[1])
                words = f.readline().split()
                snap.zlo, snap.zhi = float(words[0]), float(words[1])
                
            f.readline()  # Read past another header
    
            if n:
                # Read node/element/value data
                words = f.readline().split()
                ncol = len(words)
                data = []
                data.append([float(value) for value in words])
                for _ in range(1, n):
                    words = f.readline().split()
                    if not words:
                        break
                    data.append([float(value) for value in words])
                values = np.array(data, dtype=float)
            else:
                values = None
    
            if snap.nflag:
                snap.nodes = values
                snap.nnodes = n
            elif snap.eflag:
                snap.elements = values
                snap.nelements = n
                snap.eselect = np.zeros(n, dtype=int)  # Initialize element selection
            elif snap.nvalueflag:
                snap.nvalues = values
                snap.nnvalues = n
            elif snap.evalueflag:
                snap.evalues = values
                snap.nevalues = n
            return snap
        except Exception as e:
            print(f"Error reading snapshot: {e}")
            return None
    
    # --------------------------------------------------------------------
    # Map atom column names
    
    def map(self, *pairs):
        """
        Assign names to element value columns.
        
        Parameters:
            *pairs: Pairs of (column_number, "column_name")
                   Example: map(2, "temperature")
        """
        if len(pairs) % 2 != 0:
            raise Exception("mdump map() requires pairs of mappings.")
        for i in range(0, len(pairs), 2):
            col_num = pairs[i]
            col_name = pairs[i + 1]
            self.names[col_name] = col_num - 1  # Zero-based indexing
    
    # --------------------------------------------------------------------
    # Delete unselected snapshots
    
    def delete(self):
        """
        Delete non-selected snapshots.
        """
        ndel = 0
        selected_snaps = []
        for snap in self.snaps:
            if snap.tselect:
                selected_snaps.append(snap)
            else:
                ndel += 1
        self.snaps = selected_snaps
        self.nsnaps = len(self.snaps)
        self.nselect = sum(snap.tselect for snap in self.snaps)
        print(f"{ndel} snapshots deleted")
        print(f"{self.nsnaps} snapshots remaining")
    
    # --------------------------------------------------------------------
    # Sort snapshots by time stamp
    
    def sort(self, *args):
        """
        Sort snapshots by ID in all selected timesteps or one specified timestep.
        
        Parameters:
            *args: If provided, sorts elements in the specified timestep.
        """
        if len(args) == 0:
            print("Sorting selected snapshots ...")
            if "id" not in self.names:
                print("Column 'id' not mapped. Use m.map() to assign column names.")
                return
            id_col = self.names["id"]
            for snap in self.snaps:
                if snap.tselect:
                    self.sort_one(snap, id_col)
        else:
            try:
                n = args[0]
                i = self.findtime(n)
                if "id" not in self.names:
                    print("Column 'id' not mapped. Use m.map() to assign column names.")
                    return
                id_col = self.names["id"]
                self.sort_one(self.snaps[i], id_col)
            except Exception as e:
                print(e)
    
    # --------------------------------------------------------------------
    # Sort a single snapshot by ID column
    
    def sort_one(self, snap, id_col):
        """
        Sort elements in a single snapshot by ID column.
        
        Parameters:
            snap (Snap): The snapshot to sort.
            id_col (int): The column index for 'id'.
        """
        if snap.elements is not None:
            array = snap.elements
            ids = array[:, id_col]
            ordering = np.argsort(ids)
            for i in range(array.shape[1]):
                array[:, i] = np.take(array[:, i], ordering)
    
    # --------------------------------------------------------------------
    # Return list of selected snapshot time stamps
    
    def time(self):
        """
        Get a list of all selected snapshot time stamps.
        
        Returns:
            list: List of time stamps.
        """
        return [snap.time for snap in self.snaps if snap.tselect]
    
    # --------------------------------------------------------------------
    # Extract vector(s) of values for selected elements at chosen timestep
    
    def vecs(self, n, *columns):
        """
        Extract vector(s) of values for selected elements at a specific timestep.
        
        Parameters:
            n (int): Timestep to extract vectors from.
            *columns (str): Column names to extract.
        
        Returns:
            list: List of vectors corresponding to the specified columns.
        """
        try:
            snap = self.snaps[self.findtime(n)]
        except Exception as e:
            print(e)
            return []
        if not snap.evalueflag:
            raise Exception("Snapshot has no element values.")
        
        if len(columns) == 0:
            raise Exception("No columns specified.")
        
        column_indices = []
        for name in columns:
            if name not in self.names:
                print(f"Unknown column name: {name}")
                return []
            column_indices.append(self.names[name])
        
        values = []
        for col in column_indices:
            vec = []
            for i in range(snap.nelements):
                if snap.eselect[i]:
                    vec.append(snap.evalues[i][col])
            values.append(vec)
        
        if len(values) == 1:
            return values[0]
        else:
            return values
    
    # --------------------------------------------------------------------
    # Sort snapshots on time stamp
    # (Replaced by sort method using key=lambda x: x.time)
    
    def compare_time(self, a, b):
        """
        Comparator for sorting snapshots by time.
        
        Parameters:
            a (Snap): First snapshot.
            b (Snap): Second snapshot.
        
        Returns:
            int: -1 if a < b, 1 if a > b, 0 if equal.
        """
        if a.time < b.time:
            return -1
        elif a.time > b.time:
            return 1
        else:
            return 0
    
    # --------------------------------------------------------------------
    # Delete successive snapshots with duplicate time stamp
    # If have same timestamp, combine them if internal flags are different
    
    def cull(self):
        """
        Remove duplicate snapshots based on time stamps and merge their data.
        """
        i = 1
        while i < len(self.snaps):
            if self.snaps[i].time == self.snaps[i-1].time:
                if self.snaps[i].nflag and not self.snaps[i-1].nflag:
                    self.snaps[i-1].nflag = 1
                    self.snaps[i-1].nnodes = self.snaps[i].nnodes
                    self.snaps[i-1].nodes = self.snaps[i].nodes
                    self.snaps[i-1].xlo = self.snaps[i].xlo
                    self.snaps[i-1].xhi = self.snaps[i].xhi
                    self.snaps[i-1].ylo = self.snaps[i].ylo
                    self.snaps[i-1].yhi = self.snaps[i].yhi
                    self.snaps[i-1].zlo = self.snaps[i].zlo
                    self.snaps[i-1].zhi = self.snaps[i].zhi
                elif self.snaps[i].eflag and not self.snaps[i-1].eflag:
                    self.snaps[i-1].eflag = self.snaps[i].eflag
                    self.snaps[i-1].nelements = self.snaps[i].nelements
                    self.snaps[i-1].elements = self.snaps[i].elements
                    self.snaps[i-1].eselect = self.snaps[i].eselect
                elif self.snaps[i].nvalueflag and not self.snaps[i-1].nvalueflag:
                    self.snaps[i-1].nvalueflag = 1
                    self.snaps[i-1].nnvalues = self.snaps[i].nnvalues
                    self.snaps[i-1].nvalues = self.snaps[i].nvalues
                elif self.snaps[i].evalueflag and not self.snaps[i-1].evalueflag:
                    self.snaps[i-1].evalueflag = 1
                    self.snaps[i-1].nevalues = self.snaps[i].nevalues
                    self.snaps[i-1].evalues = self.snaps[i].evalues
                del self.snaps[i]
            else:
                i += 1
    
    # --------------------------------------------------------------------
    # Ensure every snapshot has node and element connectivity info
    # If not, point it at the most recent snapshot that does
    
    def reference(self):
        """
        Ensure every snapshot has node and element connectivity information by referencing previous snapshots.
        """
        for i in range(len(self.snaps)):
            if not self.snaps[i].nflag:
                for j in range(i, -1, -1):
                    if self.snaps[j].nflag:
                        self.snaps[i].nflag = self.snaps[j].nflag
                        self.snaps[i].nnodes = self.snaps[j].nnodes
                        self.snaps[i].nodes = self.snaps[j].nodes
                        self.snaps[i].xlo = self.snaps[j].xlo
                        self.snaps[i].xhi = self.snaps[j].xhi
                        self.snaps[i].ylo = self.snaps[j].ylo
                        self.snaps[i].yhi = self.snaps[j].yhi
                        self.snaps[i].zlo = self.snaps[j].zlo
                        self.snaps[i].zhi = self.snaps[j].zhi
                        break
            if not self.snaps[i].nflag:
                raise Exception("No nodal coordinates found in previous snapshots.")
            if not self.snaps[i].eflag:
                for j in range(i, -1, -1):
                    if self.snaps[j].eflag:
                        self.snaps[i].eflag = self.snaps[j].eflag
                        self.snaps[i].nelements = self.snaps[j].nelements
                        self.snaps[i].elements = self.snaps[j].elements
                        self.snaps[i].eselect = self.snaps[j].eselect
                        break
            if not self.snaps[i].eflag:
                raise Exception("No element connections found in previous snapshots.")
    
    # --------------------------------------------------------------------
    # Iterate over selected snapshots
    
    def iterator(self, flag):
        """
        Iterator method compatible with equivalent dump calls.
        
        Parameters:
            flag (int): 0 for first call, 1 for subsequent calls.
        
        Returns:
            tuple: (index, time, flag)
                   index (int): Index within dump object (0 to # of snapshots)
                   time (int): Timestep value
                   flag (int): -1 when iteration is done, 1 otherwise
        """
        if flag == 0:
            self.iterate = -1  # Initialize iteration
        self.iterate += 1
        if self.iterate >= self.nsnaps:
            return (0, 0, -1)
        snap = self.snaps[self.iterate]
        if not snap.tselect:
            return self.iterator(1)
        return (self.iterate, snap.time, 1)
    
    # --------------------------------------------------------------------
    # Return list of viz objects for a snapshot
    
    def viz(self, index, flag=0):
        """
        Return mesh visualization information for a snapshot.
        
        Parameters:
            index (int): Snapshot index or time stamp.
            flag (int): If 1, interpret index as time stamp.
        
        Returns:
            tuple: (time, box, atoms, bonds, tris, lines)
                   atoms, bonds, lines are None.
                   tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
        """
        if not flag:
            isnap = index
        else:
            times = self.time()
            n = len(times)
            i = 0
            while i < n:
                if times[i] > index:
                    break
                i += 1
            isnap = i - 1
        if isnap < 0 or isnap >= self.nsnaps:
            return (-1, None, None, None, None, None)
        snap = self.snaps[isnap]
        
        time = snap.time
        box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
        if self.etype == "":
            type_col = -1
        else:
            if self.etype not in self.names:
                print(f"Unknown etype column: {self.etype}")
                type_col = -1
            else:
                type_col = self.names[self.etype]
    
        atoms = None
        bonds = None
    
        tris = []
        nodes = snap.nodes
        for i in range(snap.nelements):
            if not snap.eselect[i]:
                continue
            element = snap.elements[i]
            if snap.evalueflag:
                evalue = snap.evalues[i]
            else:
                evalue = None
    
            # Process based on element type
            if snap.eflag == 1:
                # Triangle
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                list_vertices = v1 + v2 + v3
                n = normal(list_vertices[0:3], list_vertices[3:6], list_vertices[6:9])
                if type_col == -1:
                    tris.append([int(element[0]), int(element[1])] + list_vertices + n)
                else:
                    tris.append([int(element[0]), int(evalue[type_col])] + list_vertices + n)
    
            elif snap.eflag == 2:
                # Tetrahedron - convert to 4 triangles
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                    v4 = nodes[int(element[5]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                tris_data = [
                    v1 + v2 + v4,
                    v2 + v3 + v4,
                    v1 + v4 + v3,
                    v1 + v3 + v2
                ]
                for tri_vertices in tris_data:
                    n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                    if type_col == -1:
                        tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                    else:
                        tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)
    
            elif snap.eflag == 3:
                # Square - convert to 2 triangles
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                    v4 = nodes[int(element[5]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                tris_data = [
                    v1 + v2 + v3,
                    v1 + v3 + v4
                ]
                for tri_vertices in tris_data:
                    n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                    if type_col == -1:
                        tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                    else:
                        tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)
    
            elif snap.eflag == 4:
                # Cube - convert to 12 triangles
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                    v4 = nodes[int(element[5]) - 1][2:5].tolist()
                    v5 = nodes[int(element[6]) - 1][2:5].tolist()
                    v6 = nodes[int(element[7]) - 1][2:5].tolist()
                    v7 = nodes[int(element[8]) - 1][2:5].tolist()
                    v8 = nodes[int(element[9]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                tris_data = [
                    # Lower z face
                    v1 + v3 + v2,
                    v1 + v4 + v3,
                    # Upper z face
                    v5 + v6 + v7,
                    v5 + v7 + v8,
                    # Lower y face
                    v1 + v2 + v6,
                    v1 + v6 + v5,
                    # Upper y face
                    v4 + v7 + v3,
                    v4 + v8 + v7,
                    # Lower x face
                    v1 + v8 + v4,
                    v1 + v5 + v8,
                    # Upper x face
                    v2 + v3 + v7,
                    v2 + v7 + v6
                ]
                for tri_vertices in tris_data:
                    n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                    if type_col == -1:
                        tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                    else:
                        tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)
        
        # Lines are not used in viz for mesh dumps
        lines = []
    
        return (time, box, atoms, bonds, tris, lines)
    
    # --------------------------------------------------------------------
    # Return lists of node/element info for snapshot isnap
    # If called with flag, then index is timestep, so convert to snapshot index
    
    def mviz(self, index, flag=0):
        """
        Return mesh visualization information for a snapshot, including nodes and elements.
        
        Parameters:
            index (int): Snapshot index or time stamp.
            flag (int): If 1, interpret index as time stamp.
        
        Returns:
            tuple: (time, box, nodes, elements, nvalues, evalues)
        """
        if not flag:
            isnap = index
        else:
            times = self.time()
            n = len(times)
            i = 0
            while i < n:
                if times[i] > index:
                    break
                i += 1
            isnap = i - 1
        if isnap < 0 or isnap >= self.nsnaps:
            return (-1, None, None, None, None, None)
        snap = self.snaps[isnap]
        
        time = snap.time
        box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
        nodes = []
        elements = []
        nvalues = []
        evalues = []
        
        if snap.nflag and snap.nodes is not None:
            for node in snap.nodes:
                nodes.append([int(node[0]), int(node[1]), node[2], node[3], node[4]])
        if snap.eflag and snap.elements is not None:
            for elem in snap.elements:
                elements.append([int(elem[0]), int(elem[1])] + elem[2:].tolist())
        if snap.nvalueflag and snap.nvalues is not None:
            for nval in snap.nvalues:
                nvalues.append([int(nval[0]), int(nval[1])] + nval[2:].tolist())
        if snap.evalueflag and snap.evalues is not None:
            for eval in snap.evalues:
                evalues.append([int(eval[0]), int(eval[1])] + eval[2:].tolist())
        
        return (time, box, nodes, elements, nvalues, evalues)
    
    # --------------------------------------------------------------------
    # Wrap line end points associated with atoms through periodic boundaries
    # Invoked by dump() when it does an owrap() on its atoms
    
    def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz):
        """
        Wrap line end points associated with atoms through periodic boundaries.
        
        Parameters:
            time (int): Time stamp of the snapshot.
            xprd, yprd, zprd (float): Periodicity in x, y, z directions.
            idsdump (dict): Dictionary mapping atom IDs to dump indices.
            atomsdump (list): List of atoms from the dump.
            iother (int): Other index for wrapping.
            ix, iy, iz (int): Column indices for x, y, z coordinates.
        """
        try:
            id_col = self.names["id"]
            end1x_col = self.names["end1x"]
            end1y_col = self.names["end1y"]
            end2x_col = self.names["end2x"]
            end2y_col = self.names["end2y"]
        except KeyError as e:
            print(f"Column name {e} not mapped. Use m.map() to assign column names.")
            return
    
        try:
            isnap = self.findtime(time)
        except Exception as e:
            print(e)
            return
        snap = self.snaps[isnap]
        atoms = snap.atoms
    
        # idump = index of my line I in dump's atoms
        # jdump = atom J in dump's atoms that atom I was owrapped on
        # delx, dely = offset applied to atom I and thus to line I
    
        for i in range(snap.nelements):
            try:
                tag = int(atoms[i][id_col])
                idump = idsdump[tag]
                jdump = idsdump[int(atomsdump[idump][iother])]
                delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd
                dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd
                delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd
                # Assuming z-coordinates need to be handled if present
                # If not, the z-component remains unchanged
                # Here, only x and y are being wrapped
                atoms[i][end1x_col] += delx
                atoms[i][end1y_col] += dely
                atoms[i][end2x_col] += delx
                atoms[i][end2y_col] += dely
            except (KeyError, IndexError, ValueError) as e:
                print(f"Error in owrap: {e}")
                continue

# --------------------------------------------------------------------
# Example Usage (for testing purposes)
# Uncomment the following lines to test the mdump3.py module

if __name__ == "__main__":
    m = mdump("mesh.one")
    m.map(2, "temperature")
    m.tselect.all()
    while True:
        time = m.next()
        if time == -1:
            break
        print(f"Snapshot Time: {time}")
        _, box, nodes, elements, nvalues, evalues = m.viz(time, 1)
        print(f"Nodes: {nodes}")
        print(f"Elements: {elements}")

Functions

def normal(x, y, z)

Compute the normal vector for a triangle defined by vertices x, y, z.

Parameters

x, y, z (list or tuple): Coordinates of the three vertices.

Returns

list
Normal vector [nx, ny, nz].
Expand source code
def normal(x, y, z):
    """
    Compute the normal vector for a triangle defined by vertices x, y, z.
    
    Parameters:
        x, y, z (list or tuple): Coordinates of the three vertices.
    
    Returns:
        list: Normal vector [nx, ny, nz].
    """
    v1 = [y[0] - x[0], y[1] - x[1], y[2] - x[2]]
    v2 = [z[0] - y[0], z[1] - y[1], z[2] - y[2]]
    
    n = [
        v1[1]*v2[2] - v1[2]*v2[1],
        v1[2]*v2[0] - v1[0]*v2[2],
        v1[0]*v2[1] - v1[1]*v2[0]
    ]
    
    length = sqrt(n[0]**2 + n[1]**2 + n[2]**2)
    if length == 0:
        return [0.0, 0.0, 0.0]
    n = [component / length for component in n]
    
    return n

Classes

class Snap

Represents a single snapshot from a mesh dump file.

Attributes

time : int
Time stamp of the snapshot.
tselect : int
0 or 1 indicating if this snapshot is selected.
nselect : int
Number of selected elements in this snapshot.
nflag : int
0 or 1 indicating if this snapshot has nodal coordinates.
eflag : int
0 or 1-4 indicating the type of elements (tri, tet, square, cube).
nvalueflag : int
0 or 1 indicating if this snapshot has nodal values.
evalueflag : int
0 or 1 indicating if this snapshot has element values.
xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
nodes : numpy.ndarray
Array of node data.
elements : numpy.ndarray
Array of element data.
nvalues : numpy.ndarray
Array of node values.
evalues : numpy.ndarray
Array of element values.
eselect : numpy.ndarray
Array indicating selected elements.
Expand source code
class Snap:
    """
    Represents a single snapshot from a mesh dump file.
    
    Attributes:
        time (int): Time stamp of the snapshot.
        tselect (int): 0 or 1 indicating if this snapshot is selected.
        nselect (int): Number of selected elements in this snapshot.
        nflag (int): 0 or 1 indicating if this snapshot has nodal coordinates.
        eflag (int): 0 or 1-4 indicating the type of elements (tri, tet, square, cube).
        nvalueflag (int): 0 or 1 indicating if this snapshot has nodal values.
        evalueflag (int): 0 or 1 indicating if this snapshot has element values.
        xlo, xhi, ylo, yhi, zlo, zhi (float): Box bounds.
        nodes (numpy.ndarray): Array of node data.
        elements (numpy.ndarray): Array of element data.
        nvalues (numpy.ndarray): Array of node values.
        evalues (numpy.ndarray): Array of element values.
        eselect (numpy.ndarray): Array indicating selected elements.
    """
    def __init__(self):
        self.time = 0
        self.tselect = 0
        self.nselect = 0
        self.nflag = 0
        self.eflag = 0
        self.nvalueflag = 0
        self.evalueflag = 0
        self.xlo = self.xhi = self.ylo = self.yhi = self.zlo = self.zhi = 0.0
        self.nodes = None
        self.elements = None
        self.nvalues = None
        self.evalues = None
        self.eselect = None
class eselect (data)

Element selection class for selecting elements within timesteps in mdump.

Expand source code
class eselect:
    """
    Element selection class for selecting elements within timesteps in mdump.
    """
    
    def __init__(self, data):
        self.data = data
    
    # --------------------------------------------------------------------
    
    def all(self, *args):
        """
        Select all elements in all steps or in one specified step.
        
        Parameters:
            *args: If provided, selects all elements in the specified timestep.
                   Example: all(N) selects all elements in timestep N.
        """
        data = self.data
        if len(args) == 0:
            # Select all elements in all selected timesteps
            for snap in data.snaps:
                if not snap.tselect:
                    continue
                if snap.eflag:
                    snap.eselect[:] = 1
                    snap.nselect = snap.nelements
        else:
            # Select all elements in one specified timestep
            try:
                n = args[0]
                i = data.findtime(n)
                snap = data.snaps[i]
                if snap.eflag:
                    snap.eselect[:] = 1
                    snap.nselect = snap.nelements
            except Exception as e:
                print(e)
        print(f"Elements selected. Total selected elements: {data.eselect.count_selected()}")
    
    # --------------------------------------------------------------------
    
    def test(self, teststr, *args):
        """
        Select elements based on a Python Boolean expression.
        
        Parameters:
            teststr (str): Boolean expression using $ for element attributes.
                           Example: "$id > 100 and $type == 2"
            *args: If provided, applies the test to elements in the specified timestep.
        """
        data = self.data
        snaps = data.snaps
        # Replace $var with snap.elements[i][var_index]
        pattern = r"\$\w+"
        matches = re.findall(pattern, teststr)
        for match in matches:
            name = match[1:]
            if name not in data.names:
                print(f"Unknown column name: {name}")
                return
            column = data.names[name]
            teststr = teststr.replace(match, f"snap.elements[i][{column}]")
        cmd = f"flag = {teststr}"
        try:
            ccmd = compile(cmd, '', 'single')
        except Exception as e:
            print(f"Error compiling test string: {e}")
            return
        
        if len(args) == 0:
            # Apply test to all selected timesteps
            for snap in snaps:
                if not snap.tselect:
                    continue
                for i in range(snap.nelements):
                    if not snap.eselect[i]:
                        continue
                    try:
                        exec(ccmd, {"snap": snap, "i": i})
                        if not flag:
                            snap.eselect[i] = 0
                            snap.nselect -= 1
                    except Exception as e:
                        print(f"Error executing test string on element {i} in timestep {snap.time}: {e}")
            print(f"Elements selected. Total selected elements: {data.eselect.count_selected()}")
        else:
            # Apply test to a specific timestep
            try:
                n = args[0]
                i = data.findtime(n)
                snap = snaps[i]
                if not snap.tselect:
                    print(f"Timestep {n} is not selected.")
                    return
                for j in range(snap.nelements):
                    if not snap.eselect[j]:
                        continue
                    try:
                        exec(ccmd, {"snap": snap, "i": j})
                        if not flag:
                            snap.eselect[j] = 0
                            snap.nselect -= 1
                    except Exception as e:
                        print(f"Error executing test string on element {j} in timestep {snap.time}: {e}")
                print(f"Elements selected. Total selected elements in timestep {n}: {snap.nselect}")
            except Exception as e:
                print(e)

Methods

def all(self, *args)

Select all elements in all steps or in one specified step.

Parameters

*args: If provided, selects all elements in the specified timestep. Example: all(N) selects all elements in timestep N.

Expand source code
def all(self, *args):
    """
    Select all elements in all steps or in one specified step.
    
    Parameters:
        *args: If provided, selects all elements in the specified timestep.
               Example: all(N) selects all elements in timestep N.
    """
    data = self.data
    if len(args) == 0:
        # Select all elements in all selected timesteps
        for snap in data.snaps:
            if not snap.tselect:
                continue
            if snap.eflag:
                snap.eselect[:] = 1
                snap.nselect = snap.nelements
    else:
        # Select all elements in one specified timestep
        try:
            n = args[0]
            i = data.findtime(n)
            snap = data.snaps[i]
            if snap.eflag:
                snap.eselect[:] = 1
                snap.nselect = snap.nelements
        except Exception as e:
            print(e)
    print(f"Elements selected. Total selected elements: {data.eselect.count_selected()}")
def test(self, teststr, *args)

Select elements based on a Python Boolean expression.

Parameters

teststr (str): Boolean expression using $ for element attributes. Example: "$id > 100 and $type == 2" *args: If provided, applies the test to elements in the specified timestep.

Expand source code
def test(self, teststr, *args):
    """
    Select elements based on a Python Boolean expression.
    
    Parameters:
        teststr (str): Boolean expression using $ for element attributes.
                       Example: "$id > 100 and $type == 2"
        *args: If provided, applies the test to elements in the specified timestep.
    """
    data = self.data
    snaps = data.snaps
    # Replace $var with snap.elements[i][var_index]
    pattern = r"\$\w+"
    matches = re.findall(pattern, teststr)
    for match in matches:
        name = match[1:]
        if name not in data.names:
            print(f"Unknown column name: {name}")
            return
        column = data.names[name]
        teststr = teststr.replace(match, f"snap.elements[i][{column}]")
    cmd = f"flag = {teststr}"
    try:
        ccmd = compile(cmd, '', 'single')
    except Exception as e:
        print(f"Error compiling test string: {e}")
        return
    
    if len(args) == 0:
        # Apply test to all selected timesteps
        for snap in snaps:
            if not snap.tselect:
                continue
            for i in range(snap.nelements):
                if not snap.eselect[i]:
                    continue
                try:
                    exec(ccmd, {"snap": snap, "i": i})
                    if not flag:
                        snap.eselect[i] = 0
                        snap.nselect -= 1
                except Exception as e:
                    print(f"Error executing test string on element {i} in timestep {snap.time}: {e}")
        print(f"Elements selected. Total selected elements: {data.eselect.count_selected()}")
    else:
        # Apply test to a specific timestep
        try:
            n = args[0]
            i = data.findtime(n)
            snap = snaps[i]
            if not snap.tselect:
                print(f"Timestep {n} is not selected.")
                return
            for j in range(snap.nelements):
                if not snap.eselect[j]:
                    continue
                try:
                    exec(ccmd, {"snap": snap, "i": j})
                    if not flag:
                        snap.eselect[j] = 0
                        snap.nselect -= 1
                except Exception as e:
                    print(f"Error executing test string on element {j} in timestep {snap.time}: {e}")
            print(f"Elements selected. Total selected elements in timestep {n}: {snap.nselect}")
        except Exception as e:
            print(e)
class mdump (*args)

mdump class for reading, writing, and manipulating mesh dump files.

Initialize the mdump object.

Parameters

*args: Variable length argument list. - If one argument, it's the list of files to read. - If two arguments, the second argument indicates incremental reading.

Expand source code
class mdump:
    """
    mdump class for reading, writing, and manipulating mesh dump files.
    """
    
    # --------------------------------------------------------------------
    
    def __init__(self, *args):
        """
        Initialize the mdump object.
        
        Parameters:
            *args: Variable length argument list.
                   - If one argument, it's the list of files to read.
                   - If two arguments, the second argument indicates incremental reading.
        """
        self.snaps = []
        self.nsnaps = 0
        self.nselect = 0
        self.names = {}
        self.tselect = tselect(self)
        self.eselect = eselect(self)
        self.etype = ""
        
        # flist = list of all dump file names
        if len(args) == 0:
            raise Exception("No dump files specified for mdump.")
        
        words = args[0].split()
        self.flist = []
        for word in words:
            self.flist += glob.glob(word)
        
        if len(self.flist) == 0 and len(args) == 1:
            raise Exception("No dump file specified.")
        
        if len(args) == 1:
            self.increment = 0
            self.read_all()
        else:
            self.increment = 1
            self.nextfile = 0
            self.eof = 0
    
    # --------------------------------------------------------------------
    
    def read_all(self):
        """
        Read all snapshots from the list of dump files.
        """
        for file in self.flist:
            # Test for gzipped file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    continue
    
            # Read all snapshots in the current file
            while True:
                snap = self.read_snapshot(f)
                if not snap:
                    break
                self.snaps.append(snap)
                print(snap.time, end=' ')
                sys.stdout.flush()
    
            f.close()
        print()
    
        # Sort entries by timestep and remove duplicates
        self.snaps.sort(key=lambda x: x.time)
        self.cull()
    
        # Sort all node, element, nvalue, evalue arrays by ID
        for snap in self.snaps:
            if snap.nflag and snap.nodes is not None:
                array = snap.nodes
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
            if snap.eflag and snap.elements is not None:
                array = snap.elements
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
            if snap.nvalueflag and snap.nvalues is not None:
                array = snap.nvalues
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
            if snap.evalueflag and snap.evalues is not None:
                array = snap.evalues
                ids = array[:, 0]
                ordering = np.argsort(ids)
                for i in range(array.shape[1]):
                    array[:, i] = np.take(array[:, i], ordering)
    
        # Reference definitions of nodes and elements in previous timesteps
        self.reference()
        
        self.nsnaps = len(self.snaps)
        print(f"read {self.nsnaps} snapshots")
    
        # Select all timesteps and elements
        self.tselect.all()
    
    # --------------------------------------------------------------------
    # Read the next snapshot from the list of files (incremental reading)
    
    def next(self):
        """
        Read the next snapshot in incremental mode.
        
        Returns:
            int: Time stamp of the snapshot read, or -1 if no snapshots left.
        """
        if not self.increment:
            raise Exception("Cannot read incrementally with current mdump configuration.")
    
        # Read next snapshot in current file using eof as pointer
        while self.nextfile < len(self.flist):
            file = self.flist[self.nextfile]
            # Open the current file
            if file.endswith(".gz"):
                try:
                    f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
                except Exception as e:
                    print(f"Error opening gzipped file {file}: {e}")
                    self.nextfile += 1
                    continue
            else:
                try:
                    f = open(file, 'r')
                except Exception as e:
                    print(f"Error opening file {file}: {e}")
                    self.nextfile += 1
                    continue
    
            # Seek to the last read position
            f.seek(self.eof)
            snap = self.read_snapshot(f)
            if not snap:
                # End of file reached
                f.close()
                self.nextfile += 1
                self.eof = 0
                continue
            self.eof = f.tell()
            f.close()
                
            # Check for duplicate time stamp
            if any(existing_snap.time == snap.time for existing_snap in self.snaps):
                continue  # Skip duplicate
            self.snaps.append(snap)
            snap.tselect = 1
            snap.nselect = snap.nelements
            if snap.eflag:
                snap.eselect = np.ones(snap.nelements, dtype=int)
            self.nsnaps += 1
            self.nselect += 1
            return snap.time
    
        return -1  # No snapshots left
    
    # --------------------------------------------------------------------
    # Read a single snapshot from a file
    
    def read_snapshot(self, f):
        """
        Read a single snapshot from the file.
        
        Parameters:
            f (file object): Opened file object to read from.
        
        Returns:
            Snap: Snapshot object, or None if failed.
        """
        try:
            snap = Snap()
            # Read and discard the first line (usually a comment or header)
            item = f.readline()
            if not item:
                return None
            # Read time stamp
            line = f.readline()
            if not line:
                return None
            snap.time = int(line.strip())
            # Read snapshot type
            line = f.readline()
            if not line:
                return None
            if "NUMBER OF NODES" in line:
                snap.nflag = 1
            elif "NUMBER OF TRIANGLES" in line:
                snap.eflag = 1
            elif "NUMBER OF TETS" in line:
                snap.eflag = 2
            elif "NUMBER OF SQUARES" in line:
                snap.eflag = 3
            elif "NUMBER OF CUBES" in line:
                snap.eflag = 4
            elif "NUMBER OF NODE VALUES" in line:
                snap.nvalueflag = 1
            elif "NUMBER OF ELEMENT VALUES" in line:
                snap.evalueflag = 1
            else:
                raise Exception("Unrecognized snapshot type in dump file.")
            # Read number of nodes/elements/values
            line = f.readline()
            if not line:
                return None
            n = int(line.strip())
    
            if snap.eflag:
                snap.eselect = np.zeros(n, dtype=int)
    
            if snap.nflag:
                # Read box bounds
                f.readline()  # Read past header
                words = f.readline().split()
                snap.xlo, snap.xhi = float(words[0]), float(words[1])
                words = f.readline().split()
                snap.ylo, snap.yhi = float(words[0]), float(words[1])
                words = f.readline().split()
                snap.zlo, snap.zhi = float(words[0]), float(words[1])
                
            f.readline()  # Read past another header
    
            if n:
                # Read node/element/value data
                words = f.readline().split()
                ncol = len(words)
                data = []
                data.append([float(value) for value in words])
                for _ in range(1, n):
                    words = f.readline().split()
                    if not words:
                        break
                    data.append([float(value) for value in words])
                values = np.array(data, dtype=float)
            else:
                values = None
    
            if snap.nflag:
                snap.nodes = values
                snap.nnodes = n
            elif snap.eflag:
                snap.elements = values
                snap.nelements = n
                snap.eselect = np.zeros(n, dtype=int)  # Initialize element selection
            elif snap.nvalueflag:
                snap.nvalues = values
                snap.nnvalues = n
            elif snap.evalueflag:
                snap.evalues = values
                snap.nevalues = n
            return snap
        except Exception as e:
            print(f"Error reading snapshot: {e}")
            return None
    
    # --------------------------------------------------------------------
    # Map atom column names
    
    def map(self, *pairs):
        """
        Assign names to element value columns.
        
        Parameters:
            *pairs: Pairs of (column_number, "column_name")
                   Example: map(2, "temperature")
        """
        if len(pairs) % 2 != 0:
            raise Exception("mdump map() requires pairs of mappings.")
        for i in range(0, len(pairs), 2):
            col_num = pairs[i]
            col_name = pairs[i + 1]
            self.names[col_name] = col_num - 1  # Zero-based indexing
    
    # --------------------------------------------------------------------
    # Delete unselected snapshots
    
    def delete(self):
        """
        Delete non-selected snapshots.
        """
        ndel = 0
        selected_snaps = []
        for snap in self.snaps:
            if snap.tselect:
                selected_snaps.append(snap)
            else:
                ndel += 1
        self.snaps = selected_snaps
        self.nsnaps = len(self.snaps)
        self.nselect = sum(snap.tselect for snap in self.snaps)
        print(f"{ndel} snapshots deleted")
        print(f"{self.nsnaps} snapshots remaining")
    
    # --------------------------------------------------------------------
    # Sort snapshots by time stamp
    
    def sort(self, *args):
        """
        Sort snapshots by ID in all selected timesteps or one specified timestep.
        
        Parameters:
            *args: If provided, sorts elements in the specified timestep.
        """
        if len(args) == 0:
            print("Sorting selected snapshots ...")
            if "id" not in self.names:
                print("Column 'id' not mapped. Use m.map() to assign column names.")
                return
            id_col = self.names["id"]
            for snap in self.snaps:
                if snap.tselect:
                    self.sort_one(snap, id_col)
        else:
            try:
                n = args[0]
                i = self.findtime(n)
                if "id" not in self.names:
                    print("Column 'id' not mapped. Use m.map() to assign column names.")
                    return
                id_col = self.names["id"]
                self.sort_one(self.snaps[i], id_col)
            except Exception as e:
                print(e)
    
    # --------------------------------------------------------------------
    # Sort a single snapshot by ID column
    
    def sort_one(self, snap, id_col):
        """
        Sort elements in a single snapshot by ID column.
        
        Parameters:
            snap (Snap): The snapshot to sort.
            id_col (int): The column index for 'id'.
        """
        if snap.elements is not None:
            array = snap.elements
            ids = array[:, id_col]
            ordering = np.argsort(ids)
            for i in range(array.shape[1]):
                array[:, i] = np.take(array[:, i], ordering)
    
    # --------------------------------------------------------------------
    # Return list of selected snapshot time stamps
    
    def time(self):
        """
        Get a list of all selected snapshot time stamps.
        
        Returns:
            list: List of time stamps.
        """
        return [snap.time for snap in self.snaps if snap.tselect]
    
    # --------------------------------------------------------------------
    # Extract vector(s) of values for selected elements at chosen timestep
    
    def vecs(self, n, *columns):
        """
        Extract vector(s) of values for selected elements at a specific timestep.
        
        Parameters:
            n (int): Timestep to extract vectors from.
            *columns (str): Column names to extract.
        
        Returns:
            list: List of vectors corresponding to the specified columns.
        """
        try:
            snap = self.snaps[self.findtime(n)]
        except Exception as e:
            print(e)
            return []
        if not snap.evalueflag:
            raise Exception("Snapshot has no element values.")
        
        if len(columns) == 0:
            raise Exception("No columns specified.")
        
        column_indices = []
        for name in columns:
            if name not in self.names:
                print(f"Unknown column name: {name}")
                return []
            column_indices.append(self.names[name])
        
        values = []
        for col in column_indices:
            vec = []
            for i in range(snap.nelements):
                if snap.eselect[i]:
                    vec.append(snap.evalues[i][col])
            values.append(vec)
        
        if len(values) == 1:
            return values[0]
        else:
            return values
    
    # --------------------------------------------------------------------
    # Sort snapshots on time stamp
    # (Replaced by sort method using key=lambda x: x.time)
    
    def compare_time(self, a, b):
        """
        Comparator for sorting snapshots by time.
        
        Parameters:
            a (Snap): First snapshot.
            b (Snap): Second snapshot.
        
        Returns:
            int: -1 if a < b, 1 if a > b, 0 if equal.
        """
        if a.time < b.time:
            return -1
        elif a.time > b.time:
            return 1
        else:
            return 0
    
    # --------------------------------------------------------------------
    # Delete successive snapshots with duplicate time stamp
    # If have same timestamp, combine them if internal flags are different
    
    def cull(self):
        """
        Remove duplicate snapshots based on time stamps and merge their data.
        """
        i = 1
        while i < len(self.snaps):
            if self.snaps[i].time == self.snaps[i-1].time:
                if self.snaps[i].nflag and not self.snaps[i-1].nflag:
                    self.snaps[i-1].nflag = 1
                    self.snaps[i-1].nnodes = self.snaps[i].nnodes
                    self.snaps[i-1].nodes = self.snaps[i].nodes
                    self.snaps[i-1].xlo = self.snaps[i].xlo
                    self.snaps[i-1].xhi = self.snaps[i].xhi
                    self.snaps[i-1].ylo = self.snaps[i].ylo
                    self.snaps[i-1].yhi = self.snaps[i].yhi
                    self.snaps[i-1].zlo = self.snaps[i].zlo
                    self.snaps[i-1].zhi = self.snaps[i].zhi
                elif self.snaps[i].eflag and not self.snaps[i-1].eflag:
                    self.snaps[i-1].eflag = self.snaps[i].eflag
                    self.snaps[i-1].nelements = self.snaps[i].nelements
                    self.snaps[i-1].elements = self.snaps[i].elements
                    self.snaps[i-1].eselect = self.snaps[i].eselect
                elif self.snaps[i].nvalueflag and not self.snaps[i-1].nvalueflag:
                    self.snaps[i-1].nvalueflag = 1
                    self.snaps[i-1].nnvalues = self.snaps[i].nnvalues
                    self.snaps[i-1].nvalues = self.snaps[i].nvalues
                elif self.snaps[i].evalueflag and not self.snaps[i-1].evalueflag:
                    self.snaps[i-1].evalueflag = 1
                    self.snaps[i-1].nevalues = self.snaps[i].nevalues
                    self.snaps[i-1].evalues = self.snaps[i].evalues
                del self.snaps[i]
            else:
                i += 1
    
    # --------------------------------------------------------------------
    # Ensure every snapshot has node and element connectivity info
    # If not, point it at the most recent snapshot that does
    
    def reference(self):
        """
        Ensure every snapshot has node and element connectivity information by referencing previous snapshots.
        """
        for i in range(len(self.snaps)):
            if not self.snaps[i].nflag:
                for j in range(i, -1, -1):
                    if self.snaps[j].nflag:
                        self.snaps[i].nflag = self.snaps[j].nflag
                        self.snaps[i].nnodes = self.snaps[j].nnodes
                        self.snaps[i].nodes = self.snaps[j].nodes
                        self.snaps[i].xlo = self.snaps[j].xlo
                        self.snaps[i].xhi = self.snaps[j].xhi
                        self.snaps[i].ylo = self.snaps[j].ylo
                        self.snaps[i].yhi = self.snaps[j].yhi
                        self.snaps[i].zlo = self.snaps[j].zlo
                        self.snaps[i].zhi = self.snaps[j].zhi
                        break
            if not self.snaps[i].nflag:
                raise Exception("No nodal coordinates found in previous snapshots.")
            if not self.snaps[i].eflag:
                for j in range(i, -1, -1):
                    if self.snaps[j].eflag:
                        self.snaps[i].eflag = self.snaps[j].eflag
                        self.snaps[i].nelements = self.snaps[j].nelements
                        self.snaps[i].elements = self.snaps[j].elements
                        self.snaps[i].eselect = self.snaps[j].eselect
                        break
            if not self.snaps[i].eflag:
                raise Exception("No element connections found in previous snapshots.")
    
    # --------------------------------------------------------------------
    # Iterate over selected snapshots
    
    def iterator(self, flag):
        """
        Iterator method compatible with equivalent dump calls.
        
        Parameters:
            flag (int): 0 for first call, 1 for subsequent calls.
        
        Returns:
            tuple: (index, time, flag)
                   index (int): Index within dump object (0 to # of snapshots)
                   time (int): Timestep value
                   flag (int): -1 when iteration is done, 1 otherwise
        """
        if flag == 0:
            self.iterate = -1  # Initialize iteration
        self.iterate += 1
        if self.iterate >= self.nsnaps:
            return (0, 0, -1)
        snap = self.snaps[self.iterate]
        if not snap.tselect:
            return self.iterator(1)
        return (self.iterate, snap.time, 1)
    
    # --------------------------------------------------------------------
    # Return list of viz objects for a snapshot
    
    def viz(self, index, flag=0):
        """
        Return mesh visualization information for a snapshot.
        
        Parameters:
            index (int): Snapshot index or time stamp.
            flag (int): If 1, interpret index as time stamp.
        
        Returns:
            tuple: (time, box, atoms, bonds, tris, lines)
                   atoms, bonds, lines are None.
                   tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
        """
        if not flag:
            isnap = index
        else:
            times = self.time()
            n = len(times)
            i = 0
            while i < n:
                if times[i] > index:
                    break
                i += 1
            isnap = i - 1
        if isnap < 0 or isnap >= self.nsnaps:
            return (-1, None, None, None, None, None)
        snap = self.snaps[isnap]
        
        time = snap.time
        box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
        if self.etype == "":
            type_col = -1
        else:
            if self.etype not in self.names:
                print(f"Unknown etype column: {self.etype}")
                type_col = -1
            else:
                type_col = self.names[self.etype]
    
        atoms = None
        bonds = None
    
        tris = []
        nodes = snap.nodes
        for i in range(snap.nelements):
            if not snap.eselect[i]:
                continue
            element = snap.elements[i]
            if snap.evalueflag:
                evalue = snap.evalues[i]
            else:
                evalue = None
    
            # Process based on element type
            if snap.eflag == 1:
                # Triangle
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                list_vertices = v1 + v2 + v3
                n = normal(list_vertices[0:3], list_vertices[3:6], list_vertices[6:9])
                if type_col == -1:
                    tris.append([int(element[0]), int(element[1])] + list_vertices + n)
                else:
                    tris.append([int(element[0]), int(evalue[type_col])] + list_vertices + n)
    
            elif snap.eflag == 2:
                # Tetrahedron - convert to 4 triangles
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                    v4 = nodes[int(element[5]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                tris_data = [
                    v1 + v2 + v4,
                    v2 + v3 + v4,
                    v1 + v4 + v3,
                    v1 + v3 + v2
                ]
                for tri_vertices in tris_data:
                    n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                    if type_col == -1:
                        tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                    else:
                        tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)
    
            elif snap.eflag == 3:
                # Square - convert to 2 triangles
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                    v4 = nodes[int(element[5]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                tris_data = [
                    v1 + v2 + v3,
                    v1 + v3 + v4
                ]
                for tri_vertices in tris_data:
                    n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                    if type_col == -1:
                        tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                    else:
                        tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)
    
            elif snap.eflag == 4:
                # Cube - convert to 12 triangles
                try:
                    v1 = nodes[int(element[2]) - 1][2:5].tolist()
                    v2 = nodes[int(element[3]) - 1][2:5].tolist()
                    v3 = nodes[int(element[4]) - 1][2:5].tolist()
                    v4 = nodes[int(element[5]) - 1][2:5].tolist()
                    v5 = nodes[int(element[6]) - 1][2:5].tolist()
                    v6 = nodes[int(element[7]) - 1][2:5].tolist()
                    v7 = nodes[int(element[8]) - 1][2:5].tolist()
                    v8 = nodes[int(element[9]) - 1][2:5].tolist()
                except IndexError:
                    print(f"Error accessing nodes for element {i} in timestep {time}.")
                    continue
                tris_data = [
                    # Lower z face
                    v1 + v3 + v2,
                    v1 + v4 + v3,
                    # Upper z face
                    v5 + v6 + v7,
                    v5 + v7 + v8,
                    # Lower y face
                    v1 + v2 + v6,
                    v1 + v6 + v5,
                    # Upper y face
                    v4 + v7 + v3,
                    v4 + v8 + v7,
                    # Lower x face
                    v1 + v8 + v4,
                    v1 + v5 + v8,
                    # Upper x face
                    v2 + v3 + v7,
                    v2 + v7 + v6
                ]
                for tri_vertices in tris_data:
                    n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                    if type_col == -1:
                        tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                    else:
                        tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)
        
        # Lines are not used in viz for mesh dumps
        lines = []
    
        return (time, box, atoms, bonds, tris, lines)
    
    # --------------------------------------------------------------------
    # Return lists of node/element info for snapshot isnap
    # If called with flag, then index is timestep, so convert to snapshot index
    
    def mviz(self, index, flag=0):
        """
        Return mesh visualization information for a snapshot, including nodes and elements.
        
        Parameters:
            index (int): Snapshot index or time stamp.
            flag (int): If 1, interpret index as time stamp.
        
        Returns:
            tuple: (time, box, nodes, elements, nvalues, evalues)
        """
        if not flag:
            isnap = index
        else:
            times = self.time()
            n = len(times)
            i = 0
            while i < n:
                if times[i] > index:
                    break
                i += 1
            isnap = i - 1
        if isnap < 0 or isnap >= self.nsnaps:
            return (-1, None, None, None, None, None)
        snap = self.snaps[isnap]
        
        time = snap.time
        box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
        nodes = []
        elements = []
        nvalues = []
        evalues = []
        
        if snap.nflag and snap.nodes is not None:
            for node in snap.nodes:
                nodes.append([int(node[0]), int(node[1]), node[2], node[3], node[4]])
        if snap.eflag and snap.elements is not None:
            for elem in snap.elements:
                elements.append([int(elem[0]), int(elem[1])] + elem[2:].tolist())
        if snap.nvalueflag and snap.nvalues is not None:
            for nval in snap.nvalues:
                nvalues.append([int(nval[0]), int(nval[1])] + nval[2:].tolist())
        if snap.evalueflag and snap.evalues is not None:
            for eval in snap.evalues:
                evalues.append([int(eval[0]), int(eval[1])] + eval[2:].tolist())
        
        return (time, box, nodes, elements, nvalues, evalues)
    
    # --------------------------------------------------------------------
    # Wrap line end points associated with atoms through periodic boundaries
    # Invoked by dump() when it does an owrap() on its atoms
    
    def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz):
        """
        Wrap line end points associated with atoms through periodic boundaries.
        
        Parameters:
            time (int): Time stamp of the snapshot.
            xprd, yprd, zprd (float): Periodicity in x, y, z directions.
            idsdump (dict): Dictionary mapping atom IDs to dump indices.
            atomsdump (list): List of atoms from the dump.
            iother (int): Other index for wrapping.
            ix, iy, iz (int): Column indices for x, y, z coordinates.
        """
        try:
            id_col = self.names["id"]
            end1x_col = self.names["end1x"]
            end1y_col = self.names["end1y"]
            end2x_col = self.names["end2x"]
            end2y_col = self.names["end2y"]
        except KeyError as e:
            print(f"Column name {e} not mapped. Use m.map() to assign column names.")
            return
    
        try:
            isnap = self.findtime(time)
        except Exception as e:
            print(e)
            return
        snap = self.snaps[isnap]
        atoms = snap.atoms
    
        # idump = index of my line I in dump's atoms
        # jdump = atom J in dump's atoms that atom I was owrapped on
        # delx, dely = offset applied to atom I and thus to line I
    
        for i in range(snap.nelements):
            try:
                tag = int(atoms[i][id_col])
                idump = idsdump[tag]
                jdump = idsdump[int(atomsdump[idump][iother])]
                delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd
                dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd
                delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd
                # Assuming z-coordinates need to be handled if present
                # If not, the z-component remains unchanged
                # Here, only x and y are being wrapped
                atoms[i][end1x_col] += delx
                atoms[i][end1y_col] += dely
                atoms[i][end2x_col] += delx
                atoms[i][end2y_col] += dely
            except (KeyError, IndexError, ValueError) as e:
                print(f"Error in owrap: {e}")
                continue

Methods

def compare_time(self, a, b)

Comparator for sorting snapshots by time.

Parameters

a (Snap): First snapshot. b (Snap): Second snapshot.

Returns

int
-1 if a < b, 1 if a > b, 0 if equal.
Expand source code
def compare_time(self, a, b):
    """
    Comparator for sorting snapshots by time.
    
    Parameters:
        a (Snap): First snapshot.
        b (Snap): Second snapshot.
    
    Returns:
        int: -1 if a < b, 1 if a > b, 0 if equal.
    """
    if a.time < b.time:
        return -1
    elif a.time > b.time:
        return 1
    else:
        return 0
def cull(self)

Remove duplicate snapshots based on time stamps and merge their data.

Expand source code
def cull(self):
    """
    Remove duplicate snapshots based on time stamps and merge their data.
    """
    i = 1
    while i < len(self.snaps):
        if self.snaps[i].time == self.snaps[i-1].time:
            if self.snaps[i].nflag and not self.snaps[i-1].nflag:
                self.snaps[i-1].nflag = 1
                self.snaps[i-1].nnodes = self.snaps[i].nnodes
                self.snaps[i-1].nodes = self.snaps[i].nodes
                self.snaps[i-1].xlo = self.snaps[i].xlo
                self.snaps[i-1].xhi = self.snaps[i].xhi
                self.snaps[i-1].ylo = self.snaps[i].ylo
                self.snaps[i-1].yhi = self.snaps[i].yhi
                self.snaps[i-1].zlo = self.snaps[i].zlo
                self.snaps[i-1].zhi = self.snaps[i].zhi
            elif self.snaps[i].eflag and not self.snaps[i-1].eflag:
                self.snaps[i-1].eflag = self.snaps[i].eflag
                self.snaps[i-1].nelements = self.snaps[i].nelements
                self.snaps[i-1].elements = self.snaps[i].elements
                self.snaps[i-1].eselect = self.snaps[i].eselect
            elif self.snaps[i].nvalueflag and not self.snaps[i-1].nvalueflag:
                self.snaps[i-1].nvalueflag = 1
                self.snaps[i-1].nnvalues = self.snaps[i].nnvalues
                self.snaps[i-1].nvalues = self.snaps[i].nvalues
            elif self.snaps[i].evalueflag and not self.snaps[i-1].evalueflag:
                self.snaps[i-1].evalueflag = 1
                self.snaps[i-1].nevalues = self.snaps[i].nevalues
                self.snaps[i-1].evalues = self.snaps[i].evalues
            del self.snaps[i]
        else:
            i += 1
def delete(self)

Delete non-selected snapshots.

Expand source code
def delete(self):
    """
    Delete non-selected snapshots.
    """
    ndel = 0
    selected_snaps = []
    for snap in self.snaps:
        if snap.tselect:
            selected_snaps.append(snap)
        else:
            ndel += 1
    self.snaps = selected_snaps
    self.nsnaps = len(self.snaps)
    self.nselect = sum(snap.tselect for snap in self.snaps)
    print(f"{ndel} snapshots deleted")
    print(f"{self.nsnaps} snapshots remaining")
def iterator(self, flag)

Iterator method compatible with equivalent dump calls.

Parameters

flag (int): 0 for first call, 1 for subsequent calls.

Returns

tuple
(index, time, flag) index (int): Index within dump object (0 to # of snapshots) time (int): Timestep value flag (int): -1 when iteration is done, 1 otherwise
Expand source code
def iterator(self, flag):
    """
    Iterator method compatible with equivalent dump calls.
    
    Parameters:
        flag (int): 0 for first call, 1 for subsequent calls.
    
    Returns:
        tuple: (index, time, flag)
               index (int): Index within dump object (0 to # of snapshots)
               time (int): Timestep value
               flag (int): -1 when iteration is done, 1 otherwise
    """
    if flag == 0:
        self.iterate = -1  # Initialize iteration
    self.iterate += 1
    if self.iterate >= self.nsnaps:
        return (0, 0, -1)
    snap = self.snaps[self.iterate]
    if not snap.tselect:
        return self.iterator(1)
    return (self.iterate, snap.time, 1)
def map(self, *pairs)

Assign names to element value columns.

Parameters

*pairs: Pairs of (column_number, "column_name") Example: map(2, "temperature")

Expand source code
def map(self, *pairs):
    """
    Assign names to element value columns.
    
    Parameters:
        *pairs: Pairs of (column_number, "column_name")
               Example: map(2, "temperature")
    """
    if len(pairs) % 2 != 0:
        raise Exception("mdump map() requires pairs of mappings.")
    for i in range(0, len(pairs), 2):
        col_num = pairs[i]
        col_name = pairs[i + 1]
        self.names[col_name] = col_num - 1  # Zero-based indexing
def mviz(self, index, flag=0)

Return mesh visualization information for a snapshot, including nodes and elements.

Parameters

index (int): Snapshot index or time stamp. flag (int): If 1, interpret index as time stamp.

Returns

tuple
(time, box, nodes, elements, nvalues, evalues)
Expand source code
def mviz(self, index, flag=0):
    """
    Return mesh visualization information for a snapshot, including nodes and elements.
    
    Parameters:
        index (int): Snapshot index or time stamp.
        flag (int): If 1, interpret index as time stamp.
    
    Returns:
        tuple: (time, box, nodes, elements, nvalues, evalues)
    """
    if not flag:
        isnap = index
    else:
        times = self.time()
        n = len(times)
        i = 0
        while i < n:
            if times[i] > index:
                break
            i += 1
        isnap = i - 1
    if isnap < 0 or isnap >= self.nsnaps:
        return (-1, None, None, None, None, None)
    snap = self.snaps[isnap]
    
    time = snap.time
    box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
    nodes = []
    elements = []
    nvalues = []
    evalues = []
    
    if snap.nflag and snap.nodes is not None:
        for node in snap.nodes:
            nodes.append([int(node[0]), int(node[1]), node[2], node[3], node[4]])
    if snap.eflag and snap.elements is not None:
        for elem in snap.elements:
            elements.append([int(elem[0]), int(elem[1])] + elem[2:].tolist())
    if snap.nvalueflag and snap.nvalues is not None:
        for nval in snap.nvalues:
            nvalues.append([int(nval[0]), int(nval[1])] + nval[2:].tolist())
    if snap.evalueflag and snap.evalues is not None:
        for eval in snap.evalues:
            evalues.append([int(eval[0]), int(eval[1])] + eval[2:].tolist())
    
    return (time, box, nodes, elements, nvalues, evalues)
def next(self)

Read the next snapshot in incremental mode.

Returns

int
Time stamp of the snapshot read, or -1 if no snapshots left.
Expand source code
def next(self):
    """
    Read the next snapshot in incremental mode.
    
    Returns:
        int: Time stamp of the snapshot read, or -1 if no snapshots left.
    """
    if not self.increment:
        raise Exception("Cannot read incrementally with current mdump configuration.")

    # Read next snapshot in current file using eof as pointer
    while self.nextfile < len(self.flist):
        file = self.flist[self.nextfile]
        # Open the current file
        if file.endswith(".gz"):
            try:
                f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
            except Exception as e:
                print(f"Error opening gzipped file {file}: {e}")
                self.nextfile += 1
                continue
        else:
            try:
                f = open(file, 'r')
            except Exception as e:
                print(f"Error opening file {file}: {e}")
                self.nextfile += 1
                continue

        # Seek to the last read position
        f.seek(self.eof)
        snap = self.read_snapshot(f)
        if not snap:
            # End of file reached
            f.close()
            self.nextfile += 1
            self.eof = 0
            continue
        self.eof = f.tell()
        f.close()
            
        # Check for duplicate time stamp
        if any(existing_snap.time == snap.time for existing_snap in self.snaps):
            continue  # Skip duplicate
        self.snaps.append(snap)
        snap.tselect = 1
        snap.nselect = snap.nelements
        if snap.eflag:
            snap.eselect = np.ones(snap.nelements, dtype=int)
        self.nsnaps += 1
        self.nselect += 1
        return snap.time

    return -1  # No snapshots left
def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz)

Wrap line end points associated with atoms through periodic boundaries.

Parameters

time (int): Time stamp of the snapshot. xprd, yprd, zprd (float): Periodicity in x, y, z directions. idsdump (dict): Dictionary mapping atom IDs to dump indices. atomsdump (list): List of atoms from the dump. iother (int): Other index for wrapping. ix, iy, iz (int): Column indices for x, y, z coordinates.

Expand source code
def owrap(self, time, xprd, yprd, zprd, idsdump, atomsdump, iother, ix, iy, iz):
    """
    Wrap line end points associated with atoms through periodic boundaries.
    
    Parameters:
        time (int): Time stamp of the snapshot.
        xprd, yprd, zprd (float): Periodicity in x, y, z directions.
        idsdump (dict): Dictionary mapping atom IDs to dump indices.
        atomsdump (list): List of atoms from the dump.
        iother (int): Other index for wrapping.
        ix, iy, iz (int): Column indices for x, y, z coordinates.
    """
    try:
        id_col = self.names["id"]
        end1x_col = self.names["end1x"]
        end1y_col = self.names["end1y"]
        end2x_col = self.names["end2x"]
        end2y_col = self.names["end2y"]
    except KeyError as e:
        print(f"Column name {e} not mapped. Use m.map() to assign column names.")
        return

    try:
        isnap = self.findtime(time)
    except Exception as e:
        print(e)
        return
    snap = self.snaps[isnap]
    atoms = snap.atoms

    # idump = index of my line I in dump's atoms
    # jdump = atom J in dump's atoms that atom I was owrapped on
    # delx, dely = offset applied to atom I and thus to line I

    for i in range(snap.nelements):
        try:
            tag = int(atoms[i][id_col])
            idump = idsdump[tag]
            jdump = idsdump[int(atomsdump[idump][iother])]
            delx = (atomsdump[idump][ix] - atomsdump[jdump][ix]) * xprd
            dely = (atomsdump[idump][iy] - atomsdump[jdump][iy]) * yprd
            delz = (atomsdump[idump][iz] - atomsdump[jdump][iz]) * zprd
            # Assuming z-coordinates need to be handled if present
            # If not, the z-component remains unchanged
            # Here, only x and y are being wrapped
            atoms[i][end1x_col] += delx
            atoms[i][end1y_col] += dely
            atoms[i][end2x_col] += delx
            atoms[i][end2y_col] += dely
        except (KeyError, IndexError, ValueError) as e:
            print(f"Error in owrap: {e}")
            continue
def read_all(self)

Read all snapshots from the list of dump files.

Expand source code
def read_all(self):
    """
    Read all snapshots from the list of dump files.
    """
    for file in self.flist:
        # Test for gzipped file
        if file.endswith(".gz"):
            try:
                f = popen(f"{PIZZA_GUNZIP} -c {file}", 'r')
            except Exception as e:
                print(f"Error opening gzipped file {file}: {e}")
                continue
        else:
            try:
                f = open(file, 'r')
            except Exception as e:
                print(f"Error opening file {file}: {e}")
                continue

        # Read all snapshots in the current file
        while True:
            snap = self.read_snapshot(f)
            if not snap:
                break
            self.snaps.append(snap)
            print(snap.time, end=' ')
            sys.stdout.flush()

        f.close()
    print()

    # Sort entries by timestep and remove duplicates
    self.snaps.sort(key=lambda x: x.time)
    self.cull()

    # Sort all node, element, nvalue, evalue arrays by ID
    for snap in self.snaps:
        if snap.nflag and snap.nodes is not None:
            array = snap.nodes
            ids = array[:, 0]
            ordering = np.argsort(ids)
            for i in range(array.shape[1]):
                array[:, i] = np.take(array[:, i], ordering)
        if snap.eflag and snap.elements is not None:
            array = snap.elements
            ids = array[:, 0]
            ordering = np.argsort(ids)
            for i in range(array.shape[1]):
                array[:, i] = np.take(array[:, i], ordering)
        if snap.nvalueflag and snap.nvalues is not None:
            array = snap.nvalues
            ids = array[:, 0]
            ordering = np.argsort(ids)
            for i in range(array.shape[1]):
                array[:, i] = np.take(array[:, i], ordering)
        if snap.evalueflag and snap.evalues is not None:
            array = snap.evalues
            ids = array[:, 0]
            ordering = np.argsort(ids)
            for i in range(array.shape[1]):
                array[:, i] = np.take(array[:, i], ordering)

    # Reference definitions of nodes and elements in previous timesteps
    self.reference()
    
    self.nsnaps = len(self.snaps)
    print(f"read {self.nsnaps} snapshots")

    # Select all timesteps and elements
    self.tselect.all()
def read_snapshot(self, f)

Read a single snapshot from the file.

Parameters

f (file object): Opened file object to read from.

Returns

Snap
Snapshot object, or None if failed.
Expand source code
def read_snapshot(self, f):
    """
    Read a single snapshot from the file.
    
    Parameters:
        f (file object): Opened file object to read from.
    
    Returns:
        Snap: Snapshot object, or None if failed.
    """
    try:
        snap = Snap()
        # Read and discard the first line (usually a comment or header)
        item = f.readline()
        if not item:
            return None
        # Read time stamp
        line = f.readline()
        if not line:
            return None
        snap.time = int(line.strip())
        # Read snapshot type
        line = f.readline()
        if not line:
            return None
        if "NUMBER OF NODES" in line:
            snap.nflag = 1
        elif "NUMBER OF TRIANGLES" in line:
            snap.eflag = 1
        elif "NUMBER OF TETS" in line:
            snap.eflag = 2
        elif "NUMBER OF SQUARES" in line:
            snap.eflag = 3
        elif "NUMBER OF CUBES" in line:
            snap.eflag = 4
        elif "NUMBER OF NODE VALUES" in line:
            snap.nvalueflag = 1
        elif "NUMBER OF ELEMENT VALUES" in line:
            snap.evalueflag = 1
        else:
            raise Exception("Unrecognized snapshot type in dump file.")
        # Read number of nodes/elements/values
        line = f.readline()
        if not line:
            return None
        n = int(line.strip())

        if snap.eflag:
            snap.eselect = np.zeros(n, dtype=int)

        if snap.nflag:
            # Read box bounds
            f.readline()  # Read past header
            words = f.readline().split()
            snap.xlo, snap.xhi = float(words[0]), float(words[1])
            words = f.readline().split()
            snap.ylo, snap.yhi = float(words[0]), float(words[1])
            words = f.readline().split()
            snap.zlo, snap.zhi = float(words[0]), float(words[1])
            
        f.readline()  # Read past another header

        if n:
            # Read node/element/value data
            words = f.readline().split()
            ncol = len(words)
            data = []
            data.append([float(value) for value in words])
            for _ in range(1, n):
                words = f.readline().split()
                if not words:
                    break
                data.append([float(value) for value in words])
            values = np.array(data, dtype=float)
        else:
            values = None

        if snap.nflag:
            snap.nodes = values
            snap.nnodes = n
        elif snap.eflag:
            snap.elements = values
            snap.nelements = n
            snap.eselect = np.zeros(n, dtype=int)  # Initialize element selection
        elif snap.nvalueflag:
            snap.nvalues = values
            snap.nnvalues = n
        elif snap.evalueflag:
            snap.evalues = values
            snap.nevalues = n
        return snap
    except Exception as e:
        print(f"Error reading snapshot: {e}")
        return None
def reference(self)

Ensure every snapshot has node and element connectivity information by referencing previous snapshots.

Expand source code
def reference(self):
    """
    Ensure every snapshot has node and element connectivity information by referencing previous snapshots.
    """
    for i in range(len(self.snaps)):
        if not self.snaps[i].nflag:
            for j in range(i, -1, -1):
                if self.snaps[j].nflag:
                    self.snaps[i].nflag = self.snaps[j].nflag
                    self.snaps[i].nnodes = self.snaps[j].nnodes
                    self.snaps[i].nodes = self.snaps[j].nodes
                    self.snaps[i].xlo = self.snaps[j].xlo
                    self.snaps[i].xhi = self.snaps[j].xhi
                    self.snaps[i].ylo = self.snaps[j].ylo
                    self.snaps[i].yhi = self.snaps[j].yhi
                    self.snaps[i].zlo = self.snaps[j].zlo
                    self.snaps[i].zhi = self.snaps[j].zhi
                    break
        if not self.snaps[i].nflag:
            raise Exception("No nodal coordinates found in previous snapshots.")
        if not self.snaps[i].eflag:
            for j in range(i, -1, -1):
                if self.snaps[j].eflag:
                    self.snaps[i].eflag = self.snaps[j].eflag
                    self.snaps[i].nelements = self.snaps[j].nelements
                    self.snaps[i].elements = self.snaps[j].elements
                    self.snaps[i].eselect = self.snaps[j].eselect
                    break
        if not self.snaps[i].eflag:
            raise Exception("No element connections found in previous snapshots.")
def sort(self, *args)

Sort snapshots by ID in all selected timesteps or one specified timestep.

Parameters

*args: If provided, sorts elements in the specified timestep.

Expand source code
def sort(self, *args):
    """
    Sort snapshots by ID in all selected timesteps or one specified timestep.
    
    Parameters:
        *args: If provided, sorts elements in the specified timestep.
    """
    if len(args) == 0:
        print("Sorting selected snapshots ...")
        if "id" not in self.names:
            print("Column 'id' not mapped. Use m.map() to assign column names.")
            return
        id_col = self.names["id"]
        for snap in self.snaps:
            if snap.tselect:
                self.sort_one(snap, id_col)
    else:
        try:
            n = args[0]
            i = self.findtime(n)
            if "id" not in self.names:
                print("Column 'id' not mapped. Use m.map() to assign column names.")
                return
            id_col = self.names["id"]
            self.sort_one(self.snaps[i], id_col)
        except Exception as e:
            print(e)
def sort_one(self, snap, id_col)

Sort elements in a single snapshot by ID column.

Parameters

snap (Snap): The snapshot to sort. id_col (int): The column index for 'id'.

Expand source code
def sort_one(self, snap, id_col):
    """
    Sort elements in a single snapshot by ID column.
    
    Parameters:
        snap (Snap): The snapshot to sort.
        id_col (int): The column index for 'id'.
    """
    if snap.elements is not None:
        array = snap.elements
        ids = array[:, id_col]
        ordering = np.argsort(ids)
        for i in range(array.shape[1]):
            array[:, i] = np.take(array[:, i], ordering)
def time(self)

Get a list of all selected snapshot time stamps.

Returns

list
List of time stamps.
Expand source code
def time(self):
    """
    Get a list of all selected snapshot time stamps.
    
    Returns:
        list: List of time stamps.
    """
    return [snap.time for snap in self.snaps if snap.tselect]
def vecs(self, n, *columns)

Extract vector(s) of values for selected elements at a specific timestep.

Parameters

n (int): Timestep to extract vectors from. *columns (str): Column names to extract.

Returns

list
List of vectors corresponding to the specified columns.
Expand source code
def vecs(self, n, *columns):
    """
    Extract vector(s) of values for selected elements at a specific timestep.
    
    Parameters:
        n (int): Timestep to extract vectors from.
        *columns (str): Column names to extract.
    
    Returns:
        list: List of vectors corresponding to the specified columns.
    """
    try:
        snap = self.snaps[self.findtime(n)]
    except Exception as e:
        print(e)
        return []
    if not snap.evalueflag:
        raise Exception("Snapshot has no element values.")
    
    if len(columns) == 0:
        raise Exception("No columns specified.")
    
    column_indices = []
    for name in columns:
        if name not in self.names:
            print(f"Unknown column name: {name}")
            return []
        column_indices.append(self.names[name])
    
    values = []
    for col in column_indices:
        vec = []
        for i in range(snap.nelements):
            if snap.eselect[i]:
                vec.append(snap.evalues[i][col])
        values.append(vec)
    
    if len(values) == 1:
        return values[0]
    else:
        return values
def viz(self, index, flag=0)

Return mesh visualization information for a snapshot.

Parameters

index (int): Snapshot index or time stamp. flag (int): If 1, interpret index as time stamp.

Returns

tuple
(time, box, atoms, bonds, tris, lines) atoms, bonds, lines are None. tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
Expand source code
def viz(self, index, flag=0):
    """
    Return mesh visualization information for a snapshot.
    
    Parameters:
        index (int): Snapshot index or time stamp.
        flag (int): If 1, interpret index as time stamp.
    
    Returns:
        tuple: (time, box, atoms, bonds, tris, lines)
               atoms, bonds, lines are None.
               tris contain [id, type, x1, y1, z1, x2, y2, z2, x3, y3, z3, nx, ny, nz] for each triangle.
    """
    if not flag:
        isnap = index
    else:
        times = self.time()
        n = len(times)
        i = 0
        while i < n:
            if times[i] > index:
                break
            i += 1
        isnap = i - 1
    if isnap < 0 or isnap >= self.nsnaps:
        return (-1, None, None, None, None, None)
    snap = self.snaps[isnap]
    
    time = snap.time
    box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
    if self.etype == "":
        type_col = -1
    else:
        if self.etype not in self.names:
            print(f"Unknown etype column: {self.etype}")
            type_col = -1
        else:
            type_col = self.names[self.etype]

    atoms = None
    bonds = None

    tris = []
    nodes = snap.nodes
    for i in range(snap.nelements):
        if not snap.eselect[i]:
            continue
        element = snap.elements[i]
        if snap.evalueflag:
            evalue = snap.evalues[i]
        else:
            evalue = None

        # Process based on element type
        if snap.eflag == 1:
            # Triangle
            try:
                v1 = nodes[int(element[2]) - 1][2:5].tolist()
                v2 = nodes[int(element[3]) - 1][2:5].tolist()
                v3 = nodes[int(element[4]) - 1][2:5].tolist()
            except IndexError:
                print(f"Error accessing nodes for element {i} in timestep {time}.")
                continue
            list_vertices = v1 + v2 + v3
            n = normal(list_vertices[0:3], list_vertices[3:6], list_vertices[6:9])
            if type_col == -1:
                tris.append([int(element[0]), int(element[1])] + list_vertices + n)
            else:
                tris.append([int(element[0]), int(evalue[type_col])] + list_vertices + n)

        elif snap.eflag == 2:
            # Tetrahedron - convert to 4 triangles
            try:
                v1 = nodes[int(element[2]) - 1][2:5].tolist()
                v2 = nodes[int(element[3]) - 1][2:5].tolist()
                v3 = nodes[int(element[4]) - 1][2:5].tolist()
                v4 = nodes[int(element[5]) - 1][2:5].tolist()
            except IndexError:
                print(f"Error accessing nodes for element {i} in timestep {time}.")
                continue
            tris_data = [
                v1 + v2 + v4,
                v2 + v3 + v4,
                v1 + v4 + v3,
                v1 + v3 + v2
            ]
            for tri_vertices in tris_data:
                n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                if type_col == -1:
                    tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                else:
                    tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)

        elif snap.eflag == 3:
            # Square - convert to 2 triangles
            try:
                v1 = nodes[int(element[2]) - 1][2:5].tolist()
                v2 = nodes[int(element[3]) - 1][2:5].tolist()
                v3 = nodes[int(element[4]) - 1][2:5].tolist()
                v4 = nodes[int(element[5]) - 1][2:5].tolist()
            except IndexError:
                print(f"Error accessing nodes for element {i} in timestep {time}.")
                continue
            tris_data = [
                v1 + v2 + v3,
                v1 + v3 + v4
            ]
            for tri_vertices in tris_data:
                n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                if type_col == -1:
                    tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                else:
                    tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)

        elif snap.eflag == 4:
            # Cube - convert to 12 triangles
            try:
                v1 = nodes[int(element[2]) - 1][2:5].tolist()
                v2 = nodes[int(element[3]) - 1][2:5].tolist()
                v3 = nodes[int(element[4]) - 1][2:5].tolist()
                v4 = nodes[int(element[5]) - 1][2:5].tolist()
                v5 = nodes[int(element[6]) - 1][2:5].tolist()
                v6 = nodes[int(element[7]) - 1][2:5].tolist()
                v7 = nodes[int(element[8]) - 1][2:5].tolist()
                v8 = nodes[int(element[9]) - 1][2:5].tolist()
            except IndexError:
                print(f"Error accessing nodes for element {i} in timestep {time}.")
                continue
            tris_data = [
                # Lower z face
                v1 + v3 + v2,
                v1 + v4 + v3,
                # Upper z face
                v5 + v6 + v7,
                v5 + v7 + v8,
                # Lower y face
                v1 + v2 + v6,
                v1 + v6 + v5,
                # Upper y face
                v4 + v7 + v3,
                v4 + v8 + v7,
                # Lower x face
                v1 + v8 + v4,
                v1 + v5 + v8,
                # Upper x face
                v2 + v3 + v7,
                v2 + v7 + v6
            ]
            for tri_vertices in tris_data:
                n = normal(tri_vertices[0:3], tri_vertices[3:6], tri_vertices[6:9])
                if type_col == -1:
                    tris.append([int(element[0]), int(element[1])] + tri_vertices + n)
                else:
                    tris.append([int(element[0]), int(evalue[type_col])] + tri_vertices + n)
    
    # Lines are not used in viz for mesh dumps
    lines = []

    return (time, box, atoms, bonds, tris, lines)
class tselect (data)

Time selection class for selecting timesteps in mdump.

Expand source code
class tselect:
    """
    Time selection class for selecting timesteps in mdump.
    """
    
    def __init__(self, data):
        self.data = data
    
    # --------------------------------------------------------------------
    
    def all(self):
        """
        Select all timesteps.
        """
        data = self.data
        for snap in data.snaps:
            snap.tselect = 1
        data.nselect = len(data.snaps)
        data.eselect.all()
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
    
    # --------------------------------------------------------------------
    
    def one(self, n):
        """
        Select only timestep N.
        
        Parameters:
            n (int): Timestep to select.
        """
        data = self.data
        for snap in data.snaps:
            snap.tselect = 0
        try:
            i = data.findtime(n)
            data.snaps[i].tselect = 1
            data.nselect = 1
            data.eselect.all()
            print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
        except Exception as e:
            print(e)
    
    # --------------------------------------------------------------------
    
    def none(self):
        """
        Deselect all timesteps.
        """
        data = self.data
        for snap in data.snaps:
            snap.tselect = 0
        data.nselect = 0
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
    
    # --------------------------------------------------------------------
    
    def skip(self, M):
        """
        Select every Mth step.
        
        Parameters:
            M (int): Interval for selecting timesteps.
        """
        data = self.data
        count = M - 1
        for snap in data.snaps:
            if not snap.tselect:
                continue
            count += 1
            if count == M:
                count = 0
                continue
            snap.tselect = 0
            data.nselect -= 1
        data.eselect.all()
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
    
    # --------------------------------------------------------------------
    
    def test(self, teststr):
        """
        Select timesteps based on a Python Boolean expression.
        
        Parameters:
            teststr (str): Boolean expression using $t for timestep value.
                           Example: "$t >= 100 and $t < 10000"
        """
        data = self.data
        snaps = data.snaps
        # Replace $t with snaps[i].time and compile the command
        teststr_replaced = teststr.replace("$t", "snap.time")
        cmd = f"flag = {teststr_replaced}"
        try:
            ccmd = compile(cmd, '', 'single')
        except Exception as e:
            print(f"Error compiling test string: {e}")
            return
        for snap in snaps:
            if not snap.tselect:
                continue
            try:
                exec(ccmd, {"snap": snap})
                if not flag:
                    snap.tselect = 0
                    data.nselect -= 1
            except Exception as e:
                print(f"Error executing test string on timestep {snap.time}: {e}")
        data.eselect.all()
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")

Methods

def all(self)

Select all timesteps.

Expand source code
def all(self):
    """
    Select all timesteps.
    """
    data = self.data
    for snap in data.snaps:
        snap.tselect = 1
    data.nselect = len(data.snaps)
    data.eselect.all()
    print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
def none(self)

Deselect all timesteps.

Expand source code
def none(self):
    """
    Deselect all timesteps.
    """
    data = self.data
    for snap in data.snaps:
        snap.tselect = 0
    data.nselect = 0
    print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
def one(self, n)

Select only timestep N.

Parameters

n (int): Timestep to select.

Expand source code
def one(self, n):
    """
    Select only timestep N.
    
    Parameters:
        n (int): Timestep to select.
    """
    data = self.data
    for snap in data.snaps:
        snap.tselect = 0
    try:
        i = data.findtime(n)
        data.snaps[i].tselect = 1
        data.nselect = 1
        data.eselect.all()
        print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
    except Exception as e:
        print(e)
def skip(self, M)

Select every Mth step.

Parameters

M (int): Interval for selecting timesteps.

Expand source code
def skip(self, M):
    """
    Select every Mth step.
    
    Parameters:
        M (int): Interval for selecting timesteps.
    """
    data = self.data
    count = M - 1
    for snap in data.snaps:
        if not snap.tselect:
            continue
        count += 1
        if count == M:
            count = 0
            continue
        snap.tselect = 0
        data.nselect -= 1
    data.eselect.all()
    print(f"{data.nselect} snapshots selected out of {data.nsnaps}")
def test(self, teststr)

Select timesteps based on a Python Boolean expression.

Parameters

teststr (str): Boolean expression using $t for timestep value. Example: "$t >= 100 and $t < 10000"

Expand source code
def test(self, teststr):
    """
    Select timesteps based on a Python Boolean expression.
    
    Parameters:
        teststr (str): Boolean expression using $t for timestep value.
                       Example: "$t >= 100 and $t < 10000"
    """
    data = self.data
    snaps = data.snaps
    # Replace $t with snaps[i].time and compile the command
    teststr_replaced = teststr.replace("$t", "snap.time")
    cmd = f"flag = {teststr_replaced}"
    try:
        ccmd = compile(cmd, '', 'single')
    except Exception as e:
        print(f"Error compiling test string: {e}")
        return
    for snap in snaps:
        if not snap.tselect:
            continue
        try:
            exec(ccmd, {"snap": snap})
            if not flag:
                snap.tselect = 0
                data.nselect -= 1
        except Exception as e:
            print(f"Error executing test string on timestep {snap.time}: {e}")
    data.eselect.all()
    print(f"{data.nselect} snapshots selected out of {data.nsnaps}")