Module data3_legacy

data Class (legacy code)

The data class provides tools to read, write, and manipulate LAMMPS data files, enabling seamless integration with the dump class for restart generation and simulation data management.

This is the legacy module, use pizza.data3 instead.


Features

  • Input Handling:
  • Supports single or multiple data files, including gzipped files.
  • Create empty data objects or initialize from an existing dump object.

  • Headers and Sections:

  • Access and modify headers, including atom counts and box dimensions.
  • Define, reorder, append, and replace columns in data file sections.

  • Integration with dump:

  • Generate restart files from dump snapshots.
  • Replace atomic positions and velocities in Atoms and Velocities sections.

  • Visualization:

  • Extract atoms and bonds for visualization tools.
  • Iterate over single data file snapshots (compatible with dump).

Usage

Initialization

  • From a File: python d = data("data.poly") # Read a LAMMPS data file

  • Create an Empty Object: python d = data() # Create an empty data object

  • From a dump Object: python d = data(dump_obj, timestep) # Generate data object from dump snapshot

Accessing Data

  • Headers: python d.headers["atoms"] = 1500 # Set atom count in header

  • Sections: python d.sections["Atoms"] = lines # Define the <code>Atoms</code> section

Manipulation

  • Column Mapping: python d.map(1, "id", 3, "x") # Assign names to columns

  • Reorder Columns: python d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in a section

  • Replace or Append Data: python d.replace("Atoms", 5, vec) # Replace a column in <code>Atoms</code> d.append("Atoms", vec) # Append a new column to <code>Atoms</code>

  • Delete Headers or Sections: python d.delete("Bonds") # Remove the <code>Bonds</code> section

Output

  • Write to a File: python d.write("data.new") # Write the data object to a file

Visualization

  • Extract Data for Visualization: python time, box, atoms, bonds, tris, lines = d.viz(0)

Integration with dump

  • Replace Atomic Positions: python d.newxyz(dump_obj, timestep) # Replace atomic positions with <code><a title="data3_legacy.dump" href="#data3_legacy.dump">dump</a></code> data

Examples

Basic Usage

d = data("data.poly")             # Load a LAMMPS data file
d.headers["atoms"] = 2000         # Update atom count
d.reorder("Atoms", 1, 3, 2, 4)    # Reorder columns in `Atoms`
d.write("data.new")               # Save to a new file

Restart Generation

dump_obj = dump("dump.poly")
d = data(dump_obj, 1000)          # Create data object from dump
d.write("data.restart")           # Write restart file

Visualization

time, box, atoms, bonds, tris, lines = d.viz(0)

Properties

  • Headers:
  • atoms: Number of atoms in the data file.
  • atom types: Number of atom types.
  • xlo xhi, ylo yhi, zlo zhi: Box dimensions.

  • Sections:

  • Atoms: Atomic data (e.g., ID, type, coordinates).
  • Velocities: Atomic velocities (optional).
  • Additional sections for bonds, angles, etc.

Notes

  • Compatibility: Fully compatible with dump for restart and visualization tasks.
  • Error Handling: Automatically validates headers and sections for consistency.
  • Extensibility: Easily add or modify headers, sections, and attributes.

Expand source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
# `data` Class (legacy code)

The `data` class provides tools to read, write, and manipulate LAMMPS data files, enabling seamless integration with the `dump` class for restart generation and simulation data management.

This is the legacy module, use pizza.data3 instead.

---

## Features

- **Input Handling**:
  - Supports single or multiple data files, including gzipped files.
  - Create empty data objects or initialize from an existing `dump` object.

- **Headers and Sections**:
  - Access and modify headers, including atom counts and box dimensions.
  - Define, reorder, append, and replace columns in data file sections.

- **Integration with `dump`**:
  - Generate restart files from `dump` snapshots.
  - Replace atomic positions and velocities in `Atoms` and `Velocities` sections.

- **Visualization**:
  - Extract atoms and bonds for visualization tools.
  - Iterate over single data file snapshots (compatible with `dump`).

---

## Usage

### Initialization
- **From a File**:
  ```python
  d = data("data.poly")          # Read a LAMMPS data file
  ```

- **Create an Empty Object**:
  ```python
  d = data()                     # Create an empty data object
  ```

- **From a `dump` Object**:
  ```python
  d = data(dump_obj, timestep)   # Generate data object from dump snapshot
  ```

### Accessing Data
- **Headers**:
  ```python
  d.headers["atoms"] = 1500       # Set atom count in header
  ```

- **Sections**:
  ```python
  d.sections["Atoms"] = lines     # Define the `Atoms` section
  ```

### Manipulation
- **Column Mapping**:
  ```python
  d.map(1, "id", 3, "x")          # Assign names to columns
  ```

- **Reorder Columns**:
  ```python
  d.reorder("Atoms", 1, 3, 2, 4)  # Reorder columns in a section
  ```

- **Replace or Append Data**:
  ```python
  d.replace("Atoms", 5, vec)      # Replace a column in `Atoms`
  d.append("Atoms", vec)          # Append a new column to `Atoms`
  ```

- **Delete Headers or Sections**:
  ```python
  d.delete("Bonds")               # Remove the `Bonds` section
  ```

### Output
- **Write to a File**:
  ```python
  d.write("data.new")             # Write the data object to a file
  ```

### Visualization
- **Extract Data for Visualization**:
  ```python
  time, box, atoms, bonds, tris, lines = d.viz(0)
  ```

### Integration with `dump`
- **Replace Atomic Positions**:
  ```python
  d.newxyz(dump_obj, timestep)    # Replace atomic positions with `dump` data
  ```

---

## Examples

### Basic Usage
```python
d = data("data.poly")             # Load a LAMMPS data file
d.headers["atoms"] = 2000         # Update atom count
d.reorder("Atoms", 1, 3, 2, 4)    # Reorder columns in `Atoms`
d.write("data.new")               # Save to a new file
```

### Restart Generation
```python
dump_obj = dump("dump.poly")
d = data(dump_obj, 1000)          # Create data object from dump
d.write("data.restart")           # Write restart file
```

### Visualization
```python
time, box, atoms, bonds, tris, lines = d.viz(0)
```

---

## Properties
- **Headers**:
  - `atoms`: Number of atoms in the data file.
  - `atom types`: Number of atom types.
  - `xlo xhi`, `ylo yhi`, `zlo zhi`: Box dimensions.

- **Sections**:
  - `Atoms`: Atomic data (e.g., ID, type, coordinates).
  - `Velocities`: Atomic velocities (optional).
  - Additional sections for bonds, angles, etc.

---

## Notes
- **Compatibility**: Fully compatible with `dump` for restart and visualization tasks.
- **Error Handling**: Automatically validates headers and sections for consistency.
- **Extensibility**: Easily add or modify headers, sections, and attributes.

---
"""


__project__ = "Pizza3"
__author__ = "Olivier Vitrac"
__copyright__ = "Copyright 2022"
__credits__ = ["Steve Plimpton", "Olivier Vitrac"]
__license__ = "GPLv3"
__maintainer__ = "Olivier Vitrac"
__email__ = "olivier.vitrac@agroparistech.fr"
__version__ = "1.0"


# Pizza.py toolkit, www.cs.sandia.gov/~sjplimp/pizza.html
# Steve Plimpton, sjplimp@sandia.gov, Sandia National Laboratories
#
# Copyright (2005) Sandia Corporation.  Under the terms of Contract
# DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
# certain rights in this software.  This software is distributed under
# the GNU General Public License.

# data tool

# Code converted to pyton 3.x
# INRAE\olivier.vitrac@agroparistech.fr
#
# last release
# 2022-02-03 - add flist, __repr__
# 2022-02-04 - add append and start to add comments
# 2022-02-10 - first implementation of a full restart object from a dump object
# 2022-02-12 - revised append method, more robust, more verbose
# 2024-12-08 - updated help
# 2025-01-15 - module renamed pizza.data3_legacy

oneline = "Read, write, manipulate LAMMPS data files"

docstr = """
d = data("data.poly")            read a LAMMPS data file, can be gzipped
d = data()                       create an empty data file

d.map(1,"id",3,"x")              assign names to atom columns (1-N)

coeffs = d.get("Pair Coeffs")    extract info from data file section
q = d.get("Atoms",4)

  1 arg = all columns returned as 2d array of floats
  2 args = Nth column returned as vector of floats

d.reorder("Atoms",1,3,2,4,5)     reorder columns (1-N) in a data file section

  1,3,2,4,5 = new order of previous columns, can delete columns this way

d.title = "My LAMMPS data file"  set title of the data file
d.headers["atoms"] = 1500        set a header value
d.sections["Bonds"] = lines      set a section to list of lines (with newlines)
d.delete("bonds")                delete a keyword or section of data file
d.delete("Bonds")
d.replace("Atoms",5,vec)         replace Nth column of section with vector
d.newxyz(dmp,1000)               replace xyz in Atoms with xyz of snapshot N

  newxyz assumes id,x,y,z are defined in both data and dump files
    also replaces ix,iy,iz if they are defined

index,time,flag = d.iterator(0/1)          loop over single data file snapshot
time,box,atoms,bonds,tris,lines = d.viz(index)   return list of viz objects

  iterator() and viz() are compatible with equivalent dump calls
  iterator() called with arg = 0 first time, with arg = 1 on subsequent calls
    index = timestep index within dump object (only 0 for data file)
    time = timestep value (only 0 for data file)
    flag = -1 when iteration is done, 1 otherwise
  viz() returns info for specified timestep index (must be 0)
    time = 0
    box = [xlo,ylo,zlo,xhi,yhi,zhi]
    atoms = id,type,x,y,z for each atom as 2d array
    bonds = id,type,x1,y1,z1,x2,y2,z2,t1,t2 for each bond as 2d array
      NULL if bonds do not exist
    tris = NULL
    lines = NULL

d.write("data.new")             write a LAMMPS data file
"""

# History
#   8/05, Steve Plimpton (SNL): original version
#   11/07, added triclinic box support

# ToDo list

# Variables
#   title = 1st line of data file
#   names = dictionary with atom attributes as keys, col #s as values
#   headers = dictionary with header name as key, value or tuple as values
#   sections = dictionary with section name as key, array of lines as values
#   nselect = 1 = # of snapshots

# Imports and external programs

# External dependency
from os import popen
import numpy as np

# Dependecy for the creation of DATA restart object from a DUMP object
from pizza.dump3 import dump

__all__ = ['data', 'dump']

#try:
#    tmp = PIZZA_GUNZIP
#except:
PIZZA_GUNZIP = "gunzip"

# Class definition


class data:

    # --------------------------------------------------------------------

    def __init__(self, *list):
        self.nselect = 1

        if len(list) == 0:

        # ========================================
        # Default Constructor (empty object)
        # ========================================

            self.title = "LAMMPS data file"
            self.names = {}
            self.headers = {}
            self.sections = {}
            self.flist = []
            self.restart = False
            return

        elif isinstance(list[0],dump):

        # ========================================
        # Constructor from an existing DUMP object
        # ========================================

            X = list[0]     # supplied dump object
            t = X.time()
            nt = len(t)
            if len(list)>1:
                tselect = list[1]
                if tselect not in t:
                    raise ValueError("the input time is not available in the dump object")
            else:
                tselect = t[-1]
            itselect = next(i for i in range(nt) if t[i]==tselect)
            # set object title
            self.title = 'LAMMPS data file (restart from "%s" t = %0.5g (frame %d of %d))' %  \
                (X.flist[0],tselect,itselect,nt)
            # set default names                       ------------- HEADERS SECTION -------------
            self.names = {}
            X.tselect.one(tselect)
            snap = X.snaps[itselect]
            # set headers
            self.headers = {    'atoms': snap.natoms,
                                'atom types': X.minmax("type")[1],
                                'xlo xhi': (snap.xlo, snap.xhi),
                                'ylo yhi': (snap.ylo, snap.yhi),
                                'zlo zhi': (snap.zlo, snap.zhi)}
            # Default sections
            self.sections = {}
            # set atoms (specific to your style/kind) ------------- ATOMS SUBSECTION -------------
            template_atoms = {"smd": ["id","type","mol","c_vol", "mass", "radius",
                                      "c_contact_radius", "x", "y", "z", "f_1[1]", "f_1[2]", "f_1[3]"] }
            if X.kind(template_atoms["smd"]):
                for col in template_atoms["smd"]:
                    self.append("Atoms", X.vecs(tselect, col), col in ["id","type","mol"],col)
            else:
                raise ValueError("Please add your ATOMS section in the constructor")
            # set velocities (if required)           ------------- VELOCITIES SUBSECTION -------------
            template_velocities = {"smd": ["id","vx","vy","vz"]}
            if X.kind(template_atoms["smd"]):
                if X.kind(template_velocities["smd"]):
                    for col in template_velocities["smd"]:
                        self.append("Velocities", X.vecs(tselect, col), col == "id",col)
                else:
                    raise ValueError("the velocities are missing for the style SMD")
            # store filename
            self.flist = list[0].flist
            self.restart = True
            return

        # ===========================================================
        # Regular constructor from DATA file (supplied as filename)
        # ===========================================================

        flist = list
        file = list[0]
        if file[-3:] == ".gz":
            f = popen("%s -c %s" % (PIZZA_GUNZIP, file), "r")
        else:
            f = open(file)

        self.title = f.readline()
        self.names = {}

        headers = {}
        while 1:
            line = f.readline()
            line = line.strip()
            if len(line) == 0:
                continue
            found = 0
            for keyword in hkeywords:
                if line.find(keyword) >= 0:
                    found = 1
                    words = line.split()
                    if (
                        keyword == "xlo xhi"
                        or keyword == "ylo yhi"
                        or keyword == "zlo zhi"
                    ):
                        headers[keyword] = (float(words[0]), float(words[1]))
                    elif keyword == "xy xz yz":
                        headers[keyword] = (
                            float(words[0]),
                            float(words[1]),
                            float(words[2]),
                        )
                    else:
                        headers[keyword] = int(words[0])
            if not found:
                break

        sections = {}
        while 1:
            found = 0
            for pair in skeywords:
                keyword, length = pair[0], pair[1]
                if keyword == line:
                    found = 1
                    if length not in headers: #if not headers.has_key(length):
                        raise ValueError("data section %s has no matching header value" % line)
                    f.readline()
                    list = []
                    for i in range(headers[length]):
                        list.append(f.readline())
                    sections[keyword] = list
            if not found:
                raise ValueError("invalid section %s in data file" % line)
            f.readline()
            line = f.readline()
            if not line:
                break
            line = line.strip()

        f.close()
        self.headers = headers
        self.sections = sections
        self.flist = flist
        self.restart = False

    # --------------------------------------------------------------------
    # Display method (added OV - 2022-02-03)
    def __repr__(self):
        if self.sections == {} or self.headers == {}:
            ret = "empty %s" % self.title
            print(ret)
            return ret
        if self.restart:
            kind = "restart"
        else:
            kind = "source"
        print("Data file: %s\n\tcontains %d atoms from %d atom types\n\twith box = [%0.4g %0.4g %0.4g %0.4g %0.4g %0.4g]"
              % (self.flist[0],
                 self.headers['atoms'],
                 self.headers['atom types'],
                 self.headers['xlo xhi'][0],
                 self.headers['xlo xhi'][1],
                 self.headers['ylo yhi'][0],
                 self.headers['ylo yhi'][1],
                 self.headers['zlo zhi'][0],
                 self.headers['zlo zhi'][1],
                 ) )
        print("\twith the following sections:")
        for sectionname in self.sections.keys():
            print("\t\t" +self.dispsection(sectionname,False))
        ret = 'LAMMPS data object including %d atoms (%d types, %s="%s")' \
            % (self.headers['atoms'],self.maxtype(),kind,self.flist[0])
        return ret


    # --------------------------------------------------------------------
    # assign names to atom columns

    def map(self, *pairs):
        if len(pairs) % 2 != 0:
            raise ValueError ("data map() requires pairs of mappings")
        for i in range(0, len(pairs), 2):
            j = i + 1
            self.names[pairs[j]] = pairs[i] - 1

    # --------------------------------------------------------------------
    # extract info from data file fields

    def get(self, *list):
        if len(list) == 1:
            field = list[0]
            array = []
            lines = self.sections[field]
            for line in lines:
                words = line.split()
                values = map(float, words)
                array.append(values)
            return array
        elif len(list) == 2:
            field = list[0]
            n = list[1] - 1
            vec = []
            lines = self.sections[field]
            for line in lines:
                words = line.split()
                vec.append(float(words[n]))
            return vec
        else:
            raise ValueError("invalid arguments for data.get()")

    # --------------------------------------------------------------------
    # reorder columns in a data file field

    def reorder(self, name, *order):
        """ reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,...)  """
        if name not in self.sections:
            raise ValueError('"%s" is not a valid section name' % name)
        n = len(order)
        print(">> reorder for %d columns" % n)
        natoms = len(self.sections[name])
        oldlines = self.sections[name]
        newlines = natoms * [""]
        for index in order:
            for i in range(len(newlines)):
                words = oldlines[i].split()
                newlines[i] += words[index - 1] + " "
        for i in range(len(newlines)):
            newlines[i] += "\n"
        self.sections[name] = newlines

    # --------------------------------------------------------------------
    # replace a column of named section with vector of values
    # the number of data is checked and repeated for scalars (added 2022-02-04)

    def replace(self, name, icol, vector):
        """ replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns """
        lines = self.sections[name]
        nlines = len(lines)
        if name not in self.sections:
            raise ValueError('"%s" is not a valid section name' % name)
        if not isinstance(vector,list): vector = [vector]
        if len(vector)==1: vector = vector * nlines
        if len(vector) != nlines:
            raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
                             (len(vector),name,nlines))
        newlines = []
        j = icol - 1
        for i in range(nlines):
            line = lines[i]
            words = line.split()
            words[j] = str(vector[i])
            newline = " ".join(words) + "\n"
            newlines.append(newline)
        self.sections[name] = newlines

    # --------------------------------------------------------------------
    # append a column of named section with vector of values (added 2022-02-04)

    def append(self, name, vector, forceinteger = False, propertyname=None):
        """ append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None) """
        if name not in self.sections:
            self.sections[name] = []
            print('Add section [%s] - file="%s"' % (name,self.title) )
        lines = self.sections[name]
        nlines = len(lines)
        if not isinstance(vector,list) and not isinstance(vector,np.ndarray):
            vector = [vector]
        if propertyname != None:
            print('\t> Add "%s" (%d values) to [%s]' % (propertyname,len(vector),name))
        else:
            print('\t> Add %d values (no name) to [%s]' % (len(vector),name))
        newlines = []
        if nlines == 0:           # empty atoms section, create first column
            nlines = len(vector)  # new column length = input column length
            for i in range(nlines):
                if forceinteger:
                    line = str(int(vector[i]))
                else:
                    line = str(vector[i])
                newlines.append(line)
        else:
            if len(vector)==1: vector = vector * nlines
            if len(vector) != nlines:
                raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
                                 (len(vector),name,nlines))
            for i in range(nlines):
                line = lines[i]
                words = line.split()
                if forceinteger:
                    words.append(str(int(vector[i])))
                else:
                    words.append(str(vector[i]))
                newline = " ".join(words) + "\n"
                newlines.append(newline)
        self.sections[name] = newlines


    # --------------------------------------------------------------------
    # disp section info (added 2022-02-04)

    def dispsection(self, name,flaghead=True):
        """ display section info: X.dispsection("sectionname") """
        lines = self.sections[name]
        nlines = len(lines)
        line = lines[0]
        words = line.split()
        ret = '"%s": %d x %d values' % (name,nlines,len(words))

        if flaghead: ret = "LAMMPS data section "+ret
        return ret

    # --------------------------------------------------------------------
    # replace x,y,z in Atoms with x,y,z values from snapshot ntime of dump object
    # assumes id,x,y,z are defined in both data and dump files
    # also replaces ix,iy,iz if they are defined

    def newxyz(self, dm, ntime):
        nsnap = dm.findtime(ntime)
        print(">> newxyz for %d snaps" % nsnap)

        dm.sort(ntime)
        x, y, z = dm.vecs(ntime, "x", "y", "z")

        self.replace("Atoms", self.names["x"] + 1, x)
        self.replace("Atoms", self.names["y"] + 1, y)
        self.replace("Atoms", self.names["z"] + 1, z)

        if "ix" in dm.names and "ix" in self.names: #if dm.names.has_key("ix") and self.names.has_key("ix"):
            ix, iy, iz = dm.vecs(ntime, "ix", "iy", "iz")
            self.replace("Atoms", self.names["ix"] + 1, ix)
            self.replace("Atoms", self.names["iy"] + 1, iy)
            self.replace("Atoms", self.names["iz"] + 1, iz)

    # --------------------------------------------------------------------
    # delete header value or section from data file

    def delete(self, keyword):

        if keyword in self.headers: # if self.headers.has_key(keyword):
            del self.headers[keyword]
        elif keyword in self.sections: # elif self.sections.has_key(keyword):
            del self.sections[keyword]
        else:
            raise ValueError("keyword not found in data object")

    # --------------------------------------------------------------------
    # write out a LAMMPS data file

    def write(self, file):
        f = open(file, "w")
        print(self.title,file=f)
        for keyword in hkeywords:
            if keyword in self.headers: # self.headers.has_key(keyword):
                if keyword == "xlo xhi" or keyword == "ylo yhi" or keyword == "zlo zhi":
                    pair = self.headers[keyword]
                    print(pair[0], pair[1], keyword,file=f)
                elif keyword == "xy xz yz":
                    triple = self.headers[keyword]
                    print(triple[0], triple[1], triple[2], keyword,file=f)
                else:
                    print(self.headers[keyword], keyword,file=f)
        for pair in skeywords:
            keyword = pair[0]
            if keyword in self.sections: #self.sections.has_key(keyword):
                print("\n%s\n" % keyword,file=f)
                for line in self.sections[keyword]:
                    print(line,file=f,end="")
        f.close()

    # --------------------------------------------------------------------
    # iterator called from other tools

    def iterator(self, flag):
        if flag == 0:
            return 0, 0, 1
        return 0, 0, -1

    # --------------------------------------------------------------------
    # time query from other tools

    def findtime(self, n):
        if n == 0:
            return 0
        raise ValueError("no step %d exists" % n)

    # --------------------------------------------------------------------
    # return list of atoms and bonds to viz for data object

    def viz(self, isnap):
        if isnap:
            raise ValueError("cannot call data.viz() with isnap != 0")

        id = self.names["id"]
        type = self.names["type"]
        x = self.names["x"]
        y = self.names["y"]
        z = self.names["z"]

        xlohi = self.headers["xlo xhi"]
        ylohi = self.headers["ylo yhi"]
        zlohi = self.headers["zlo zhi"]
        box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]

        # create atom list needed by viz from id,type,x,y,z

        atoms = []
        atomlines = self.sections["Atoms"]
        for line in atomlines:
            words = line.split()
            atoms.append(
                [
                    int(words[id]),
                    int(words[type]),
                    float(words[x]),
                    float(words[y]),
                    float(words[z]),
                ]
            )

        # create list of current bond coords from list of bonds
        # assumes atoms are sorted so can lookup up the 2 atoms in each bond

        bonds = []
        if self.sections.has_key("Bonds"):
            bondlines = self.sections["Bonds"]
            for line in bondlines:
                words = line.split()
                bid, btype = int(words[0]), int(words[1])
                atom1, atom2 = int(words[2]), int(words[3])
                atom1words = atomlines[atom1 - 1].split()
                atom2words = atomlines[atom2 - 1].split()
                bonds.append(
                    [
                        bid,
                        btype,
                        float(atom1words[x]),
                        float(atom1words[y]),
                        float(atom1words[z]),
                        float(atom2words[x]),
                        float(atom2words[y]),
                        float(atom2words[z]),
                        float(atom1words[type]),
                        float(atom2words[type]),
                    ]
                )

        tris = []
        lines = []
        return 0, box, atoms, bonds, tris, lines

    # --------------------------------------------------------------------
    # return box size

    def maxbox(self):
        xlohi = self.headers["xlo xhi"]
        ylohi = self.headers["ylo yhi"]
        zlohi = self.headers["zlo zhi"]
        return [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]

    # --------------------------------------------------------------------
    # return number of atom types

    def maxtype(self):
        return self.headers["atom types"]


# --------------------------------------------------------------------
# data file keywords, both header and main sections

hkeywords = [
    "atoms",
    "ellipsoids",
    "lines",
    "triangles",
    "bodies",
    "bonds",
    "angles",
    "dihedrals",
    "impropers",
    "atom types",
    "bond types",
    "angle types",
    "dihedral types",
    "improper types",
    "xlo xhi",
    "ylo yhi",
    "zlo zhi",
    "xy xz yz",
]

skeywords = [
    ["Masses", "atom types"],
    ["Atoms", "atoms"],
    ["Ellipsoids", "ellipsoids"],
    ["Lines", "lines"],
    ["Triangles", "triangles"],
    ["Bodies", "bodies"],
    ["Bonds", "bonds"],
    ["Angles", "angles"],
    ["Dihedrals", "dihedrals"],
    ["Impropers", "impropers"],
    ["Velocities", "atoms"],
    ["Pair Coeffs", "atom types"],
    ["Bond Coeffs", "bond types"],
    ["Angle Coeffs", "angle types"],
    ["Dihedral Coeffs", "dihedral types"],
    ["Improper Coeffs", "improper types"],
    ["BondBond Coeffs", "angle types"],
    ["BondAngle Coeffs", "angle types"],
    ["MiddleBondTorsion Coeffs", "dihedral types"],
    ["EndBondTorsion Coeffs", "dihedral types"],
    ["AngleTorsion Coeffs", "dihedral types"],
    ["AngleAngleTorsion Coeffs", "dihedral types"],
    ["BondBond13 Coeffs", "dihedral types"],
    ["AngleAngle Coeffs", "improper types"],
    ["Molecules", "atoms"],
    ["Tinker Types", "atoms"],
]

# %%
# ===================================================
# main()
# ===================================================
# for debugging purposes (code called as a script)
# the code is called from here
# ===================================================
if __name__ == '__main__':
    datafile = "../data/play_data/data.play.lmp"
    X = data(datafile)
    Y = dump("../data/play_data/dump.play.restartme")
    t = Y.time()
    step = 2000
    R = data(Y,step)
    R.write("../tmp/data.myfirstrestart.lmp")

Classes

class data (*list)
Expand source code
class data:

    # --------------------------------------------------------------------

    def __init__(self, *list):
        self.nselect = 1

        if len(list) == 0:

        # ========================================
        # Default Constructor (empty object)
        # ========================================

            self.title = "LAMMPS data file"
            self.names = {}
            self.headers = {}
            self.sections = {}
            self.flist = []
            self.restart = False
            return

        elif isinstance(list[0],dump):

        # ========================================
        # Constructor from an existing DUMP object
        # ========================================

            X = list[0]     # supplied dump object
            t = X.time()
            nt = len(t)
            if len(list)>1:
                tselect = list[1]
                if tselect not in t:
                    raise ValueError("the input time is not available in the dump object")
            else:
                tselect = t[-1]
            itselect = next(i for i in range(nt) if t[i]==tselect)
            # set object title
            self.title = 'LAMMPS data file (restart from "%s" t = %0.5g (frame %d of %d))' %  \
                (X.flist[0],tselect,itselect,nt)
            # set default names                       ------------- HEADERS SECTION -------------
            self.names = {}
            X.tselect.one(tselect)
            snap = X.snaps[itselect]
            # set headers
            self.headers = {    'atoms': snap.natoms,
                                'atom types': X.minmax("type")[1],
                                'xlo xhi': (snap.xlo, snap.xhi),
                                'ylo yhi': (snap.ylo, snap.yhi),
                                'zlo zhi': (snap.zlo, snap.zhi)}
            # Default sections
            self.sections = {}
            # set atoms (specific to your style/kind) ------------- ATOMS SUBSECTION -------------
            template_atoms = {"smd": ["id","type","mol","c_vol", "mass", "radius",
                                      "c_contact_radius", "x", "y", "z", "f_1[1]", "f_1[2]", "f_1[3]"] }
            if X.kind(template_atoms["smd"]):
                for col in template_atoms["smd"]:
                    self.append("Atoms", X.vecs(tselect, col), col in ["id","type","mol"],col)
            else:
                raise ValueError("Please add your ATOMS section in the constructor")
            # set velocities (if required)           ------------- VELOCITIES SUBSECTION -------------
            template_velocities = {"smd": ["id","vx","vy","vz"]}
            if X.kind(template_atoms["smd"]):
                if X.kind(template_velocities["smd"]):
                    for col in template_velocities["smd"]:
                        self.append("Velocities", X.vecs(tselect, col), col == "id",col)
                else:
                    raise ValueError("the velocities are missing for the style SMD")
            # store filename
            self.flist = list[0].flist
            self.restart = True
            return

        # ===========================================================
        # Regular constructor from DATA file (supplied as filename)
        # ===========================================================

        flist = list
        file = list[0]
        if file[-3:] == ".gz":
            f = popen("%s -c %s" % (PIZZA_GUNZIP, file), "r")
        else:
            f = open(file)

        self.title = f.readline()
        self.names = {}

        headers = {}
        while 1:
            line = f.readline()
            line = line.strip()
            if len(line) == 0:
                continue
            found = 0
            for keyword in hkeywords:
                if line.find(keyword) >= 0:
                    found = 1
                    words = line.split()
                    if (
                        keyword == "xlo xhi"
                        or keyword == "ylo yhi"
                        or keyword == "zlo zhi"
                    ):
                        headers[keyword] = (float(words[0]), float(words[1]))
                    elif keyword == "xy xz yz":
                        headers[keyword] = (
                            float(words[0]),
                            float(words[1]),
                            float(words[2]),
                        )
                    else:
                        headers[keyword] = int(words[0])
            if not found:
                break

        sections = {}
        while 1:
            found = 0
            for pair in skeywords:
                keyword, length = pair[0], pair[1]
                if keyword == line:
                    found = 1
                    if length not in headers: #if not headers.has_key(length):
                        raise ValueError("data section %s has no matching header value" % line)
                    f.readline()
                    list = []
                    for i in range(headers[length]):
                        list.append(f.readline())
                    sections[keyword] = list
            if not found:
                raise ValueError("invalid section %s in data file" % line)
            f.readline()
            line = f.readline()
            if not line:
                break
            line = line.strip()

        f.close()
        self.headers = headers
        self.sections = sections
        self.flist = flist
        self.restart = False

    # --------------------------------------------------------------------
    # Display method (added OV - 2022-02-03)
    def __repr__(self):
        if self.sections == {} or self.headers == {}:
            ret = "empty %s" % self.title
            print(ret)
            return ret
        if self.restart:
            kind = "restart"
        else:
            kind = "source"
        print("Data file: %s\n\tcontains %d atoms from %d atom types\n\twith box = [%0.4g %0.4g %0.4g %0.4g %0.4g %0.4g]"
              % (self.flist[0],
                 self.headers['atoms'],
                 self.headers['atom types'],
                 self.headers['xlo xhi'][0],
                 self.headers['xlo xhi'][1],
                 self.headers['ylo yhi'][0],
                 self.headers['ylo yhi'][1],
                 self.headers['zlo zhi'][0],
                 self.headers['zlo zhi'][1],
                 ) )
        print("\twith the following sections:")
        for sectionname in self.sections.keys():
            print("\t\t" +self.dispsection(sectionname,False))
        ret = 'LAMMPS data object including %d atoms (%d types, %s="%s")' \
            % (self.headers['atoms'],self.maxtype(),kind,self.flist[0])
        return ret


    # --------------------------------------------------------------------
    # assign names to atom columns

    def map(self, *pairs):
        if len(pairs) % 2 != 0:
            raise ValueError ("data map() requires pairs of mappings")
        for i in range(0, len(pairs), 2):
            j = i + 1
            self.names[pairs[j]] = pairs[i] - 1

    # --------------------------------------------------------------------
    # extract info from data file fields

    def get(self, *list):
        if len(list) == 1:
            field = list[0]
            array = []
            lines = self.sections[field]
            for line in lines:
                words = line.split()
                values = map(float, words)
                array.append(values)
            return array
        elif len(list) == 2:
            field = list[0]
            n = list[1] - 1
            vec = []
            lines = self.sections[field]
            for line in lines:
                words = line.split()
                vec.append(float(words[n]))
            return vec
        else:
            raise ValueError("invalid arguments for data.get()")

    # --------------------------------------------------------------------
    # reorder columns in a data file field

    def reorder(self, name, *order):
        """ reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,...)  """
        if name not in self.sections:
            raise ValueError('"%s" is not a valid section name' % name)
        n = len(order)
        print(">> reorder for %d columns" % n)
        natoms = len(self.sections[name])
        oldlines = self.sections[name]
        newlines = natoms * [""]
        for index in order:
            for i in range(len(newlines)):
                words = oldlines[i].split()
                newlines[i] += words[index - 1] + " "
        for i in range(len(newlines)):
            newlines[i] += "\n"
        self.sections[name] = newlines

    # --------------------------------------------------------------------
    # replace a column of named section with vector of values
    # the number of data is checked and repeated for scalars (added 2022-02-04)

    def replace(self, name, icol, vector):
        """ replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns """
        lines = self.sections[name]
        nlines = len(lines)
        if name not in self.sections:
            raise ValueError('"%s" is not a valid section name' % name)
        if not isinstance(vector,list): vector = [vector]
        if len(vector)==1: vector = vector * nlines
        if len(vector) != nlines:
            raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
                             (len(vector),name,nlines))
        newlines = []
        j = icol - 1
        for i in range(nlines):
            line = lines[i]
            words = line.split()
            words[j] = str(vector[i])
            newline = " ".join(words) + "\n"
            newlines.append(newline)
        self.sections[name] = newlines

    # --------------------------------------------------------------------
    # append a column of named section with vector of values (added 2022-02-04)

    def append(self, name, vector, forceinteger = False, propertyname=None):
        """ append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None) """
        if name not in self.sections:
            self.sections[name] = []
            print('Add section [%s] - file="%s"' % (name,self.title) )
        lines = self.sections[name]
        nlines = len(lines)
        if not isinstance(vector,list) and not isinstance(vector,np.ndarray):
            vector = [vector]
        if propertyname != None:
            print('\t> Add "%s" (%d values) to [%s]' % (propertyname,len(vector),name))
        else:
            print('\t> Add %d values (no name) to [%s]' % (len(vector),name))
        newlines = []
        if nlines == 0:           # empty atoms section, create first column
            nlines = len(vector)  # new column length = input column length
            for i in range(nlines):
                if forceinteger:
                    line = str(int(vector[i]))
                else:
                    line = str(vector[i])
                newlines.append(line)
        else:
            if len(vector)==1: vector = vector * nlines
            if len(vector) != nlines:
                raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
                                 (len(vector),name,nlines))
            for i in range(nlines):
                line = lines[i]
                words = line.split()
                if forceinteger:
                    words.append(str(int(vector[i])))
                else:
                    words.append(str(vector[i]))
                newline = " ".join(words) + "\n"
                newlines.append(newline)
        self.sections[name] = newlines


    # --------------------------------------------------------------------
    # disp section info (added 2022-02-04)

    def dispsection(self, name,flaghead=True):
        """ display section info: X.dispsection("sectionname") """
        lines = self.sections[name]
        nlines = len(lines)
        line = lines[0]
        words = line.split()
        ret = '"%s": %d x %d values' % (name,nlines,len(words))

        if flaghead: ret = "LAMMPS data section "+ret
        return ret

    # --------------------------------------------------------------------
    # replace x,y,z in Atoms with x,y,z values from snapshot ntime of dump object
    # assumes id,x,y,z are defined in both data and dump files
    # also replaces ix,iy,iz if they are defined

    def newxyz(self, dm, ntime):
        nsnap = dm.findtime(ntime)
        print(">> newxyz for %d snaps" % nsnap)

        dm.sort(ntime)
        x, y, z = dm.vecs(ntime, "x", "y", "z")

        self.replace("Atoms", self.names["x"] + 1, x)
        self.replace("Atoms", self.names["y"] + 1, y)
        self.replace("Atoms", self.names["z"] + 1, z)

        if "ix" in dm.names and "ix" in self.names: #if dm.names.has_key("ix") and self.names.has_key("ix"):
            ix, iy, iz = dm.vecs(ntime, "ix", "iy", "iz")
            self.replace("Atoms", self.names["ix"] + 1, ix)
            self.replace("Atoms", self.names["iy"] + 1, iy)
            self.replace("Atoms", self.names["iz"] + 1, iz)

    # --------------------------------------------------------------------
    # delete header value or section from data file

    def delete(self, keyword):

        if keyword in self.headers: # if self.headers.has_key(keyword):
            del self.headers[keyword]
        elif keyword in self.sections: # elif self.sections.has_key(keyword):
            del self.sections[keyword]
        else:
            raise ValueError("keyword not found in data object")

    # --------------------------------------------------------------------
    # write out a LAMMPS data file

    def write(self, file):
        f = open(file, "w")
        print(self.title,file=f)
        for keyword in hkeywords:
            if keyword in self.headers: # self.headers.has_key(keyword):
                if keyword == "xlo xhi" or keyword == "ylo yhi" or keyword == "zlo zhi":
                    pair = self.headers[keyword]
                    print(pair[0], pair[1], keyword,file=f)
                elif keyword == "xy xz yz":
                    triple = self.headers[keyword]
                    print(triple[0], triple[1], triple[2], keyword,file=f)
                else:
                    print(self.headers[keyword], keyword,file=f)
        for pair in skeywords:
            keyword = pair[0]
            if keyword in self.sections: #self.sections.has_key(keyword):
                print("\n%s\n" % keyword,file=f)
                for line in self.sections[keyword]:
                    print(line,file=f,end="")
        f.close()

    # --------------------------------------------------------------------
    # iterator called from other tools

    def iterator(self, flag):
        if flag == 0:
            return 0, 0, 1
        return 0, 0, -1

    # --------------------------------------------------------------------
    # time query from other tools

    def findtime(self, n):
        if n == 0:
            return 0
        raise ValueError("no step %d exists" % n)

    # --------------------------------------------------------------------
    # return list of atoms and bonds to viz for data object

    def viz(self, isnap):
        if isnap:
            raise ValueError("cannot call data.viz() with isnap != 0")

        id = self.names["id"]
        type = self.names["type"]
        x = self.names["x"]
        y = self.names["y"]
        z = self.names["z"]

        xlohi = self.headers["xlo xhi"]
        ylohi = self.headers["ylo yhi"]
        zlohi = self.headers["zlo zhi"]
        box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]

        # create atom list needed by viz from id,type,x,y,z

        atoms = []
        atomlines = self.sections["Atoms"]
        for line in atomlines:
            words = line.split()
            atoms.append(
                [
                    int(words[id]),
                    int(words[type]),
                    float(words[x]),
                    float(words[y]),
                    float(words[z]),
                ]
            )

        # create list of current bond coords from list of bonds
        # assumes atoms are sorted so can lookup up the 2 atoms in each bond

        bonds = []
        if self.sections.has_key("Bonds"):
            bondlines = self.sections["Bonds"]
            for line in bondlines:
                words = line.split()
                bid, btype = int(words[0]), int(words[1])
                atom1, atom2 = int(words[2]), int(words[3])
                atom1words = atomlines[atom1 - 1].split()
                atom2words = atomlines[atom2 - 1].split()
                bonds.append(
                    [
                        bid,
                        btype,
                        float(atom1words[x]),
                        float(atom1words[y]),
                        float(atom1words[z]),
                        float(atom2words[x]),
                        float(atom2words[y]),
                        float(atom2words[z]),
                        float(atom1words[type]),
                        float(atom2words[type]),
                    ]
                )

        tris = []
        lines = []
        return 0, box, atoms, bonds, tris, lines

    # --------------------------------------------------------------------
    # return box size

    def maxbox(self):
        xlohi = self.headers["xlo xhi"]
        ylohi = self.headers["ylo yhi"]
        zlohi = self.headers["zlo zhi"]
        return [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]

    # --------------------------------------------------------------------
    # return number of atom types

    def maxtype(self):
        return self.headers["atom types"]

Methods

def append(self, name, vector, forceinteger=False, propertyname=None)

append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None)

Expand source code
def append(self, name, vector, forceinteger = False, propertyname=None):
    """ append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None) """
    if name not in self.sections:
        self.sections[name] = []
        print('Add section [%s] - file="%s"' % (name,self.title) )
    lines = self.sections[name]
    nlines = len(lines)
    if not isinstance(vector,list) and not isinstance(vector,np.ndarray):
        vector = [vector]
    if propertyname != None:
        print('\t> Add "%s" (%d values) to [%s]' % (propertyname,len(vector),name))
    else:
        print('\t> Add %d values (no name) to [%s]' % (len(vector),name))
    newlines = []
    if nlines == 0:           # empty atoms section, create first column
        nlines = len(vector)  # new column length = input column length
        for i in range(nlines):
            if forceinteger:
                line = str(int(vector[i]))
            else:
                line = str(vector[i])
            newlines.append(line)
    else:
        if len(vector)==1: vector = vector * nlines
        if len(vector) != nlines:
            raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
                             (len(vector),name,nlines))
        for i in range(nlines):
            line = lines[i]
            words = line.split()
            if forceinteger:
                words.append(str(int(vector[i])))
            else:
                words.append(str(vector[i]))
            newline = " ".join(words) + "\n"
            newlines.append(newline)
    self.sections[name] = newlines
def delete(self, keyword)
Expand source code
def delete(self, keyword):

    if keyword in self.headers: # if self.headers.has_key(keyword):
        del self.headers[keyword]
    elif keyword in self.sections: # elif self.sections.has_key(keyword):
        del self.sections[keyword]
    else:
        raise ValueError("keyword not found in data object")
def dispsection(self, name, flaghead=True)

display section info: X.dispsection("sectionname")

Expand source code
def dispsection(self, name,flaghead=True):
    """ display section info: X.dispsection("sectionname") """
    lines = self.sections[name]
    nlines = len(lines)
    line = lines[0]
    words = line.split()
    ret = '"%s": %d x %d values' % (name,nlines,len(words))

    if flaghead: ret = "LAMMPS data section "+ret
    return ret
def findtime(self, n)
Expand source code
def findtime(self, n):
    if n == 0:
        return 0
    raise ValueError("no step %d exists" % n)
def get(self, *list)
Expand source code
def get(self, *list):
    if len(list) == 1:
        field = list[0]
        array = []
        lines = self.sections[field]
        for line in lines:
            words = line.split()
            values = map(float, words)
            array.append(values)
        return array
    elif len(list) == 2:
        field = list[0]
        n = list[1] - 1
        vec = []
        lines = self.sections[field]
        for line in lines:
            words = line.split()
            vec.append(float(words[n]))
        return vec
    else:
        raise ValueError("invalid arguments for data.get()")
def iterator(self, flag)
Expand source code
def iterator(self, flag):
    if flag == 0:
        return 0, 0, 1
    return 0, 0, -1
def map(self, *pairs)
Expand source code
def map(self, *pairs):
    if len(pairs) % 2 != 0:
        raise ValueError ("data map() requires pairs of mappings")
    for i in range(0, len(pairs), 2):
        j = i + 1
        self.names[pairs[j]] = pairs[i] - 1
def maxbox(self)
Expand source code
def maxbox(self):
    xlohi = self.headers["xlo xhi"]
    ylohi = self.headers["ylo yhi"]
    zlohi = self.headers["zlo zhi"]
    return [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]
def maxtype(self)
Expand source code
def maxtype(self):
    return self.headers["atom types"]
def newxyz(self, dm, ntime)
Expand source code
def newxyz(self, dm, ntime):
    nsnap = dm.findtime(ntime)
    print(">> newxyz for %d snaps" % nsnap)

    dm.sort(ntime)
    x, y, z = dm.vecs(ntime, "x", "y", "z")

    self.replace("Atoms", self.names["x"] + 1, x)
    self.replace("Atoms", self.names["y"] + 1, y)
    self.replace("Atoms", self.names["z"] + 1, z)

    if "ix" in dm.names and "ix" in self.names: #if dm.names.has_key("ix") and self.names.has_key("ix"):
        ix, iy, iz = dm.vecs(ntime, "ix", "iy", "iz")
        self.replace("Atoms", self.names["ix"] + 1, ix)
        self.replace("Atoms", self.names["iy"] + 1, iy)
        self.replace("Atoms", self.names["iz"] + 1, iz)
def reorder(self, name, *order)

reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,…)

Expand source code
def reorder(self, name, *order):
    """ reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,...)  """
    if name not in self.sections:
        raise ValueError('"%s" is not a valid section name' % name)
    n = len(order)
    print(">> reorder for %d columns" % n)
    natoms = len(self.sections[name])
    oldlines = self.sections[name]
    newlines = natoms * [""]
    for index in order:
        for i in range(len(newlines)):
            words = oldlines[i].split()
            newlines[i] += words[index - 1] + " "
    for i in range(len(newlines)):
        newlines[i] += "\n"
    self.sections[name] = newlines
def replace(self, name, icol, vector)

replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns

Expand source code
def replace(self, name, icol, vector):
    """ replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns """
    lines = self.sections[name]
    nlines = len(lines)
    if name not in self.sections:
        raise ValueError('"%s" is not a valid section name' % name)
    if not isinstance(vector,list): vector = [vector]
    if len(vector)==1: vector = vector * nlines
    if len(vector) != nlines:
        raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
                         (len(vector),name,nlines))
    newlines = []
    j = icol - 1
    for i in range(nlines):
        line = lines[i]
        words = line.split()
        words[j] = str(vector[i])
        newline = " ".join(words) + "\n"
        newlines.append(newline)
    self.sections[name] = newlines
def viz(self, isnap)
Expand source code
def viz(self, isnap):
    if isnap:
        raise ValueError("cannot call data.viz() with isnap != 0")

    id = self.names["id"]
    type = self.names["type"]
    x = self.names["x"]
    y = self.names["y"]
    z = self.names["z"]

    xlohi = self.headers["xlo xhi"]
    ylohi = self.headers["ylo yhi"]
    zlohi = self.headers["zlo zhi"]
    box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]

    # create atom list needed by viz from id,type,x,y,z

    atoms = []
    atomlines = self.sections["Atoms"]
    for line in atomlines:
        words = line.split()
        atoms.append(
            [
                int(words[id]),
                int(words[type]),
                float(words[x]),
                float(words[y]),
                float(words[z]),
            ]
        )

    # create list of current bond coords from list of bonds
    # assumes atoms are sorted so can lookup up the 2 atoms in each bond

    bonds = []
    if self.sections.has_key("Bonds"):
        bondlines = self.sections["Bonds"]
        for line in bondlines:
            words = line.split()
            bid, btype = int(words[0]), int(words[1])
            atom1, atom2 = int(words[2]), int(words[3])
            atom1words = atomlines[atom1 - 1].split()
            atom2words = atomlines[atom2 - 1].split()
            bonds.append(
                [
                    bid,
                    btype,
                    float(atom1words[x]),
                    float(atom1words[y]),
                    float(atom1words[z]),
                    float(atom2words[x]),
                    float(atom2words[y]),
                    float(atom2words[z]),
                    float(atom1words[type]),
                    float(atom2words[type]),
                ]
            )

    tris = []
    lines = []
    return 0, box, atoms, bonds, tris, lines
def write(self, file)
Expand source code
def write(self, file):
    f = open(file, "w")
    print(self.title,file=f)
    for keyword in hkeywords:
        if keyword in self.headers: # self.headers.has_key(keyword):
            if keyword == "xlo xhi" or keyword == "ylo yhi" or keyword == "zlo zhi":
                pair = self.headers[keyword]
                print(pair[0], pair[1], keyword,file=f)
            elif keyword == "xy xz yz":
                triple = self.headers[keyword]
                print(triple[0], triple[1], triple[2], keyword,file=f)
            else:
                print(self.headers[keyword], keyword,file=f)
    for pair in skeywords:
        keyword = pair[0]
        if keyword in self.sections: #self.sections.has_key(keyword):
            print("\n%s\n" % keyword,file=f)
            for line in self.sections[keyword]:
                print(line,file=f,end="")
    f.close()
class dump (*file_list: str, read_files: bool = True)

The dump class provides comprehensive tools for reading, writing, and manipulating LAMMPS dump files and particle attributes. It handles both static and dynamic properties of snapshots with robust methods for data selection, transformation, and visualization.

Initialize a dump object.

Parameters

*file_list (str): Variable length argument list of dump file paths. Can include wildcards. read_files (bool): If False, store filenames without reading. Default is True.

Expand source code
class dump:
    """
    The `dump` class provides comprehensive tools for reading, writing, and manipulating LAMMPS dump files and particle attributes. It handles both static and dynamic properties of snapshots with robust methods for data selection, transformation, and visualization.
    """

    def __init__(self, *file_list: str, read_files: bool = True):
        """
        Initialize a dump object.

        Parameters:
            *file_list (str): Variable length argument list of dump file paths. Can include wildcards.
            read_files (bool): If False, store filenames without reading. Default is True.
        """
        self.snaps: List[Snap] = []
        self.nsnaps: int = 0
        self.nselect: int = 0
        self.names: Dict[str, int] = {}
        self.tselect = tselect(self)
        self.aselect = aselect(self)
        self.atype: str = "type"
        self.bondflag: int = 0
        self.bondlist: List[List[int]] = []
        self.triflag: int = 0
        self.trilist: List[List[float]] = []
        self.lineflag: int = 0
        self.linelist: List[List[float]] = []
        self.objextra: Optional[Any] = None

        # flist = list of all dump file names
        raw_filenames = ' '.join(file_list)
        self.flist: List[str] = glob.glob(raw_filenames) if read_files else list(file_list)

        if not self.flist and read_files:
            logger.error("No dump file specified.")
            raise ValueError("No dump file specified.")

        if read_files:
            self.increment: int = 0
            self.read_all()
        else:
            self.increment = 1
            self.nextfile = 0
            self.eof = 0

    def __repr__(self) -> str:
        """
        Return a string representation of the dump object.

        Returns:
            str: Description of the dump object.
        """
        times = self.time()
        ntimes = len(times)
        lastime = times[-1] if ntimes > 0 else 0
        fields = self.names

        field_names = ", ".join(sorted(fields.keys(), key=lambda k: fields[k]))
        representation = (f'Dump object from file "{self.flist[0]}" '
                          f'with {ntimes} frames (last timestep={lastime}) '
                          f'and fields: {field_names}')
        logger.info(representation)
        return representation

    def read_all(self) -> None:
        """
        Read all snapshots from each file in the file list.
        """
        for file in self.flist:
            is_gzipped = file.endswith(".gz")
            try:
                if is_gzipped:
                    with subprocess.Popen([PIZZA_GUNZIP, "-c", file],
                                          stdout=subprocess.PIPE,
                                          text=True) as proc:
                        file_handle = proc.stdout
                        logger.debug(f"Opened gzipped file: {file}")
                else:
                    file_handle = open(file, 'r')
                    logger.debug(f"Opened file: {file}")

                with file_handle:
                    snap = self.read_snapshot(file_handle)
                    while snap:
                        self.snaps.append(snap)
                        logger.info(f"Read snapshot at time {snap.time}")
                        snap = self.read_snapshot(file_handle)
            except subprocess.CalledProcessError as e:
                logger.error(f"Error decompressing file '{file}': {e}")
                raise
            except FileNotFoundError:
                logger.error(f"File '{file}' not found.")
                raise
            except Exception as e:
                logger.error(f"Error reading file '{file}': {e}")
                raise

        self.snaps.sort()
        self.cull()
        self.nsnaps = len(self.snaps)
        logger.info(f"Read {self.nsnaps} snapshots.")

        # Select all timesteps and atoms by default
        self.tselect.all()

        # Log column assignments
        if self.names:
            logger.info(f"Assigned columns: {', '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}")
        else:
            logger.warning("No column assignments made.")

        # Unscale if necessary
        if self.nsnaps > 0:
            if getattr(self, 'scale_original', -1) == 1:
                self.unscale()
            elif getattr(self, 'scale_original', -1) == 0:
                logger.info("Dump is already unscaled.")
            else:
                logger.warning("Dump scaling status is unknown.")

    def read_snapshot(self, f) -> Optional['Snap']:
        """
        Read a single snapshot from a file.

        Parameters:
            f (file object): File handle to read from.

        Returns:
            Optional[Snap]: Snapshot object or None if failed.
        """
        try:
            snap = Snap()

            # Read and assign ITEMS
            while True:
                item = f.readline()
                if not item:
                    break
                if not item.startswith("ITEM:"):
                    continue
                item_type = item.split("ITEM:")[1].strip()
                if item_type == "TIME":
                    snap.realtime = float(f.readline().strip())
                elif item_type == "TIMESTEP":
                    snap.time = int(f.readline().strip())
                elif item_type == "NUMBER OF ATOMS":
                    snap.natoms = int(f.readline().strip())
                elif item_type.startswith("BOX BOUNDS"):
                    snap.boxstr = item_type.split("BOX BOUNDS")[1].strip()
                    box_bounds = []
                    for _ in range(3):
                        bounds = f.readline().strip().split()
                        box_bounds.append(tuple(map(float, bounds[:2])))
                        if len(bounds) > 2:
                            setattr(snap, bounds[2], float(bounds[2]))
                        else:
                            setattr(snap, bounds[2] if len(bounds) > 2 else 'xy', 0.0)
                    snap.xlo, snap.xhi = box_bounds[0]
                    snap.ylo, snap.yhi = box_bounds[1]
                    snap.zlo, snap.zhi = box_bounds[2]
                    snap.triclinic = 1 if len(box_bounds[0]) > 2 else 0
                elif item_type == "ATOMS":
                    if not self.names:
                        self.assign_column_names(f.readline())
                    snap.aselect = np.ones(snap.natoms, dtype=bool)
                    atoms = []
                    for _ in range(snap.natoms):
                        line = f.readline()
                        if not line:
                            break
                        atoms.append(list(map(float, line.strip().split())))
                    snap.atoms = np.array(atoms)
                    break

            if not hasattr(snap, 'time'):
                return None

            return snap
        except Exception as e:
            logger.error(f"Error reading snapshot: {e}")
            return None

    def assign_column_names(self, line: str) -> None:
        """
        Assign column names based on the ATOMS section header.

        Parameters:
            line (str): The header line containing column names.
        """
        try:
            columns = line.strip().split()[1:]  # Skip the first word (e.g., "id")
            for idx, col in enumerate(columns):
                self.names[col] = idx
            logger.debug(f"Assigned column names: {self.names}")
            # Determine scaling status based on column names
            x_scaled = "xs" in self.names
            y_scaled = "ys" in self.names
            z_scaled = "zs" in self.names
            self.scale_original = 1 if x_scaled and y_scaled and z_scaled else 0
            logger.info(f"Coordinate scaling status: {'scaled' if self.scale_original else 'unscaled'}")
        except Exception as e:
            logger.error(f"Error assigning column names: {e}")
            raise

    def __add__(self, other: 'dump') -> 'dump':
        """
        Merge two dump objects of the same type.

        Parameters:
            other (dump): Another dump object to merge with.

        Returns:
            dump: A new dump object containing snapshots from both dumps.

        Raises:
            ValueError: If the dump types do not match or other is not a dump instance.
        """
        if not isinstance(other, dump):
            raise ValueError("The second operand is not a dump object.")
        if self.type != other.type:
            raise ValueError("The dumps are not of the same type.")
        combined_files = self.flist + other.flist
        new_dump = dump(*combined_files)
        return new_dump

    def cull(self) -> None:
        """
        Remove duplicate snapshots based on timestep.
        """
        unique_snaps = {}
        culled_snaps = []
        for snap in self.snaps:
            if snap.time not in unique_snaps:
                unique_snaps[snap.time] = snap
                culled_snaps.append(snap)
            else:
                logger.warning(f"Duplicate timestep {snap.time} found. Culling duplicate.")
        self.snaps = culled_snaps
        logger.info(f"Culled duplicates. Total snapshots: {len(self.snaps)}")

    def sort(self, key: Union[str, int] = "id") -> None:
        """
        Sort atoms or snapshots.

        Parameters:
            key (Union[str, int]): The key to sort by. If str, sorts snapshots by that column. If int, sorts atoms in a specific timestep.
        """
        if isinstance(key, str):
            if key not in self.names:
                raise ValueError(f"Column '{key}' not found for sorting.")
            logger.info(f"Sorting snapshots by column '{key}'.")
            icol = self.names[key]
            for snap in self.snaps:
                if not snap.tselect:
                    continue
                snap.atoms = snap.atoms[snap.atoms[:, icol].argsort()]
        elif isinstance(key, int):
            try:
                snap = self.snaps[self.findtime(key)]
                logger.info(f"Sorting atoms in snapshot at timestep {key}.")
                if "id" in self.names:
                    id_col = self.names["id"]
                    snap.atoms = snap.atoms[snap.atoms[:, id_col].argsort()]
                else:
                    logger.warning("No 'id' column found for sorting atoms.")
            except ValueError as e:
                logger.error(e)
                raise
        else:
            logger.error("Invalid key type for sort().")
            raise TypeError("Key must be a string or integer.")

    def write(self, filename: str, head: int = 1, app: int = 0) -> None:
        """
        Write the dump object to a LAMMPS dump file.

        Parameters:
            filename (str): The output file path.
            head (int): Whether to include the snapshot header (1 for yes, 0 for no).
            app (int): Whether to append to the file (1 for yes, 0 for no).
        """
        try:
            mode = "a" if app else "w"
            with open(filename, mode) as f:
                for snap in self.snaps:
                    if not snap.tselect:
                        continue
                    if head:
                        f.write("ITEM: TIMESTEP\n")
                        f.write(f"{snap.time}\n")
                        f.write("ITEM: NUMBER OF ATOMS\n")
                        f.write(f"{snap.nselect}\n")
                        f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n")
                        f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n")
                        f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n")
                        f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n")
                        f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n")
                    for atom in snap.atoms[snap.aselect]:
                        atom_str = " ".join([f"{int(atom[self.names['id']])}" if key in ["id", "type"] else f"{atom[self.names[key]]}" 
                                             for key in sorted(self.names.keys(), key=lambda k: self.names[k])])
                        f.write(f"{atom_str}\n")
            logger.info(f"Dump object written to '{filename}'.")
        except IOError as e:
            logger.error(f"Error writing to file '{filename}': {e}")
            raise

    def scatter(self, root: str) -> None:
        """
        Write each selected snapshot to a separate dump file with timestep suffix.

        Parameters:
            root (str): The root name for output files. Suffix will be added based on timestep.
        """
        try:
            for snap in self.snaps:
                if not snap.tselect:
                    continue
                filename = f"{root}.{snap.time}"
                with open(filename, "w") as f:
                    f.write("ITEM: TIMESTEP\n")
                    f.write(f"{snap.time}\n")
                    f.write("ITEM: NUMBER OF ATOMS\n")
                    f.write(f"{snap.nselect}\n")
                    f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n")
                    f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n")
                    f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n")
                    f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n")
                    f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n")
                    for atom in snap.atoms[snap.aselect]:
                        atom_str = " ".join([f"{int(atom[self.names['id']])}" if key in ["id", "type"] else f"{atom[self.names[key]]}" 
                                             for key in sorted(self.names.keys(), key=lambda k: self.names[k])])
                        f.write(f"{atom_str}\n")
            logger.info(f"Scatter write completed with root '{root}'.")
        except IOError as e:
            logger.error(f"Error writing scatter files: {e}")
            raise

    def minmax(self, colname: str) -> Tuple[float, float]:
        """
        Find the minimum and maximum values for a specified column across all selected snapshots and atoms.

        Parameters:
            colname (str): The column name to find min and max for.

        Returns:
            Tuple[float, float]: The minimum and maximum values.

        Raises:
            KeyError: If the column name does not exist.
        """
        if colname not in self.names:
            raise KeyError(f"Column '{colname}' not found.")
        icol = self.names[colname]
        min_val = np.inf
        max_val = -np.inf
        for snap in self.snaps:
            if not snap.tselect:
                continue
            selected_atoms = snap.atoms[snap.aselect]
            if selected_atoms.size == 0:
                continue
            current_min = selected_atoms[:, icol].min()
            current_max = selected_atoms[:, icol].max()
            if current_min < min_val:
                min_val = current_min
            if current_max > max_val:
                max_val = current_max
        logger.info(f"minmax for column '{colname}': min={min_val}, max={max_val}")
        return min_val, max_val

    def set(self, eq: str) -> None:
        """
        Set a column value using an equation for all selected snapshots and atoms.

        Parameters:
            eq (str): The equation to compute the new column values. Use $<column_name> for variables.

        Example:
            d.set("$ke = $vx * $vx + $vy * $vy")
        """
        logger.info(f"Setting column using equation: {eq}")
        pattern = r"\$\w+"
        variables = re.findall(pattern, eq)
        if not variables:
            logger.warning("No variables found in equation.")
            return
        lhs = variables[0][1:]
        if lhs not in self.names:
            self.newcolumn(lhs)
        try:
            # Replace $var with appropriate array accesses
            for var in variables:
                var_name = var[1:]
                if var_name not in self.names:
                    raise KeyError(f"Variable '{var_name}' not found in columns.")
                col_index = self.names[var_name]
                eq = eq.replace(var, f"snap.atoms[i][{col_index}]")
            compiled_eq = compile(eq, "<string>", "exec")
            for snap in self.snaps:
                if not snap.tselect:
                    continue
                for i in range(snap.natoms):
                    if not snap.aselect[i]:
                        continue
                    exec(compiled_eq)
            logger.info("Column values set successfully.")
        except Exception as e:
            logger.error(f"Error setting column values: {e}")
            raise

    def setv(self, colname: str, vector: List[float]) -> None:
        """
        Set a column value using a vector of values for all selected snapshots and atoms.

        Parameters:
            colname (str): The column name to set.
            vector (List[float]): The values to assign to the column.

        Raises:
            KeyError: If the column name does not exist.
            ValueError: If the length of the vector does not match the number of selected atoms.
        """
        logger.info(f"Setting column '{colname}' using a vector of values.")
        if colname not in self.names:
            self.newcolumn(colname)
        icol = self.names[colname]
        for snap in self.snaps:
            if not snap.tselect:
                continue
            if len(vector) != snap.nselect:
                raise ValueError("Vector length does not match the number of selected atoms.")
            selected_indices = np.where(snap.aselect)[0]
            snap.atoms[selected_indices, icol] = vector
        logger.info(f"Column '{colname}' set successfully.")

    def spread(self, old: str, n: int, new: str) -> None:
        """
        Spread values from an old column into a new column as integers from 1 to n based on their relative positions.

        Parameters:
            old (str): The column name to spread.
            n (int): The number of spread values.
            new (str): The new column name to create.

        Raises:
            KeyError: If the old column does not exist.
        """
        logger.info(f"Spreading column '{old}' into new column '{new}' with {n} spread values.")
        if old not in self.names:
            raise KeyError(f"Column '{old}' not found.")
        if new not in self.names:
            self.newcolumn(new)
        iold = self.names[old]
        inew = self.names[new]
        min_val, max_val = self.minmax(old)
        gap = max_val - min_val
        if gap == 0:
            gap = 1.0  # Prevent division by zero
        invdelta = n / gap
        for snap in self.snaps:
            if not snap.tselect:
                continue
            selected_atoms = snap.atoms[snap.aselect]
            snap.atoms[snap.aselect, inew] = np.clip(((selected_atoms[:, iold] - min_val) * invdelta).astype(int) + 1, 1, n)
        logger.info(f"Column '{new}' spread successfully.")

    def clone(self, nstep: int, col: str) -> None:
        """
        Clone the value from a specific timestep's column to all selected snapshots for atoms with the same ID.

        Parameters:
            nstep (int): The timestep to clone from.
            col (str): The column name to clone.

        Raises:
            KeyError: If the column or ID column does not exist.
            ValueError: If the specified timestep does not exist.
        """
        logger.info(f"Cloning column '{col}' from timestep {nstep} to all selected snapshots.")
        if "id" not in self.names:
            raise KeyError("Column 'id' not found.")
        if col not in self.names:
            raise KeyError(f"Column '{col}' not found.")
        istep = self.findtime(nstep)
        icol = self.names[col]
        id_col = self.names["id"]
        id_to_index = {atom[id_col]: idx for idx, atom in enumerate(self.snaps[istep].atoms)}
        for snap in self.snaps:
            if not snap.tselect:
                continue
            for i, atom in enumerate(snap.atoms):
                if not snap.aselect[i]:
                    continue
                atom_id = atom[id_col]
                if atom_id in id_to_index:
                    snap.atoms[i, icol] = self.snaps[istep].atoms[id_to_index[atom_id], icol]
        logger.info("Cloning completed successfully.")

    def time(self) -> List[int]:
        """
        Return a list of selected snapshot timesteps.

        Returns:
            List[int]: List of timestep values.
        """
        times = [snap.time for snap in self.snaps if snap.tselect]
        logger.debug(f"Selected timesteps: {times}")
        return times

    def realtime(self) -> List[float]:
        """
        Return a list of selected snapshot real-time values.

        Returns:
            List[float]: List of real-time values.
        """
        times = [snap.realtime for snap in self.snaps if snap.tselect and hasattr(snap, 'realtime')]
        logger.debug(f"Selected real-time values: {times}")
        return times

    def atom(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]:
        """
        Extract values for a specific atom ID across all selected snapshots.

        Parameters:
            n (int): The atom ID to extract.
            *columns (str): The column names to extract.

        Returns:
            Union[List[float], List[List[float]]]: The extracted values.

        Raises:
            KeyError: If any specified column does not exist.
            ValueError: If the atom ID is not found in any snapshot.
        """
        logger.info(f"Extracting atom ID {n} values for columns {columns}.")
        if not columns:
            raise ValueError("No columns specified for extraction.")
        column_indices = []
        for col in columns:
            if col not in self.names:
                raise KeyError(f"Column '{col}' not found.")
            column_indices.append(self.names[col])

        extracted = [[] for _ in columns]
        for snap in self.snaps:
            if not snap.tselect:
                continue
            atom_rows = snap.atoms[snap.aselect]
            id_column = self.names["id"]
            matching_atoms = atom_rows[atom_rows[:, id_column] == n]
            if matching_atoms.size == 0:
                raise ValueError(f"Atom ID {n} not found in snapshot at timestep {snap.time}.")
            atom = matching_atoms[0]
            for idx, col_idx in enumerate(column_indices):
                extracted[idx].append(atom[col_idx])
        if len(columns) == 1:
            return extracted[0]
        return extracted

    def vecs(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]:
        """
        Extract values for selected atoms at a specific timestep.

        Parameters:
            n (int): The timestep to extract from.
            *columns (str): The column names to extract.

        Returns:
            Union[List[float], List[List[float]]]: The extracted values.

        Raises:
            KeyError: If any specified column does not exist.
            ValueError: If the specified timestep does not exist.
        """
        logger.info(f"Extracting columns {columns} for timestep {n}.")
        if not columns:
            raise ValueError("No columns specified for extraction.")
        try:
            snap = self.snaps[self.findtime(n)]
        except ValueError as e:
            logger.error(e)
            raise
        column_indices = []
        for col in columns:
            if col not in self.names:
                raise KeyError(f"Column '{col}' not found.")
            column_indices.append(self.names[col])
        extracted = [[] for _ in columns]
        selected_atoms = snap.atoms[snap.aselect]
        for atom in selected_atoms:
            for idx, col_idx in enumerate(column_indices):
                extracted[idx].append(atom[col_idx])
        if len(columns) == 1:
            return extracted[0]
        return extracted

    def newcolumn(self, colname: str) -> None:
        """
        Add a new column to every snapshot and initialize it to zero.

        Parameters:
            colname (str): The name of the new column.
        """
        logger.info(f"Adding new column '{colname}' with default value 0.")
        if colname in self.names:
            logger.warning(f"Column '{colname}' already exists.")
            return
        new_col_index = len(self.names)
        self.names[colname] = new_col_index
        for snap in self.snaps:
            if snap.atoms is not None:
                new_column = np.zeros((snap.atoms.shape[0], 1))
                snap.atoms = np.hstack((snap.atoms, new_column))
        logger.info(f"New column '{colname}' added successfully.")

    def kind(self, listtypes: Optional[Dict[str, List[str]]] = None) -> Optional[str]:
        """
        Guess the kind of dump file based on column names.

        Parameters:
            listtypes (Optional[Dict[str, List[str]]]): A dictionary defining possible types.

        Returns:
            Optional[str]: The kind of dump file if matched, else None.
        """
        if listtypes is None:
            listtypes = {
                'vxyz': ["id", "type", "x", "y", "z", "vx", "vy", "vz"],
                'xyz': ["id", "type", "x", "y", "z"]
            }
            internaltypes = True
        else:
            listtypes = {"user_type": listtypes}
            internaltypes = False

        for kind, columns in listtypes.items():
            if all(col in self.names for col in columns):
                logger.info(f"Dump kind identified as '{kind}'.")
                return kind
        logger.warning("Dump kind could not be identified.")
        return None

    @property
    def type(self) -> int:
        """
        Get the type of dump file defined as a hash of column names.

        Returns:
            int: Hash value representing the dump type.
        """
        type_hash = hash(self.names2str())
        logger.debug(f"Dump type hash: {type_hash}")
        return type_hash

    def names2str(self) -> str:
        """
        Convert column names to a sorted string based on their indices.

        Returns:
            str: A string of column names sorted by their column index.
        """
        sorted_columns = sorted(self.names.items(), key=lambda item: item[1])
        names_str = " ".join([col for col, _ in sorted_columns])
        logger.debug(f"Column names string: {names_str}")
        return names_str

    def __add__(self, other: 'dump') -> 'dump':
        """
        Merge two dump objects of the same type.

        Parameters:
            other (dump): Another dump object to merge with.

        Returns:
            dump: A new dump object containing snapshots from both dumps.

        Raises:
            ValueError: If the dump types do not match or other is not a dump instance.
        """
        return self.__add__(other)

    def iterator(self, flag: int) -> Tuple[int, int, int]:
        """
        Iterator method to loop over selected snapshots.

        Parameters:
            flag (int): 0 for the first call, 1 for subsequent calls.

        Returns:
            Tuple[int, int, int]: (index, time, flag)
        """
        if not hasattr(self, 'iterate'):
            self.iterate = -1
        if flag == 0:
            self.iterate = 0
        else:
            self.iterate += 1
        while self.iterate < self.nsnaps:
            snap = self.snaps[self.iterate]
            if snap.tselect:
                logger.debug(f"Iterator returning snapshot {self.iterate} at time {snap.time}.")
                return self.iterate, snap.time, 1
            self.iterate += 1
        return 0, 0, -1

    def viz(self, index: int, flag: int = 0) -> Tuple[int, List[float], List[List[Union[int, float]]], 
                                                   List[List[Union[int, float]]], List[Any], List[Any]]:
        """
        Return visualization data for a specified snapshot.

        Parameters:
            index (int): Snapshot index or timestep value.
            flag (int): If 1, treat index as timestep value. Default is 0.

        Returns:
            Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]:
                (time, box, atoms, bonds, tris, lines)

        Raises:
            ValueError: If the snapshot index is invalid.
        """
        if flag:
            try:
                isnap = self.findtime(index)
            except ValueError as e:
                logger.error(e)
                raise
        else:
            isnap = index
            if isnap < 0 or isnap >= self.nsnaps:
                raise ValueError("Snapshot index out of range.")

        snap = self.snaps[isnap]
        time = snap.time
        box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
        id_idx = self.names.get("id")
        type_idx = self.names.get(self.atype)
        x_idx = self.names.get("x")
        y_idx = self.names.get("y")
        z_idx = self.names.get("z")

        if None in [id_idx, type_idx, x_idx, y_idx, z_idx]:
            raise ValueError("One or more required columns (id, type, x, y, z) are not defined.")

        # Create atom list for visualization
        atoms = snap.atoms[snap.aselect][:, [id_idx, type_idx, x_idx, y_idx, z_idx]].astype(object).tolist()

        # Create bonds list if bonds are defined
        bonds = []
        if self.bondflag:
            if self.bondflag == 1:
                bondlist = self.bondlist
            elif self.bondflag == 2 and self.objextra:
                _, _, _, bondlist, _, _ = self.objextra.viz(time, 1)
            else:
                bondlist = []
            if bondlist:
                id_to_atom = {atom[0]: atom for atom in atoms}
                for bond in bondlist:
                    try:
                        atom1 = id_to_atom[bond[2]]
                        atom2 = id_to_atom[bond[3]]
                        bonds.append([
                            bond[0],
                            bond[1],
                            atom1[2], atom1[3], atom1[4],
                            atom2[2], atom2[3], atom2[4],
                            atom1[1], atom2[1]
                        ])
                    except KeyError:
                        logger.warning(f"Bond with atom IDs {bond[2]}, {bond[3]} not found in selected atoms.")
                        continue

        # Create tris list if tris are defined
        tris = []
        if self.triflag:
            if self.triflag == 1:
                tris = self.trilist
            elif self.triflag == 2 and self.objextra:
                _, _, _, _, tris, _ = self.objextra.viz(time, 1)
        # Create lines list if lines are defined
        lines = []
        if self.lineflag:
            if self.lineflag == 1:
                lines = self.linelist
            elif self.lineflag == 2 and self.objextra:
                _, _, _, _, _, lines = self.objextra.viz(time, 1)

        logger.debug(f"Visualization data prepared for snapshot {isnap} at time {time}.")
        return time, box, atoms, bonds, tris, lines

    def findtime(self, n: int) -> int:
        """
        Find the index of a given timestep.

        Parameters:
            n (int): The timestep to find.

        Returns:
            int: The index of the timestep.

        Raises:
            ValueError: If the timestep does not exist.
        """
        for i, snap in enumerate(self.snaps):
            if snap.time == n:
                return i
        raise ValueError(f"No step {n} exists.")

    def maxbox(self) -> List[float]:
        """
        Return the maximum box dimensions across all selected snapshots.

        Returns:
            List[float]: [xlo, ylo, zlo, xhi, yhi, zhi]
        """
        xlo = ylo = zlo = np.inf
        xhi = yhi = zhi = -np.inf
        for snap in self.snaps:
            if not snap.tselect:
                continue
            xlo = min(xlo, snap.xlo)
            ylo = min(ylo, snap.ylo)
            zlo = min(zlo, snap.zlo)
            xhi = max(xhi, snap.xhi)
            yhi = max(yhi, snap.yhi)
            zhi = max(zhi, snap.zhi)
        box = [xlo, ylo, zlo, xhi, yhi, zhi]
        logger.debug(f"Maximum box dimensions: {box}")
        return box

    def maxtype(self) -> int:
        """
        Return the maximum atom type across all selected snapshots and atoms.

        Returns:
            int: Maximum atom type.
        """
        if "type" not in self.names:
            logger.warning("Column 'type' not found.")
            return 0
        icol = self.names["type"]
        max_type = 0
        for snap in self.snaps:
            if not snap.tselect:
                continue
            selected_atoms = snap.atoms[snap.aselect]
            if selected_atoms.size == 0:
                continue
            current_max = int(selected_atoms[:, icol].max())
            if current_max > max_type:
                max_type = current_max
        logger.info(f"Maximum atom type: {max_type}")
        return max_type

    def extra(self, obj: Any) -> None:
        """
        Extract bonds, tris, or lines from another object.

        Parameters:
            obj (Any): The object to extract from. Can be a data object, cdata, bdump, etc.

        Raises:
            ValueError: If the argument type is unrecognized.
        """
        from pizza.data3 import data
        from pizza.converted.cdata3 import cdata
        from pizza.converted.bdump3 import bdump
        from pizza.converted.ldump3 import ldump
        from pizza.converted.tdump3 import tdump

        logger.info(f"Extracting extra information from object of type '{type(obj)}'.")
        if isinstance(obj, data) and "Bonds" in obj.sections:
            self.bondflag = 1
            self.bondlist = [
                [int(line.split()[0]), int(line.split()[1]), int(line.split()[2]), int(line.split()[3])]
                for line in obj.sections["Bonds"]
            ]
            logger.debug(f"Extracted {len(self.bondlist)} bonds from data object.")
        elif hasattr(obj, 'viz'):
            if isinstance(obj, cdata):
                tris, lines = obj.viz()
                if tris:
                    self.triflag = 1
                    self.trilist = tris
                if lines:
                    self.lineflag = 1
                    self.linelist = lines
                logger.debug(f"Extracted tris and lines from cdata object.")
            elif isinstance(obj, bdump):
                self.bondflag = 2
                self.objextra = obj
                logger.debug(f"Configured dynamic bond extraction from bdump object.")
            elif isinstance(obj, tdump):
                self.triflag = 2
                self.objextra = obj
                logger.debug(f"Configured dynamic tri extraction from tdump object.")
            elif isinstance(obj, ldump):
                self.lineflag = 2
                self.objextra = obj
                logger.debug(f"Configured dynamic line extraction from ldump object.")
            else:
                logger.error("Unrecognized object type for extra extraction.")
                raise ValueError("Unrecognized argument to dump.extra().")
        else:
            logger.error("Unrecognized argument type for extra extraction.")
            raise ValueError("Unrecognized argument to dump.extra().")

Instance variables

var type : int

Get the type of dump file defined as a hash of column names.

Returns

int
Hash value representing the dump type.
Expand source code
@property
def type(self) -> int:
    """
    Get the type of dump file defined as a hash of column names.

    Returns:
        int: Hash value representing the dump type.
    """
    type_hash = hash(self.names2str())
    logger.debug(f"Dump type hash: {type_hash}")
    return type_hash

Methods

def assign_column_names(self, line: str) ‑> NoneType

Assign column names based on the ATOMS section header.

Parameters

line (str): The header line containing column names.

Expand source code
def assign_column_names(self, line: str) -> None:
    """
    Assign column names based on the ATOMS section header.

    Parameters:
        line (str): The header line containing column names.
    """
    try:
        columns = line.strip().split()[1:]  # Skip the first word (e.g., "id")
        for idx, col in enumerate(columns):
            self.names[col] = idx
        logger.debug(f"Assigned column names: {self.names}")
        # Determine scaling status based on column names
        x_scaled = "xs" in self.names
        y_scaled = "ys" in self.names
        z_scaled = "zs" in self.names
        self.scale_original = 1 if x_scaled and y_scaled and z_scaled else 0
        logger.info(f"Coordinate scaling status: {'scaled' if self.scale_original else 'unscaled'}")
    except Exception as e:
        logger.error(f"Error assigning column names: {e}")
        raise
def atom(self, n: int, *columns: str) ‑> Union[List[float], List[List[float]]]

Extract values for a specific atom ID across all selected snapshots.

Parameters

n (int): The atom ID to extract. *columns (str): The column names to extract.

Returns

Union[List[float], List[List[float]]]
The extracted values.

Raises

KeyError
If any specified column does not exist.
ValueError
If the atom ID is not found in any snapshot.
Expand source code
def atom(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]:
    """
    Extract values for a specific atom ID across all selected snapshots.

    Parameters:
        n (int): The atom ID to extract.
        *columns (str): The column names to extract.

    Returns:
        Union[List[float], List[List[float]]]: The extracted values.

    Raises:
        KeyError: If any specified column does not exist.
        ValueError: If the atom ID is not found in any snapshot.
    """
    logger.info(f"Extracting atom ID {n} values for columns {columns}.")
    if not columns:
        raise ValueError("No columns specified for extraction.")
    column_indices = []
    for col in columns:
        if col not in self.names:
            raise KeyError(f"Column '{col}' not found.")
        column_indices.append(self.names[col])

    extracted = [[] for _ in columns]
    for snap in self.snaps:
        if not snap.tselect:
            continue
        atom_rows = snap.atoms[snap.aselect]
        id_column = self.names["id"]
        matching_atoms = atom_rows[atom_rows[:, id_column] == n]
        if matching_atoms.size == 0:
            raise ValueError(f"Atom ID {n} not found in snapshot at timestep {snap.time}.")
        atom = matching_atoms[0]
        for idx, col_idx in enumerate(column_indices):
            extracted[idx].append(atom[col_idx])
    if len(columns) == 1:
        return extracted[0]
    return extracted
def clone(self, nstep: int, col: str) ‑> NoneType

Clone the value from a specific timestep's column to all selected snapshots for atoms with the same ID.

Parameters

nstep (int): The timestep to clone from. col (str): The column name to clone.

Raises

KeyError
If the column or ID column does not exist.
ValueError
If the specified timestep does not exist.
Expand source code
def clone(self, nstep: int, col: str) -> None:
    """
    Clone the value from a specific timestep's column to all selected snapshots for atoms with the same ID.

    Parameters:
        nstep (int): The timestep to clone from.
        col (str): The column name to clone.

    Raises:
        KeyError: If the column or ID column does not exist.
        ValueError: If the specified timestep does not exist.
    """
    logger.info(f"Cloning column '{col}' from timestep {nstep} to all selected snapshots.")
    if "id" not in self.names:
        raise KeyError("Column 'id' not found.")
    if col not in self.names:
        raise KeyError(f"Column '{col}' not found.")
    istep = self.findtime(nstep)
    icol = self.names[col]
    id_col = self.names["id"]
    id_to_index = {atom[id_col]: idx for idx, atom in enumerate(self.snaps[istep].atoms)}
    for snap in self.snaps:
        if not snap.tselect:
            continue
        for i, atom in enumerate(snap.atoms):
            if not snap.aselect[i]:
                continue
            atom_id = atom[id_col]
            if atom_id in id_to_index:
                snap.atoms[i, icol] = self.snaps[istep].atoms[id_to_index[atom_id], icol]
    logger.info("Cloning completed successfully.")
def cull(self) ‑> NoneType

Remove duplicate snapshots based on timestep.

Expand source code
def cull(self) -> None:
    """
    Remove duplicate snapshots based on timestep.
    """
    unique_snaps = {}
    culled_snaps = []
    for snap in self.snaps:
        if snap.time not in unique_snaps:
            unique_snaps[snap.time] = snap
            culled_snaps.append(snap)
        else:
            logger.warning(f"Duplicate timestep {snap.time} found. Culling duplicate.")
    self.snaps = culled_snaps
    logger.info(f"Culled duplicates. Total snapshots: {len(self.snaps)}")
def extra(self, obj: Any) ‑> NoneType

Extract bonds, tris, or lines from another object.

Parameters

obj (Any): The object to extract from. Can be a data object, cdata, bdump, etc.

Raises

ValueError
If the argument type is unrecognized.
Expand source code
def extra(self, obj: Any) -> None:
    """
    Extract bonds, tris, or lines from another object.

    Parameters:
        obj (Any): The object to extract from. Can be a data object, cdata, bdump, etc.

    Raises:
        ValueError: If the argument type is unrecognized.
    """
    from pizza.data3 import data
    from pizza.converted.cdata3 import cdata
    from pizza.converted.bdump3 import bdump
    from pizza.converted.ldump3 import ldump
    from pizza.converted.tdump3 import tdump

    logger.info(f"Extracting extra information from object of type '{type(obj)}'.")
    if isinstance(obj, data) and "Bonds" in obj.sections:
        self.bondflag = 1
        self.bondlist = [
            [int(line.split()[0]), int(line.split()[1]), int(line.split()[2]), int(line.split()[3])]
            for line in obj.sections["Bonds"]
        ]
        logger.debug(f"Extracted {len(self.bondlist)} bonds from data object.")
    elif hasattr(obj, 'viz'):
        if isinstance(obj, cdata):
            tris, lines = obj.viz()
            if tris:
                self.triflag = 1
                self.trilist = tris
            if lines:
                self.lineflag = 1
                self.linelist = lines
            logger.debug(f"Extracted tris and lines from cdata object.")
        elif isinstance(obj, bdump):
            self.bondflag = 2
            self.objextra = obj
            logger.debug(f"Configured dynamic bond extraction from bdump object.")
        elif isinstance(obj, tdump):
            self.triflag = 2
            self.objextra = obj
            logger.debug(f"Configured dynamic tri extraction from tdump object.")
        elif isinstance(obj, ldump):
            self.lineflag = 2
            self.objextra = obj
            logger.debug(f"Configured dynamic line extraction from ldump object.")
        else:
            logger.error("Unrecognized object type for extra extraction.")
            raise ValueError("Unrecognized argument to dump.extra().")
    else:
        logger.error("Unrecognized argument type for extra extraction.")
        raise ValueError("Unrecognized argument to dump.extra().")
def findtime(self, n: int) ‑> int

Find the index of a given timestep.

Parameters

n (int): The timestep to find.

Returns

int
The index of the timestep.

Raises

ValueError
If the timestep does not exist.
Expand source code
def findtime(self, n: int) -> int:
    """
    Find the index of a given timestep.

    Parameters:
        n (int): The timestep to find.

    Returns:
        int: The index of the timestep.

    Raises:
        ValueError: If the timestep does not exist.
    """
    for i, snap in enumerate(self.snaps):
        if snap.time == n:
            return i
    raise ValueError(f"No step {n} exists.")
def iterator(self, flag: int) ‑> Tuple[int, int, int]

Iterator method to loop over selected snapshots.

Parameters

flag (int): 0 for the first call, 1 for subsequent calls.

Returns

Tuple[int, int, int]
(index, time, flag)
Expand source code
def iterator(self, flag: int) -> Tuple[int, int, int]:
    """
    Iterator method to loop over selected snapshots.

    Parameters:
        flag (int): 0 for the first call, 1 for subsequent calls.

    Returns:
        Tuple[int, int, int]: (index, time, flag)
    """
    if not hasattr(self, 'iterate'):
        self.iterate = -1
    if flag == 0:
        self.iterate = 0
    else:
        self.iterate += 1
    while self.iterate < self.nsnaps:
        snap = self.snaps[self.iterate]
        if snap.tselect:
            logger.debug(f"Iterator returning snapshot {self.iterate} at time {snap.time}.")
            return self.iterate, snap.time, 1
        self.iterate += 1
    return 0, 0, -1
def kind(self, listtypes: Optional[Dict[str, List[str]]] = None) ‑> Optional[str]

Guess the kind of dump file based on column names.

Parameters

listtypes (Optional[Dict[str, List[str]]]): A dictionary defining possible types.

Returns

Optional[str]
The kind of dump file if matched, else None.
Expand source code
def kind(self, listtypes: Optional[Dict[str, List[str]]] = None) -> Optional[str]:
    """
    Guess the kind of dump file based on column names.

    Parameters:
        listtypes (Optional[Dict[str, List[str]]]): A dictionary defining possible types.

    Returns:
        Optional[str]: The kind of dump file if matched, else None.
    """
    if listtypes is None:
        listtypes = {
            'vxyz': ["id", "type", "x", "y", "z", "vx", "vy", "vz"],
            'xyz': ["id", "type", "x", "y", "z"]
        }
        internaltypes = True
    else:
        listtypes = {"user_type": listtypes}
        internaltypes = False

    for kind, columns in listtypes.items():
        if all(col in self.names for col in columns):
            logger.info(f"Dump kind identified as '{kind}'.")
            return kind
    logger.warning("Dump kind could not be identified.")
    return None
def maxbox(self) ‑> List[float]

Return the maximum box dimensions across all selected snapshots.

Returns

List[float]
[xlo, ylo, zlo, xhi, yhi, zhi]
Expand source code
def maxbox(self) -> List[float]:
    """
    Return the maximum box dimensions across all selected snapshots.

    Returns:
        List[float]: [xlo, ylo, zlo, xhi, yhi, zhi]
    """
    xlo = ylo = zlo = np.inf
    xhi = yhi = zhi = -np.inf
    for snap in self.snaps:
        if not snap.tselect:
            continue
        xlo = min(xlo, snap.xlo)
        ylo = min(ylo, snap.ylo)
        zlo = min(zlo, snap.zlo)
        xhi = max(xhi, snap.xhi)
        yhi = max(yhi, snap.yhi)
        zhi = max(zhi, snap.zhi)
    box = [xlo, ylo, zlo, xhi, yhi, zhi]
    logger.debug(f"Maximum box dimensions: {box}")
    return box
def maxtype(self) ‑> int

Return the maximum atom type across all selected snapshots and atoms.

Returns

int
Maximum atom type.
Expand source code
def maxtype(self) -> int:
    """
    Return the maximum atom type across all selected snapshots and atoms.

    Returns:
        int: Maximum atom type.
    """
    if "type" not in self.names:
        logger.warning("Column 'type' not found.")
        return 0
    icol = self.names["type"]
    max_type = 0
    for snap in self.snaps:
        if not snap.tselect:
            continue
        selected_atoms = snap.atoms[snap.aselect]
        if selected_atoms.size == 0:
            continue
        current_max = int(selected_atoms[:, icol].max())
        if current_max > max_type:
            max_type = current_max
    logger.info(f"Maximum atom type: {max_type}")
    return max_type
def minmax(self, colname: str) ‑> Tuple[float, float]

Find the minimum and maximum values for a specified column across all selected snapshots and atoms.

Parameters

colname (str): The column name to find min and max for.

Returns

Tuple[float, float]
The minimum and maximum values.

Raises

KeyError
If the column name does not exist.
Expand source code
def minmax(self, colname: str) -> Tuple[float, float]:
    """
    Find the minimum and maximum values for a specified column across all selected snapshots and atoms.

    Parameters:
        colname (str): The column name to find min and max for.

    Returns:
        Tuple[float, float]: The minimum and maximum values.

    Raises:
        KeyError: If the column name does not exist.
    """
    if colname not in self.names:
        raise KeyError(f"Column '{colname}' not found.")
    icol = self.names[colname]
    min_val = np.inf
    max_val = -np.inf
    for snap in self.snaps:
        if not snap.tselect:
            continue
        selected_atoms = snap.atoms[snap.aselect]
        if selected_atoms.size == 0:
            continue
        current_min = selected_atoms[:, icol].min()
        current_max = selected_atoms[:, icol].max()
        if current_min < min_val:
            min_val = current_min
        if current_max > max_val:
            max_val = current_max
    logger.info(f"minmax for column '{colname}': min={min_val}, max={max_val}")
    return min_val, max_val
def names2str(self) ‑> str

Convert column names to a sorted string based on their indices.

Returns

str
A string of column names sorted by their column index.
Expand source code
def names2str(self) -> str:
    """
    Convert column names to a sorted string based on their indices.

    Returns:
        str: A string of column names sorted by their column index.
    """
    sorted_columns = sorted(self.names.items(), key=lambda item: item[1])
    names_str = " ".join([col for col, _ in sorted_columns])
    logger.debug(f"Column names string: {names_str}")
    return names_str
def newcolumn(self, colname: str) ‑> NoneType

Add a new column to every snapshot and initialize it to zero.

Parameters

colname (str): The name of the new column.

Expand source code
def newcolumn(self, colname: str) -> None:
    """
    Add a new column to every snapshot and initialize it to zero.

    Parameters:
        colname (str): The name of the new column.
    """
    logger.info(f"Adding new column '{colname}' with default value 0.")
    if colname in self.names:
        logger.warning(f"Column '{colname}' already exists.")
        return
    new_col_index = len(self.names)
    self.names[colname] = new_col_index
    for snap in self.snaps:
        if snap.atoms is not None:
            new_column = np.zeros((snap.atoms.shape[0], 1))
            snap.atoms = np.hstack((snap.atoms, new_column))
    logger.info(f"New column '{colname}' added successfully.")
def read_all(self) ‑> NoneType

Read all snapshots from each file in the file list.

Expand source code
def read_all(self) -> None:
    """
    Read all snapshots from each file in the file list.
    """
    for file in self.flist:
        is_gzipped = file.endswith(".gz")
        try:
            if is_gzipped:
                with subprocess.Popen([PIZZA_GUNZIP, "-c", file],
                                      stdout=subprocess.PIPE,
                                      text=True) as proc:
                    file_handle = proc.stdout
                    logger.debug(f"Opened gzipped file: {file}")
            else:
                file_handle = open(file, 'r')
                logger.debug(f"Opened file: {file}")

            with file_handle:
                snap = self.read_snapshot(file_handle)
                while snap:
                    self.snaps.append(snap)
                    logger.info(f"Read snapshot at time {snap.time}")
                    snap = self.read_snapshot(file_handle)
        except subprocess.CalledProcessError as e:
            logger.error(f"Error decompressing file '{file}': {e}")
            raise
        except FileNotFoundError:
            logger.error(f"File '{file}' not found.")
            raise
        except Exception as e:
            logger.error(f"Error reading file '{file}': {e}")
            raise

    self.snaps.sort()
    self.cull()
    self.nsnaps = len(self.snaps)
    logger.info(f"Read {self.nsnaps} snapshots.")

    # Select all timesteps and atoms by default
    self.tselect.all()

    # Log column assignments
    if self.names:
        logger.info(f"Assigned columns: {', '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}")
    else:
        logger.warning("No column assignments made.")

    # Unscale if necessary
    if self.nsnaps > 0:
        if getattr(self, 'scale_original', -1) == 1:
            self.unscale()
        elif getattr(self, 'scale_original', -1) == 0:
            logger.info("Dump is already unscaled.")
        else:
            logger.warning("Dump scaling status is unknown.")
def read_snapshot(self, f) ‑> Optional[pizza.dump3.Snap]

Read a single snapshot from a file.

Parameters

f (file object): File handle to read from.

Returns

Optional[Snap]
Snapshot object or None if failed.
Expand source code
def read_snapshot(self, f) -> Optional['Snap']:
    """
    Read a single snapshot from a file.

    Parameters:
        f (file object): File handle to read from.

    Returns:
        Optional[Snap]: Snapshot object or None if failed.
    """
    try:
        snap = Snap()

        # Read and assign ITEMS
        while True:
            item = f.readline()
            if not item:
                break
            if not item.startswith("ITEM:"):
                continue
            item_type = item.split("ITEM:")[1].strip()
            if item_type == "TIME":
                snap.realtime = float(f.readline().strip())
            elif item_type == "TIMESTEP":
                snap.time = int(f.readline().strip())
            elif item_type == "NUMBER OF ATOMS":
                snap.natoms = int(f.readline().strip())
            elif item_type.startswith("BOX BOUNDS"):
                snap.boxstr = item_type.split("BOX BOUNDS")[1].strip()
                box_bounds = []
                for _ in range(3):
                    bounds = f.readline().strip().split()
                    box_bounds.append(tuple(map(float, bounds[:2])))
                    if len(bounds) > 2:
                        setattr(snap, bounds[2], float(bounds[2]))
                    else:
                        setattr(snap, bounds[2] if len(bounds) > 2 else 'xy', 0.0)
                snap.xlo, snap.xhi = box_bounds[0]
                snap.ylo, snap.yhi = box_bounds[1]
                snap.zlo, snap.zhi = box_bounds[2]
                snap.triclinic = 1 if len(box_bounds[0]) > 2 else 0
            elif item_type == "ATOMS":
                if not self.names:
                    self.assign_column_names(f.readline())
                snap.aselect = np.ones(snap.natoms, dtype=bool)
                atoms = []
                for _ in range(snap.natoms):
                    line = f.readline()
                    if not line:
                        break
                    atoms.append(list(map(float, line.strip().split())))
                snap.atoms = np.array(atoms)
                break

        if not hasattr(snap, 'time'):
            return None

        return snap
    except Exception as e:
        logger.error(f"Error reading snapshot: {e}")
        return None
def realtime(self) ‑> List[float]

Return a list of selected snapshot real-time values.

Returns

List[float]
List of real-time values.
Expand source code
def realtime(self) -> List[float]:
    """
    Return a list of selected snapshot real-time values.

    Returns:
        List[float]: List of real-time values.
    """
    times = [snap.realtime for snap in self.snaps if snap.tselect and hasattr(snap, 'realtime')]
    logger.debug(f"Selected real-time values: {times}")
    return times
def scatter(self, root: str) ‑> NoneType

Write each selected snapshot to a separate dump file with timestep suffix.

Parameters

root (str): The root name for output files. Suffix will be added based on timestep.

Expand source code
def scatter(self, root: str) -> None:
    """
    Write each selected snapshot to a separate dump file with timestep suffix.

    Parameters:
        root (str): The root name for output files. Suffix will be added based on timestep.
    """
    try:
        for snap in self.snaps:
            if not snap.tselect:
                continue
            filename = f"{root}.{snap.time}"
            with open(filename, "w") as f:
                f.write("ITEM: TIMESTEP\n")
                f.write(f"{snap.time}\n")
                f.write("ITEM: NUMBER OF ATOMS\n")
                f.write(f"{snap.nselect}\n")
                f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n")
                f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n")
                f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n")
                f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n")
                f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n")
                for atom in snap.atoms[snap.aselect]:
                    atom_str = " ".join([f"{int(atom[self.names['id']])}" if key in ["id", "type"] else f"{atom[self.names[key]]}" 
                                         for key in sorted(self.names.keys(), key=lambda k: self.names[k])])
                    f.write(f"{atom_str}\n")
        logger.info(f"Scatter write completed with root '{root}'.")
    except IOError as e:
        logger.error(f"Error writing scatter files: {e}")
        raise
def set(self, eq: str) ‑> NoneType

Set a column value using an equation for all selected snapshots and atoms.

Parameters

eq (str): The equation to compute the new column values. Use $ for variables.

Example

d.set("$ke = $vx * $vx + $vy * $vy")

Expand source code
def set(self, eq: str) -> None:
    """
    Set a column value using an equation for all selected snapshots and atoms.

    Parameters:
        eq (str): The equation to compute the new column values. Use $<column_name> for variables.

    Example:
        d.set("$ke = $vx * $vx + $vy * $vy")
    """
    logger.info(f"Setting column using equation: {eq}")
    pattern = r"\$\w+"
    variables = re.findall(pattern, eq)
    if not variables:
        logger.warning("No variables found in equation.")
        return
    lhs = variables[0][1:]
    if lhs not in self.names:
        self.newcolumn(lhs)
    try:
        # Replace $var with appropriate array accesses
        for var in variables:
            var_name = var[1:]
            if var_name not in self.names:
                raise KeyError(f"Variable '{var_name}' not found in columns.")
            col_index = self.names[var_name]
            eq = eq.replace(var, f"snap.atoms[i][{col_index}]")
        compiled_eq = compile(eq, "<string>", "exec")
        for snap in self.snaps:
            if not snap.tselect:
                continue
            for i in range(snap.natoms):
                if not snap.aselect[i]:
                    continue
                exec(compiled_eq)
        logger.info("Column values set successfully.")
    except Exception as e:
        logger.error(f"Error setting column values: {e}")
        raise
def setv(self, colname: str, vector: List[float]) ‑> NoneType

Set a column value using a vector of values for all selected snapshots and atoms.

Parameters

colname (str): The column name to set. vector (List[float]): The values to assign to the column.

Raises

KeyError
If the column name does not exist.
ValueError
If the length of the vector does not match the number of selected atoms.
Expand source code
def setv(self, colname: str, vector: List[float]) -> None:
    """
    Set a column value using a vector of values for all selected snapshots and atoms.

    Parameters:
        colname (str): The column name to set.
        vector (List[float]): The values to assign to the column.

    Raises:
        KeyError: If the column name does not exist.
        ValueError: If the length of the vector does not match the number of selected atoms.
    """
    logger.info(f"Setting column '{colname}' using a vector of values.")
    if colname not in self.names:
        self.newcolumn(colname)
    icol = self.names[colname]
    for snap in self.snaps:
        if not snap.tselect:
            continue
        if len(vector) != snap.nselect:
            raise ValueError("Vector length does not match the number of selected atoms.")
        selected_indices = np.where(snap.aselect)[0]
        snap.atoms[selected_indices, icol] = vector
    logger.info(f"Column '{colname}' set successfully.")
def sort(self, key: Union[str, int] = 'id') ‑> NoneType

Sort atoms or snapshots.

Parameters

key (Union[str, int]): The key to sort by. If str, sorts snapshots by that column. If int, sorts atoms in a specific timestep.

Expand source code
def sort(self, key: Union[str, int] = "id") -> None:
    """
    Sort atoms or snapshots.

    Parameters:
        key (Union[str, int]): The key to sort by. If str, sorts snapshots by that column. If int, sorts atoms in a specific timestep.
    """
    if isinstance(key, str):
        if key not in self.names:
            raise ValueError(f"Column '{key}' not found for sorting.")
        logger.info(f"Sorting snapshots by column '{key}'.")
        icol = self.names[key]
        for snap in self.snaps:
            if not snap.tselect:
                continue
            snap.atoms = snap.atoms[snap.atoms[:, icol].argsort()]
    elif isinstance(key, int):
        try:
            snap = self.snaps[self.findtime(key)]
            logger.info(f"Sorting atoms in snapshot at timestep {key}.")
            if "id" in self.names:
                id_col = self.names["id"]
                snap.atoms = snap.atoms[snap.atoms[:, id_col].argsort()]
            else:
                logger.warning("No 'id' column found for sorting atoms.")
        except ValueError as e:
            logger.error(e)
            raise
    else:
        logger.error("Invalid key type for sort().")
        raise TypeError("Key must be a string or integer.")
def spread(self, old: str, n: int, new: str) ‑> NoneType

Spread values from an old column into a new column as integers from 1 to n based on their relative positions.

Parameters

old (str): The column name to spread. n (int): The number of spread values. new (str): The new column name to create.

Raises

KeyError
If the old column does not exist.
Expand source code
def spread(self, old: str, n: int, new: str) -> None:
    """
    Spread values from an old column into a new column as integers from 1 to n based on their relative positions.

    Parameters:
        old (str): The column name to spread.
        n (int): The number of spread values.
        new (str): The new column name to create.

    Raises:
        KeyError: If the old column does not exist.
    """
    logger.info(f"Spreading column '{old}' into new column '{new}' with {n} spread values.")
    if old not in self.names:
        raise KeyError(f"Column '{old}' not found.")
    if new not in self.names:
        self.newcolumn(new)
    iold = self.names[old]
    inew = self.names[new]
    min_val, max_val = self.minmax(old)
    gap = max_val - min_val
    if gap == 0:
        gap = 1.0  # Prevent division by zero
    invdelta = n / gap
    for snap in self.snaps:
        if not snap.tselect:
            continue
        selected_atoms = snap.atoms[snap.aselect]
        snap.atoms[snap.aselect, inew] = np.clip(((selected_atoms[:, iold] - min_val) * invdelta).astype(int) + 1, 1, n)
    logger.info(f"Column '{new}' spread successfully.")
def time(self) ‑> List[int]

Return a list of selected snapshot timesteps.

Returns

List[int]
List of timestep values.
Expand source code
def time(self) -> List[int]:
    """
    Return a list of selected snapshot timesteps.

    Returns:
        List[int]: List of timestep values.
    """
    times = [snap.time for snap in self.snaps if snap.tselect]
    logger.debug(f"Selected timesteps: {times}")
    return times
def vecs(self, n: int, *columns: str) ‑> Union[List[float], List[List[float]]]

Extract values for selected atoms at a specific timestep.

Parameters

n (int): The timestep to extract from. *columns (str): The column names to extract.

Returns

Union[List[float], List[List[float]]]
The extracted values.

Raises

KeyError
If any specified column does not exist.
ValueError
If the specified timestep does not exist.
Expand source code
def vecs(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]:
    """
    Extract values for selected atoms at a specific timestep.

    Parameters:
        n (int): The timestep to extract from.
        *columns (str): The column names to extract.

    Returns:
        Union[List[float], List[List[float]]]: The extracted values.

    Raises:
        KeyError: If any specified column does not exist.
        ValueError: If the specified timestep does not exist.
    """
    logger.info(f"Extracting columns {columns} for timestep {n}.")
    if not columns:
        raise ValueError("No columns specified for extraction.")
    try:
        snap = self.snaps[self.findtime(n)]
    except ValueError as e:
        logger.error(e)
        raise
    column_indices = []
    for col in columns:
        if col not in self.names:
            raise KeyError(f"Column '{col}' not found.")
        column_indices.append(self.names[col])
    extracted = [[] for _ in columns]
    selected_atoms = snap.atoms[snap.aselect]
    for atom in selected_atoms:
        for idx, col_idx in enumerate(column_indices):
            extracted[idx].append(atom[col_idx])
    if len(columns) == 1:
        return extracted[0]
    return extracted
def viz(self, index: int, flag: int = 0) ‑> Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]

Return visualization data for a specified snapshot.

Parameters

index (int): Snapshot index or timestep value. flag (int): If 1, treat index as timestep value. Default is 0.

Returns

Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]: (time, box, atoms, bonds, tris, lines)

Raises

ValueError
If the snapshot index is invalid.
Expand source code
def viz(self, index: int, flag: int = 0) -> Tuple[int, List[float], List[List[Union[int, float]]], 
                                               List[List[Union[int, float]]], List[Any], List[Any]]:
    """
    Return visualization data for a specified snapshot.

    Parameters:
        index (int): Snapshot index or timestep value.
        flag (int): If 1, treat index as timestep value. Default is 0.

    Returns:
        Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]:
            (time, box, atoms, bonds, tris, lines)

    Raises:
        ValueError: If the snapshot index is invalid.
    """
    if flag:
        try:
            isnap = self.findtime(index)
        except ValueError as e:
            logger.error(e)
            raise
    else:
        isnap = index
        if isnap < 0 or isnap >= self.nsnaps:
            raise ValueError("Snapshot index out of range.")

    snap = self.snaps[isnap]
    time = snap.time
    box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi]
    id_idx = self.names.get("id")
    type_idx = self.names.get(self.atype)
    x_idx = self.names.get("x")
    y_idx = self.names.get("y")
    z_idx = self.names.get("z")

    if None in [id_idx, type_idx, x_idx, y_idx, z_idx]:
        raise ValueError("One or more required columns (id, type, x, y, z) are not defined.")

    # Create atom list for visualization
    atoms = snap.atoms[snap.aselect][:, [id_idx, type_idx, x_idx, y_idx, z_idx]].astype(object).tolist()

    # Create bonds list if bonds are defined
    bonds = []
    if self.bondflag:
        if self.bondflag == 1:
            bondlist = self.bondlist
        elif self.bondflag == 2 and self.objextra:
            _, _, _, bondlist, _, _ = self.objextra.viz(time, 1)
        else:
            bondlist = []
        if bondlist:
            id_to_atom = {atom[0]: atom for atom in atoms}
            for bond in bondlist:
                try:
                    atom1 = id_to_atom[bond[2]]
                    atom2 = id_to_atom[bond[3]]
                    bonds.append([
                        bond[0],
                        bond[1],
                        atom1[2], atom1[3], atom1[4],
                        atom2[2], atom2[3], atom2[4],
                        atom1[1], atom2[1]
                    ])
                except KeyError:
                    logger.warning(f"Bond with atom IDs {bond[2]}, {bond[3]} not found in selected atoms.")
                    continue

    # Create tris list if tris are defined
    tris = []
    if self.triflag:
        if self.triflag == 1:
            tris = self.trilist
        elif self.triflag == 2 and self.objextra:
            _, _, _, _, tris, _ = self.objextra.viz(time, 1)
    # Create lines list if lines are defined
    lines = []
    if self.lineflag:
        if self.lineflag == 1:
            lines = self.linelist
        elif self.lineflag == 2 and self.objextra:
            _, _, _, _, _, lines = self.objextra.viz(time, 1)

    logger.debug(f"Visualization data prepared for snapshot {isnap} at time {time}.")
    return time, box, atoms, bonds, tris, lines
def write(self, filename: str, head: int = 1, app: int = 0) ‑> NoneType

Write the dump object to a LAMMPS dump file.

Parameters

filename (str): The output file path. head (int): Whether to include the snapshot header (1 for yes, 0 for no). app (int): Whether to append to the file (1 for yes, 0 for no).

Expand source code
def write(self, filename: str, head: int = 1, app: int = 0) -> None:
    """
    Write the dump object to a LAMMPS dump file.

    Parameters:
        filename (str): The output file path.
        head (int): Whether to include the snapshot header (1 for yes, 0 for no).
        app (int): Whether to append to the file (1 for yes, 0 for no).
    """
    try:
        mode = "a" if app else "w"
        with open(filename, mode) as f:
            for snap in self.snaps:
                if not snap.tselect:
                    continue
                if head:
                    f.write("ITEM: TIMESTEP\n")
                    f.write(f"{snap.time}\n")
                    f.write("ITEM: NUMBER OF ATOMS\n")
                    f.write(f"{snap.nselect}\n")
                    f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n")
                    f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n")
                    f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n")
                    f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n")
                    f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n")
                for atom in snap.atoms[snap.aselect]:
                    atom_str = " ".join([f"{int(atom[self.names['id']])}" if key in ["id", "type"] else f"{atom[self.names[key]]}" 
                                         for key in sorted(self.names.keys(), key=lambda k: self.names[k])])
                    f.write(f"{atom_str}\n")
        logger.info(f"Dump object written to '{filename}'.")
    except IOError as e:
        logger.error(f"Error writing to file '{filename}': {e}")
        raise