Module data3
`data` Class
The `data` class provides tools to read, write, and manipulate LAMMPS data files, enabling seamless integration with the `dump` class for restart generation and simulation data management.
Use the module pizza3.data3_legacy instead of pizza3.data3 if you experience errors.
Features
- Input Handling:
  - Supports single or multiple data files, including gzipped files.
  - Create empty data objects or initialize from an existing `dump` object.
- Headers and Sections:
  - Access and modify headers, including atom counts and box dimensions.
  - Define, reorder, append, and replace columns in data file sections.
- Integration with `dump`:
  - Generate restart files from `dump` snapshots.
  - Replace atomic positions and velocities in `Atoms` and `Velocities` sections.
- Visualization:
  - Extract atoms and bonds for visualization tools.
  - Iterate over single data file snapshots (compatible with `dump`).
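The snapshot iteration mentioned in the last item follows a small protocol: call with `0` the first time and with `1` afterwards, and stop when the returned flag is `-1`. The sketch below mimics that protocol with a standalone function rather than a real `data` object; the loop shape and the `visited` list are assumptions made for illustration.

```python
def iterator(flag):
    """Mimic the single-snapshot iterator: pass 0 on the first call, 1 afterwards."""
    if flag == 0:
        return 0, 0, 1      # index, time, "a snapshot is available"
    return 0, 0, -1         # iteration is finished


visited = []
flag = 0
while True:
    index, time, flag = iterator(flag)
    if flag == -1:
        break
    visited.append((index, time))  # a data file holds exactly one snapshot

print(visited)
```

The same loop shape is meant to work with a `dump` object, which is what makes the two classes interchangeable for visualization tools.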
Usage
Initialization
- From a File:
      d = data("data.poly")  # Read a LAMMPS data file
- Create an Empty Object:
      d = data()  # Create an empty data object
- From a `dump` Object:
      d = data(dump_obj, timestep)  # Generate data object from dump snapshot
Accessing Data
- Headers:
      d.headers["atoms"] = 1500  # Set atom count in header
- Sections:
      d.sections["Atoms"] = lines  # Define the `Atoms` section
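As a rough illustration of what these two containers hold, the sketch below mimics `headers` and `sections` with plain Python objects instead of a real `data` instance. In the source listing a section is stored as a list of newline-terminated strings and a box bound as a `(lo, hi)` tuple; the helper name `rows_to_lines` and the sample values are assumptions made for this example.

```python
def rows_to_lines(rows):
    """Format rows of values as newline-terminated, space-separated lines."""
    return [" ".join(str(v) for v in row) + "\n" for row in rows]


# Stand-ins for d.headers and d.sections (hypothetical values)
headers = {"atoms": 2, "atom types": 1, "xlo xhi": (0.0, 10.0)}
sections = {
    "Atoms": rows_to_lines([(1, 1, 0.0, 0.0, 0.0), (2, 1, 1.5, 0.0, 0.0)]),
}

print(sections["Atoms"][1], end="")
```

Keeping the `atoms` header equal to the number of lines in `Atoms` matters, because the header value is what tells the reader how many lines a section contains.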
Manipulation
- Column Mapping:
      d.map(1, "id", 3, "x")  # Assign names to columns
- Reorder Columns:
      d.reorder("Atoms", 1, 3, 2, 4)  # Reorder columns in a section
- Replace or Append Data:
      d.replace("Atoms", 5, vec)  # Replace a column in `Atoms`
      d.append("Atoms", vec)  # Append a new column to `Atoms`
- Delete Headers or Sections:
      d.delete("Bonds")  # Remove the `Bonds` section
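To make the column operations concrete, here is a minimal standalone sketch of reordering, modeled on the logic in the source listing: each line is split on whitespace and rebuilt from 1-based column indices. The function name `reorder_lines` and the sample rows are hypothetical; the real method works in place on `d.sections`.

```python
def reorder_lines(lines, *order):
    """Return lines with whitespace-separated columns rearranged (1-based order)."""
    reordered = []
    for line in lines:
        words = line.split()
        reordered.append(" ".join(words[i - 1] for i in order) + "\n")
    return reordered


atoms = ["1 2 0.5 1.5\n", "2 1 2.5 3.5\n"]
swapped = reorder_lines(atoms, 1, 3, 2, 4)  # swap columns 2 and 3
trimmed = reorder_lines(atoms, 1, 3, 4)     # leaving an index out drops that column

print(swapped)
print(trimmed)
```

Because the new order is simply a list of old column numbers, leaving a number out is the way to delete a column.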
Output
- Write to a File:
      d.write("data.new")  # Write the data object to a file
Visualization
- Extract Data for Visualization:
      time, box, atoms, bonds, tris, lines = d.viz(0)
Integration with dump
- Replace Atomic Positions:
      d.newxyz(dump_obj, timestep)  # Replace atomic positions with `dump` data
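Replacing positions comes down to overwriting one column per coordinate in the `Atoms` lines. The following sketch shows that single step on plain lists, assuming the new values have already been extracted from a dump snapshot; `replace_column` is a hypothetical helper that mirrors the column replacement in the source listing, not the library function itself.

```python
def replace_column(lines, column, values):
    """Replace a 1-based column in each line; a scalar is repeated for every row."""
    if not isinstance(values, list):
        values = [values]
    if len(values) == 1:
        values = values * len(lines)
    if len(values) != len(lines):
        raise ValueError("length of new data does not match the number of rows")
    out = []
    for line, value in zip(lines, values):
        words = line.split()
        words[column - 1] = str(value)
        out.append(" ".join(words) + "\n")
    return out


atoms = ["1 1 0.0 0.0 0.0\n", "2 1 1.0 0.0 0.0\n"]
new_x = [0.25, 1.25]  # placeholder for x values taken from a dump snapshot
atoms = replace_column(atoms, 3, new_x)
print(atoms)
```

The column number has to come from the name mapping (`d.map(...)`), so `x`, `y`, and `z` need to be mapped before positions are replaced.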
Examples
Basic Usage
d = data("data.poly") # Load a LAMMPS data file
d.headers["atoms"] = 2000 # Update atom count
d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in `Atoms`
d.write("data.new") # Save to a new file
Restart Generation
dump_obj = dump("dump.poly")
d = data(dump_obj, 1000) # Create data object from dump
d.write("data.restart") # Write restart file
Visualization
time, box, atoms, bonds, tris, lines = d.viz(0)
Properties
- Headers:
  - `atoms`: Number of atoms in the data file.
  - `atom types`: Number of atom types.
  - `xlo xhi`, `ylo yhi`, `zlo zhi`: Box dimensions.
- Sections:
  - `Atoms`: Atomic data (e.g., ID, type, coordinates).
  - `Velocities`: Atomic velocities (optional).
  - Additional sections for bonds, angles, etc.
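In a data file each header line puts the value (or values) first and the keyword last, for example `100 atoms` or `0.0 10.0 xlo xhi`. The sketch below parses a few such lines into the dictionary layout described above. It is a simplified stand-in for the reader in the source listing: `parse_header_line` is a hypothetical name, and only the keywords listed in this section are handled.

```python
BOX_KEYS = ("xlo xhi", "ylo yhi", "zlo zhi")
COUNT_KEYS = ("atoms", "atom types")


def parse_header_line(line):
    """Return (keyword, value) for a recognized header line, else None."""
    text = line.strip()
    words = text.split()
    for key in BOX_KEYS:
        if text.endswith(key):
            return key, (float(words[0]), float(words[1]))  # (lo, hi) bounds
    for key in COUNT_KEYS:
        if text.endswith(key):
            return key, int(words[0])                       # integer count
    return None


headers = {}
for raw in ["100 atoms", "2 atom types", "0.0 10.0 xlo xhi", "Atoms"]:
    parsed = parse_header_line(raw)
    if parsed is not None:
        headers[parsed[0]] = parsed[1]

print(headers)
```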
Notes
- Compatibility: Fully compatible with `dump` for restart and visualization tasks.
- Error Handling: When reading a file, raises an error if a section name is not recognized or if a section has no matching header count.
- Extensibility: Easily add or modify headers, sections, and attributes.
Key Improvements Explained - 2025-01-15
1. Class Names Remain Lowercase:
   - The classes `data` and `dump` remain lowercase to maintain consistency with your existing codebase.
2. Preserved Module Documentation:
   - The original module-level docstring, version history, and module variables (`__project__`, `__author__`, etc.) are retained at the beginning of the file.
3. Logging:
   - Introduced the `logging` module to replace all `print` statements. This allows for better control over logging levels and output formats.
   - Added debug logs for detailed internal state information and info logs for general operation messages.
4. File Handling:
   - Utilized context managers (`with` statements) for all file operations to ensure files are properly closed after operations.
   - Replaced `os.popen` with the `subprocess` module for better handling of subprocesses when dealing with gzipped files.
5. Error Handling:
   - Enhanced error messages to be more descriptive.
   - Replaced deprecated methods like `has_key` with Python 3’s `in` keyword.
   - Added exception handling in the `__main__` block to catch and log unexpected errors.
6. Code Style and Readability:
   - Followed PEP 8 guidelines for naming conventions, indentation, and spacing.
   - Avoided using built-in names like `list` as variable names.
   - Used f-strings for more readable and efficient string formatting.
7. Docstrings:
   - Added comprehensive docstrings to the class and all methods, detailing their purpose, parameters, return types, and possible exceptions. This aids in better understanding and maintenance of the code.
8. Type Hints:
   - Included type hints for function parameters and return types to improve code clarity and assist with static type checking.
9. Additional Safeguards:
   - Ensured that required columns (`id`, `type`, `x`, `y`, `z`) are defined before performing operations that depend on them.
   - Added checks to prevent operations on undefined sections or headers.
10. Modularity:
   - Broke down the constructor into two separate methods (`_init_from_dump` and `_init_from_file`) for better modularity and readability.
Notes
- Dependencies: Ensure that the `dump` class from `pizza.dump3` is properly implemented and compatible with these changes.
- Logging Configuration: The logging level is set to `INFO` by default. You can adjust the logging level or format as needed for your project by modifying the `logging.basicConfig` call.
- Main Block: The `__main__` block includes example usage and error handling for debugging purposes. Modify the file paths as necessary for your environment.
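For the logging note, a possible way to change verbosity without editing the module is to adjust a logger's level from calling code. This is only a sketch using the standard `logging` module: the logger name `data3_example` and the helper `set_verbosity` are assumptions, since the module itself obtains its logger with `logging.getLogger(__name__)`.

```python
import logging

# Hypothetical logger name used only for this example
logger = logging.getLogger("data3_example")


def set_verbosity(level_name):
    """Set the example logger's level from a name such as 'DEBUG' or 'WARNING'."""
    level = getattr(logging, level_name.upper())
    logger.setLevel(level)
    return logger.level


# Choose an output format once, then raise or lower verbosity as needed
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
set_verbosity("debug")
logger.debug("column mapping details would appear at this level")
```

Setting the level on a named logger leaves the rest of an application's logging untouched, which is usually preferable to changing the root configuration.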
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# `data` Class
The `data` class provides tools to read, write, and manipulate LAMMPS data files, enabling seamless integration with the `dump` class for restart generation and simulation data management.
Use the module pizza3.data3_legacy instead of pizza3.data3 if you experience errors.
---
## Features
- **Input Handling**:
- Supports single or multiple data files, including gzipped files.
- Create empty data objects or initialize from an existing `dump` object.
- **Headers and Sections**:
- Access and modify headers, including atom counts and box dimensions.
- Define, reorder, append, and replace columns in data file sections.
- **Integration with `dump`**:
- Generate restart files from `dump` snapshots.
- Replace atomic positions and velocities in `Atoms` and `Velocities` sections.
- **Visualization**:
- Extract atoms and bonds for visualization tools.
- Iterate over single data file snapshots (compatible with `dump`).
---
## Usage
### Initialization
- **From a File**:
```python
d = data("data.poly") # Read a LAMMPS data file
```
- **Create an Empty Object**:
```python
d = data() # Create an empty data object
```
- **From a `dump` Object**:
```python
d = data(dump_obj, timestep) # Generate data object from dump snapshot
```
### Accessing Data
- **Headers**:
```python
d.headers["atoms"] = 1500 # Set atom count in header
```
- **Sections**:
```python
d.sections["Atoms"] = lines # Define the `Atoms` section
```
### Manipulation
- **Column Mapping**:
```python
d.map(1, "id", 3, "x") # Assign names to columns
```
- **Reorder Columns**:
```python
d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in a section
```
- **Replace or Append Data**:
```python
d.replace("Atoms", 5, vec) # Replace a column in `Atoms`
d.append("Atoms", vec) # Append a new column to `Atoms`
```
- **Delete Headers or Sections**:
```python
d.delete("Bonds") # Remove the `Bonds` section
```
### Output
- **Write to a File**:
```python
d.write("data.new") # Write the data object to a file
```
### Visualization
- **Extract Data for Visualization**:
```python
time, box, atoms, bonds, tris, lines = d.viz(0)
```
### Integration with `dump`
- **Replace Atomic Positions**:
```python
d.newxyz(dump_obj, timestep) # Replace atomic positions with `dump` data
```
---
## Examples
### Basic Usage
```python
d = data("data.poly") # Load a LAMMPS data file
d.headers["atoms"] = 2000 # Update atom count
d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in `Atoms`
d.write("data.new") # Save to a new file
```
### Restart Generation
```python
dump_obj = dump("dump.poly")
d = data(dump_obj, 1000) # Create data object from dump
d.write("data.restart") # Write restart file
```
### Visualization
```python
time, box, atoms, bonds, tris, lines = d.viz(0)
```
---
## Properties
- **Headers**:
- `atoms`: Number of atoms in the data file.
- `atom types`: Number of atom types.
- `xlo xhi`, `ylo yhi`, `zlo zhi`: Box dimensions.
- **Sections**:
- `Atoms`: Atomic data (e.g., ID, type, coordinates).
- `Velocities`: Atomic velocities (optional).
- Additional sections for bonds, angles, etc.
---
## Notes
- **Compatibility**: Fully compatible with `dump` for restart and visualization tasks.
- **Error Handling**: Automatically validates headers and sections for consistency.
- **Extensibility**: Easily add or modify headers, sections, and attributes.
---
## Key Improvements Explained - 2025-01-15
1. **Class Names Remain Lowercase**:
- The classes `data` and `dump` remain lowercase to maintain consistency with your existing codebase.
2. **Preserved Module Documentation**:
- The original module-level docstring, version history, and module variables (`__project__`, `__author__`, etc.) are retained at the beginning of the file.
3. **Logging**:
- Introduced the `logging` module to replace all `print` statements. This allows for better control over logging levels and output formats.
- Added debug logs for detailed internal state information and info logs for general operation messages.
4. **File Handling**:
- Utilized context managers (`with` statements) for all file operations to ensure files are properly closed after operations.
- Replaced `os.popen` with the `subprocess` module for better handling of subprocesses when dealing with gzipped files.
5. **Error Handling**:
- Enhanced error messages to be more descriptive.
- Replaced deprecated methods like `has_key` with Python 3’s `in` keyword.
- Added exception handling in the `__main__` block to catch and log unexpected errors.
6. **Code Style and Readability**:
- Followed PEP 8 guidelines for naming conventions, indentation, and spacing.
- Avoided using built-in names like `list` as variable names.
- Used f-strings for more readable and efficient string formatting.
7. **Docstrings**:
- Added comprehensive docstrings to the class and all methods, detailing their purpose, parameters, return types, and possible exceptions. This aids in better understanding and maintenance of the code.
8. **Type Hints**:
- Included type hints for function parameters and return types to improve code clarity and assist with static type checking.
9. **Additional Safeguards**:
- Ensured that required columns (`id`, `type`, `x`, `y`, `z`) are defined before performing operations that depend on them.
- Added checks to prevent operations on undefined sections or headers.
10. **Modularity**:
- Broke down the constructor into two separate methods (`_init_from_dump` and `_init_from_file`) for better modularity and readability.
### Notes
- **Dependencies**: Ensure that the `dump` class from `pizza.dump3` is properly implemented and compatible with these changes.
- **Logging Configuration**: The logging level is set to `INFO` by default. You can adjust the logging level or format as needed for your project by modifying the `logging.basicConfig` call.
- **Main Block**: The `__main__` block includes example usage and error handling for debugging purposes. Modify the file paths as necessary for your environment.
"""
# Pizza.py toolkit, www.cs.sandia.gov/~sjplimp/pizza.html
# Steve Plimpton, sjplimp@sandia.gov, Sandia National Laboratories
#
# Copyright (2005) Sandia Corporation. Under the terms of Contract
# DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
# certain rights in this software. This software is distributed under
# the GNU General Public License.
# data tool
# Code converted and extended to python 3.x
# INRAE\olivier.vitrac@agroparistech.fr
#
# last release
# 2022-02-03 - add flist, __repr__
# 2022-02-04 - add append and start to add comments
# 2022-02-10 - first implementation of a full restart object from a dump object
# 2022-02-12 - revised append method, more robust, more verbose
# 2024-12-08 - updated help
# 2025-01-15 - refreshed code
__project__ = "Pizza3"
__author__ = "Olivier Vitrac"
__copyright__ = "Copyright 2022"
__credits__ = ["Steve Plimpton", "Olivier Vitrac"]
__license__ = "GPLv3"
__maintainer__ = "Olivier Vitrac"
__email__ = "olivier.vitrac@agroparistech.fr"
__version__ = "1.0"
oneline = "Read, write, manipulate LAMMPS data files"
docstr = """
d = data("data.poly")            read a LAMMPS data file, can be gzipped
d = data()                       create an empty data file
d.map(1,"id",3,"x")              assign names to atom columns (1-N)

coeffs = d.get("Pair Coeffs")    extract info from data file section
q = d.get("Atoms",4)

  1 arg = all columns returned as 2d array of floats
  2 args = Nth column returned as vector of floats

d.reorder("Atoms",1,3,2,4,5)     reorder columns (1-N) in a data file section

  1,3,2,4,5 = new order of previous columns, can delete columns this way

d.title = "My LAMMPS data file"  set title of the data file
d.headers["atoms"] = 1500        set a header value
d.sections["Bonds"] = lines      set a section to list of lines (with newlines)
d.delete("bonds")                delete a keyword or section of data file
d.delete("Bonds")
d.replace("Atoms",5,vec)         replace Nth column of section with vector
d.newxyz(dmp,1000)               replace xyz in Atoms with xyz of snapshot N

  newxyz assumes id,x,y,z are defined in both data and dump files
    also replaces ix,iy,iz if they are defined

index,time,flag = d.iterator(0/1)                 loop over single data file snapshot
time,box,atoms,bonds,tris,lines = d.viz(index)    return list of viz objects

  iterator() and viz() are compatible with equivalent dump calls
  iterator() called with arg = 0 first time, with arg = 1 on subsequent calls
    index = timestep index within dump object (only 0 for data file)
    time = timestep value (only 0 for data file)
    flag = -1 when iteration is done, 1 otherwise
  viz() returns info for specified timestep index (must be 0)
    time = 0
    box = [xlo,ylo,zlo,xhi,yhi,zhi]
    atoms = id,type,x,y,z for each atom as 2d array
    bonds = id,type,x1,y1,z1,x2,y2,z2,t1,t2 for each bond as 2d array
      NULL if bonds do not exist
    tris = NULL
    lines = NULL

d.write("data.new")              write a LAMMPS data file
"""
# History
# 8/05, Steve Plimpton (SNL): original version
# 11/07, added triclinic box support
# ToDo list
# Variables
# title = 1st line of data file
# names = dictionary with atom attributes as keys, col #s as values
# headers = dictionary with header name as key, value or tuple as values
# sections = dictionary with section name as key, array of lines as values
# nselect = 1 = # of snapshots
# Imports and external programs
import logging
import subprocess
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from pizza.dump3 import dump
__all__ = ['data', 'dump']
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# External dependency
PIZZA_GUNZIP = "gunzip"
class data:
    """
    The `data` class provides tools to read, write, and manipulate LAMMPS data files,
    enabling seamless integration with the `dump` class for restart generation and
    simulation data management.
    """
    # Class-level keywords for headers and sections
    HKEYWORDS = [
        "atoms",
        "ellipsoids",
        "lines",
        "triangles",
        "bodies",
        "bonds",
        "angles",
        "dihedrals",
        "impropers",
        "atom types",
        "bond types",
        "angle types",
        "dihedral types",
        "improper types",
        "xlo xhi",
        "ylo yhi",
        "zlo zhi",
        "xy xz yz",
    ]
    SKEYWORDS = [
        ["Masses", "atom types"],
        ["Atoms", "atoms"],
        ["Ellipsoids", "ellipsoids"],
        ["Lines", "lines"],
        ["Triangles", "triangles"],
        ["Bodies", "bodies"],
        ["Bonds", "bonds"],
        ["Angles", "angles"],
        ["Dihedrals", "dihedrals"],
        ["Impropers", "impropers"],
        ["Velocities", "atoms"],
        ["Pair Coeffs", "atom types"],
        ["Bond Coeffs", "bond types"],
        ["Angle Coeffs", "angle types"],
        ["Dihedral Coeffs", "dihedral types"],
        ["Improper Coeffs", "improper types"],
        ["BondBond Coeffs", "angle types"],
        ["BondAngle Coeffs", "angle types"],
        ["MiddleBondTorsion Coeffs", "dihedral types"],
        ["EndBondTorsion Coeffs", "dihedral types"],
        ["AngleTorsion Coeffs", "dihedral types"],
        ["AngleAngleTorsion Coeffs", "dihedral types"],
        ["BondBond13 Coeffs", "dihedral types"],
        ["AngleAngle Coeffs", "improper types"],
        ["Molecules", "atoms"],
        ["Tinker Types", "atoms"],
    ]
    def __init__(self, *args: Any):
        """
        Initialize a data object.
        Parameters:
            *args: Variable length argument list.
                - No arguments: Creates an empty data object.
                - One argument (filename or dump object): Initializes from a file or dump object.
                - Two arguments (dump object, timestep): Initializes from a dump object at a specific timestep.
        """
        self.nselect = 1
        self.names: Dict[str, int] = {}
        self.headers: Dict[str, Union[int, Tuple[float, float], Tuple[float, float, float]]] = {}
        self.sections: Dict[str, List[str]] = {}
        self.flist: List[str] = []
        self.restart: bool = False
        if not args:
            # Default Constructor (empty object)
            self.title = "LAMMPS data file"
            logger.debug("Initialized empty data object.")
            return
        first_arg = args[0]
        if isinstance(first_arg, dump):
            # Constructor from an existing dump object
            self._init_from_dump(first_arg, *args[1:])
        elif isinstance(first_arg, str):
            # Constructor from a DATA file
            self._init_from_file(*args)
        else:
            raise TypeError("Invalid argument type for data constructor.")
    def _init_from_dump(self, dump_obj: dump, timestep: Optional[int] = None) -> None:
        """
        Initialize the data object from a dump object.
        Parameters:
            dump_obj (dump): The dump object to initialize from.
            timestep (Optional[int]): The specific timestep to use. If None, the last timestep is used.
        """
        times = dump_obj.time()
        num_timesteps = len(times)
        if timestep is not None:
            if timestep not in times:
                raise ValueError("The input timestep is not available in the dump object.")
            selected_time = timestep
        else:
            selected_time = times[-1]
        try:
            index = times.index(selected_time)
        except ValueError:
            raise ValueError("Selected timestep not found in dump object.")
        self.title = (f'LAMMPS data file (restart from "{dump_obj.flist[0]}" '
                      f't = {selected_time:.5g} (frame {index + 1} of {num_timesteps}))')
        logger.debug(f"Set title: {self.title}")
        # Set headers
        snap = dump_obj.snaps[index]
        self.headers = {
            'atoms': snap.natoms,
            'atom types': dump_obj.minmax("type")[1],
            'xlo xhi': (snap.xlo, snap.xhi),
            'ylo yhi': (snap.ylo, snap.yhi),
            'zlo zhi': (snap.zlo, snap.zhi)
        }
        logger.debug(f"Set headers: {self.headers}")
        # Initialize sections
        self.sections = {}
        template_atoms = {
            "smd": ["id", "type", "mol", "c_vol", "mass", "radius",
                    "c_contact_radius", "x", "y", "z", "f_1[1]", "f_1[2]", "f_1[3]"]
        }
        if dump_obj.kind(template_atoms["smd"]):
            for col in template_atoms["smd"]:
                vector = dump_obj.vecs(selected_time, col)
                is_id_type_mol = col in ["id", "type", "mol"]
                self.append("Atoms", vector, force_integer=is_id_type_mol, property_name=col)
        else:
            raise ValueError("Please add your ATOMS section in the constructor.")
        # Set velocities if required
        template_velocities = {"smd": ["id", "vx", "vy", "vz"]}
        if dump_obj.kind(template_atoms["smd"]):
            if dump_obj.kind(template_velocities["smd"]):
                for col in template_velocities["smd"]:
                    vector = dump_obj.vecs(selected_time, col)
                    is_id = col == "id"
                    self.append("Velocities", vector, force_integer=is_id, property_name=col)
            else:
                raise ValueError("The velocities are missing for the style SMD.")
        # Store filename
        self.flist = dump_obj.flist.copy()
        self.restart = True
        logger.debug("Initialized data object from dump.")
    def _init_from_file(self, filename: str) -> None:
        """
        Initialize the data object from a LAMMPS data file.
        Parameters:
            filename (str): Path to the LAMMPS data file.
        """
        flist = [filename]
        is_gzipped = filename.endswith(".gz")
        proc = None
        try:
            if is_gzipped:
                # Do not wrap Popen in its own `with`: leaving that block would close
                # stdout before the reading loop below gets to use it.
                proc = subprocess.Popen([PIZZA_GUNZIP, "-c", filename],
                                        stdout=subprocess.PIPE,
                                        text=True)
                file_handle = proc.stdout
                logger.debug(f"Opened gzipped file: {filename}")
            else:
                file_handle = open(filename, 'r')
                logger.debug(f"Opened file: {filename}")
            with file_handle:
                self.title = file_handle.readline().strip()
                logger.debug(f"Read title: {self.title}")
                # Read headers
                while True:
                    line = file_handle.readline()
                    if not line:
                        break
                    line = line.strip()
                    if not line:
                        continue
                    found = False
                    for keyword in self.HKEYWORDS:
                        if keyword in line:
                            found = True
                            words = line.split()
                            if keyword in ["xlo xhi", "ylo yhi", "zlo zhi"]:
                                self.headers[keyword] = (float(words[0]), float(words[1]))
                            elif keyword == "xy xz yz":
                                self.headers[keyword] = (float(words[0]), float(words[1]), float(words[2]))
                            else:
                                self.headers[keyword] = int(words[0])
                            logger.debug(f"Set header '{keyword}': {self.headers[keyword]}")
                            break
                    if not found:
                        break  # Reached the end of headers
                # Read sections
                while line:
                    found_section = False
                    for pair in self.SKEYWORDS:
                        keyword, length_key = pair
                        if keyword == line:
                            found_section = True
                            if length_key not in self.headers:
                                raise ValueError(f"Data section '{keyword}' has no matching header value.")
                            count = self.headers[length_key]
                            file_handle.readline()  # Read the blank line after section keyword
                            section_lines = [file_handle.readline() for _ in range(count)]
                            self.sections[keyword] = section_lines
                            logger.debug(f"Read section '{keyword}' with {count} entries.")
                            break
                    if not found_section:
                        raise ValueError(f"Invalid section '{line}' in data file.")
                    # Read next section keyword
                    line = file_handle.readline()
                    if line:
                        line = line.strip()
            # Reap the gunzip process and report a failed decompression
            if proc is not None and proc.wait() != 0:
                raise subprocess.CalledProcessError(proc.returncode, proc.args)
            self.flist = flist
            self.restart = False
            logger.info(f"Initialized data object from file '{filename}'.")
        except subprocess.CalledProcessError as e:
            logger.error(f"Error decompressing file '{filename}': {e}")
            raise
        except FileNotFoundError:
            logger.error(f"File '{filename}' not found.")
            raise
        except Exception as e:
            logger.error(f"Error reading file '{filename}': {e}")
            raise
    def __repr__(self) -> str:
        """
        Return a string representation of the data object.
        Returns:
            str: Description of the data object.
        """
        if not self.sections or not self.headers:
            ret = f"empty {self.title}"
            logger.info(ret)
            return ret
        kind = "restart" if self.restart else "source"
        header_info = (f"Data file: {self.flist[0]}\n"
                       f"\tcontains {self.headers.get('atoms', 0)} atoms from {self.headers.get('atom types', 0)} atom types\n"
                       f"\twith box = [{self.headers.get('xlo xhi', (0, 0))[0]} "
                       f"{self.headers.get('xlo xhi', (0, 0))[1]} "
                       f"{self.headers.get('ylo yhi', (0, 0))[0]} "
                       f"{self.headers.get('ylo yhi', (0, 0))[1]} "
                       f"{self.headers.get('zlo zhi', (0, 0))[0]} "
                       f"{self.headers.get('zlo zhi', (0, 0))[1]}]")
        logger.info(header_info)
        section_info = "\twith the following sections:"
        logger.info(section_info)
        for section_name in self.sections.keys():
            section_details = f"\t\t{self.dispsection(section_name, False)}"
            logger.info(section_details)
        ret = (f'LAMMPS data object including {self.headers.get("atoms", 0)} atoms '
               f'({self.maxtype()} types, {kind}="{self.flist[0]}")')
        return ret
    def map(self, *pairs: Any) -> None:
        """
        Assign names to atom columns.
        Parameters:
            *pairs (Any): Pairs of column indices and names.
        Raises:
            ValueError: If an odd number of arguments is provided.
        """
        if len(pairs) % 2 != 0:
            raise ValueError("data.map() requires pairs of mappings.")
        for i in range(0, len(pairs), 2):
            column_index = pairs[i] - 1
            name = pairs[i + 1]
            self.names[name] = column_index
            logger.debug(f"Mapped column '{name}' to index {column_index + 1}.")
    def get(self, *args: Any) -> Union[List[List[float]], List[float]]:
        """
        Extract information from data file fields.
        Parameters:
            *args: Variable length argument list.
                - One argument: Returns all columns as a 2D list of floats.
                - Two arguments: Returns the specified column as a list of floats.
        Returns:
            Union[List[List[float]], List[float]]: Extracted data.
                An unknown field yields an empty list.
        Raises:
            ValueError: If invalid number of arguments is provided.
        """
        if len(args) == 1:
            field = args[0]
            array = []
            lines = self.sections.get(field, [])
            for line in lines:
                words = line.split()
                values = [float(word) for word in words]
                array.append(values)
            logger.debug(f"Extracted all columns from field '{field}'.")
            return array
        elif len(args) == 2:
            field, column = args
            column_index = column - 1
            vec = []
            lines = self.sections.get(field, [])
            for line in lines:
                words = line.split()
                vec.append(float(words[column_index]))
            logger.debug(f"Extracted column {column} from field '{field}'.")
            return vec
        else:
            raise ValueError("Invalid arguments for data.get().")
    def reorder(self, section: str, *order: int) -> None:
        """
        Reorder columns in a data file section.
        Parameters:
            section (str): The name of the section to reorder.
            *order (int): The new order of column indices.
        Raises:
            ValueError: If the section name is invalid.
        """
        if section not in self.sections:
            raise ValueError(f'"{section}" is not a valid section name.')
        num_columns = len(order)
        logger.info(f">> Reordering {num_columns} columns in section '{section}'.")
        old_lines = self.sections[section]
        new_lines = []
        for line in old_lines:
            words = line.split()
            try:
                reordered = " ".join(words[i - 1] for i in order) + "\n"
            except IndexError:
                raise ValueError("Column index out of range during reorder.")
            new_lines.append(reordered)
        self.sections[section] = new_lines
        logger.debug(f"Reordered columns in section '{section}'.")
    def replace(self, section: str, column: int,
                vector: Union[List[float], np.ndarray, float]) -> None:
        """
        Replace a column in a named section with a vector of values.
        Parameters:
            section (str): The name of the section.
            column (int): The column index to replace (1-based).
            vector (Union[List[float], np.ndarray, float]): The new values or a single scalar value.
        Raises:
            ValueError: If the section is invalid or vector length mismatch.
        """
        if section not in self.sections:
            raise ValueError(f'"{section}" is not a valid section name.')
        lines = self.sections[section]
        num_lines = len(lines)
        # Accept sequences as-is; wrap only true scalars so that a NumPy array
        # is not mistaken for a single value
        if not isinstance(vector, (list, tuple, np.ndarray)):
            vector = [vector]
        if len(vector) == 1:
            vector = [vector[0]] * num_lines
        if len(vector) != num_lines:
            raise ValueError(f'The length of new data ({len(vector)}) in section "{section}" does not match the number of rows {num_lines}.')
        new_lines = []
        column_index = column - 1
        for i, line in enumerate(lines):
            words = line.split()
            if column_index >= len(words):
                raise ValueError(f"Column index {column} out of range for section '{section}'.")
            words[column_index] = str(vector[i])
            new_line = " ".join(words) + "\n"
            new_lines.append(new_line)
        self.sections[section] = new_lines
        logger.debug(f"Replaced column {column} in section '{section}' with new data.")
    def append(self, section: str, vector: Union[List[float], np.ndarray, float],
               force_integer: bool = False, property_name: Optional[str] = None) -> None:
        """
        Append a new column to a named section.
        Parameters:
            section (str): The name of the section.
            vector (Union[List[float], np.ndarray, float]): The values to append.
            force_integer (bool): If True, values are converted to integers.
            property_name (Optional[str]): The name of the property being appended.
        Raises:
            ValueError: If vector length mismatch occurs.
        """
        if section not in self.sections:
            self.sections[section] = []
            logger.info(f'Added new section [{section}] - file="{self.title}".')
        lines = self.sections[section]
        num_lines = len(lines)
        if not isinstance(vector, (list, np.ndarray)):
            vector = [vector]
        if property_name:
            logger.info(f'\t> Adding property "{property_name}" with {len(vector)} values to [{section}].')
        else:
            logger.info(f'\t> Adding {len(vector)} values to [{section}] (no name).')
        new_lines = []
        if num_lines == 0:
            # Empty section, create initial lines
            num_entries = len(vector)
            for i in range(num_entries):
                value = int(vector[i]) if force_integer else vector[i]
                new_lines.append(f"{value}\n")
            logger.debug(f"Initialized empty section '{section}' with new column.")
        else:
            if len(vector) == 1:
                # Repeat the single value for every row; `vector * num_lines`
                # would scale a NumPy array instead of repeating it
                vector = [vector[0]] * num_lines
            if len(vector) != num_lines:
                raise ValueError(f'The length of new data ({len(vector)}) in section "{section}" does not match the number of rows {num_lines}.')
            for i, line in enumerate(lines):
                value = int(vector[i]) if force_integer else vector[i]
                new_word = str(value)
                new_line = line.rstrip('\n') + f" {new_word}\n"
                new_lines.append(new_line)
        self.sections[section] = new_lines
        logger.debug(f"Appended new column to section '{section}'.")
    def dispsection(self, section: str, include_header: bool = True) -> str:
        """
        Display information about a section.
        Parameters:
            section (str): The name of the section.
            include_header (bool): Whether to include "LAMMPS data section" in the output.
        Returns:
            str: Description of the section.
        """
        if section not in self.sections:
            raise ValueError(f"Section '{section}' not found in data object.")
        lines = self.sections[section]
        num_lines = len(lines)
        num_columns = len(lines[0].split()) if lines else 0
        ret = f'"{section}": {num_lines} x {num_columns} values'
        if include_header:
            ret = f"LAMMPS data section {ret}"
        return ret
    def newxyz(self, dm: dump, ntime: int) -> None:
        """
        Replace x, y, z coordinates in the Atoms section with those from a dump object.
        Parameters:
            dm (dump): The dump object containing new coordinates.
            ntime (int): The timestep to extract coordinates from.
        Raises:
            ValueError: If required columns are not defined.
        """
        # Fail early instead of silently overwriting column 1 when a name is unmapped
        missing = [name for name in ("x", "y", "z") if name not in self.names]
        if missing:
            raise ValueError(f"Columns {missing} are not mapped; call data.map() first.")
        nsnap = dm.findtime(ntime)
        logger.info(f">> Replacing XYZ from snapshot index {nsnap} (timestep {ntime}).")
        dm.sort(ntime)
        x, y, z = dm.vecs(ntime, "x", "y", "z")
        self.replace("Atoms", self.names["x"] + 1, x)
        self.replace("Atoms", self.names["y"] + 1, y)
        self.replace("Atoms", self.names["z"] + 1, z)
        # Image flags are replaced only when all three are mapped on the data side
        if "ix" in dm.names and all(name in self.names for name in ("ix", "iy", "iz")):
            ix, iy, iz = dm.vecs(ntime, "ix", "iy", "iz")
            self.replace("Atoms", self.names["ix"] + 1, ix)
            self.replace("Atoms", self.names["iy"] + 1, iy)
            self.replace("Atoms", self.names["iz"] + 1, iz)
        logger.debug(f"Replaced XYZ coordinates at timestep {ntime}.")
    def delete(self, keyword: str) -> None:
        """
        Delete a header value or section from the data object.
        Parameters:
            keyword (str): The header or section name to delete.
        Raises:
            ValueError: If the keyword is not found.
        """
        if keyword in self.headers:
            del self.headers[keyword]
            logger.debug(f"Deleted header '{keyword}'.")
        elif keyword in self.sections:
            del self.sections[keyword]
            logger.debug(f"Deleted section '{keyword}'.")
        else:
            raise ValueError("Keyword not found in data object.")
    def write(self, filename: str) -> None:
        """
        Write the data object to a LAMMPS data file.
        Parameters:
            filename (str): The output file path.
        """
        try:
            with open(filename, "w") as f:
                f.write(f"{self.title}\n")
                logger.debug(f"Wrote title to file '{filename}'.")
                # Write headers
                for keyword in self.HKEYWORDS:
                    if keyword in self.headers:
                        value = self.headers[keyword]
                        if keyword in ["xlo xhi", "ylo yhi", "zlo zhi"]:
                            f.write(f"{value[0]} {value[1]} {keyword}\n")
                        elif keyword == "xy xz yz":
                            f.write(f"{value[0]} {value[1]} {value[2]} {keyword}\n")
                        else:
                            f.write(f"{value} {keyword}\n")
                        logger.debug(f"Wrote header '{keyword}' to file.")
                # Write sections
                for pair in self.SKEYWORDS:
                    keyword = pair[0]
                    if keyword in self.sections:
                        f.write(f"\n{keyword}\n\n")
                        for line in self.sections[keyword]:
                            f.write(line)
                        logger.debug(f"Wrote section '{keyword}' to file.")
            logger.info(f"Data object written to '{filename}'.")
        except IOError as e:
            logger.error(f"Error writing to file '{filename}': {e}")
            raise
    def iterator(self, flag: int) -> Tuple[int, int, int]:
        """
        Iterator method compatible with other tools.
        Parameters:
            flag (int): 0 for the first call, 1 for subsequent calls.
        Returns:
            Tuple[int, int, int]: (index, time, flag)
        """
        if flag == 0:
            return 0, 0, 1
        return 0, 0, -1
    def findtime(self, n: int) -> int:
        """
        Find the index of a given timestep.
        Parameters:
            n (int): The timestep to find.
        Returns:
            int: The index of the timestep.
        Raises:
            ValueError: If the timestep does not exist.
        """
        if n == 0:
            return 0
        raise ValueError(f"No step {n} exists.")
    def viz(self, isnap: int) -> Tuple[int, List[float], List[List[Union[int, float]]],
                                       List[List[Union[int, float]]], List[Any], List[Any]]:
        """
        Return visualization data for a specified snapshot.
        Parameters:
            isnap (int): Snapshot index (must be 0 for data object).
        Returns:
            Tuple containing time, box dimensions, atoms, bonds, tris, and lines.
        Raises:
            ValueError: If isnap is not 0.
        """
        if isnap:
            raise ValueError("Cannot call data.viz() with isnap != 0.")
        id_idx = self.names.get("id")
        type_idx = self.names.get("type")
        x_idx = self.names.get("x")
        y_idx = self.names.get("y")
        z_idx = self.names.get("z")
        if None in [id_idx, type_idx, x_idx, y_idx, z_idx]:
            raise ValueError("One or more required columns (id, type, x, y, z) are not defined.")
        xlohi = self.headers.get("xlo xhi", (0.0, 0.0))
        ylohi = self.headers.get("ylo yhi", (0.0, 0.0))
        zlohi = self.headers.get("zlo zhi", (0.0, 0.0))
        box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]
        # Create atom list needed by viz from id, type, x, y, z
        atoms = []
        atom_lines = self.sections.get("Atoms", [])
        for line in atom_lines:
            words = line.split()
            atoms.append([
                int(words[id_idx]),
                int(words[type_idx]),
                float(words[x_idx]),
                float(words[y_idx]),
                float(words[z_idx]),
            ])
        # Create list of current bond coords from list of bonds
        bonds = []
        if "Bonds" in self.sections:
            bond_lines = self.sections["Bonds"]
            for line in bond_lines:
                words = line.split()
                bid = int(words[0])
                btype = int(words[1])
                atom1 = int(words[2])
                atom2 = int(words[3])
                if atom1 - 1 >= len(atom_lines) or atom2 - 1 >= len(atom_lines):
                    raise ValueError("Atom index in Bonds section out of range.")
                atom1_words = self.sections["Atoms"][atom1 - 1].split()
                atom2_words = self.sections["Atoms"][atom2 - 1].split()
                bonds.append([
                    bid,
                    btype,
                    float(atom1_words[x_idx]),
                    float(atom1_words[y_idx]),
                    float(atom1_words[z_idx]),
                    float(atom2_words[x_idx]),
                    float(atom2_words[y_idx]),
                    float(atom2_words[z_idx]),
                    int(atom1_words[type_idx]),
                    int(atom2_words[type_idx]),
                ])
        tris = []
        lines = []
        logger.debug("Prepared visualization data.")
        return 0, box, atoms, bonds, tris, lines
    def maxbox(self) -> List[float]:
        """
        Return the box dimensions.
        Returns:
            List[float]: [xlo, ylo, zlo, xhi, yhi, zhi]
        """
        xlohi = self.headers.get("xlo xhi", (0.0, 0.0))
        ylohi = self.headers.get("ylo yhi", (0.0, 0.0))
        zlohi = self.headers.get("zlo zhi", (0.0, 0.0))
        box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]
        logger.debug(f"Box dimensions: {box}")
        return box
    def maxtype(self) -> int:
        """
        Return the number of atom types.
        Returns:
            int: Number of atom types.
        """
        maxtype = self.headers.get("atom types", 0)
        logger.debug(f"Number of atom types: {maxtype}")
        return maxtype
# --------------------------------------------------------------------
# data file keywords, both header and main sections
hkeywords = [
"atoms",
"ellipsoids",
"lines",
"triangles",
"bodies",
"bonds",
"angles",
"dihedrals",
"impropers",
"atom types",
"bond types",
"angle types",
"dihedral types",
"improper types",
"xlo xhi",
"ylo yhi",
"zlo zhi",
"xy xz yz",
]
skeywords = [
["Masses", "atom types"],
["Atoms", "atoms"],
["Ellipsoids", "ellipsoids"],
["Lines", "lines"],
["Triangles", "triangles"],
["Bodies", "bodies"],
["Bonds", "bonds"],
["Angles", "angles"],
["Dihedrals", "dihedrals"],
["Impropers", "impropers"],
["Velocities", "atoms"],
["Pair Coeffs", "atom types"],
["Bond Coeffs", "bond types"],
["Angle Coeffs", "angle types"],
["Dihedral Coeffs", "dihedral types"],
["Improper Coeffs", "improper types"],
["BondBond Coeffs", "angle types"],
["BondAngle Coeffs", "angle types"],
["MiddleBondTorsion Coeffs", "dihedral types"],
["EndBondTorsion Coeffs", "dihedral types"],
["AngleTorsion Coeffs", "dihedral types"],
["AngleAngleTorsion Coeffs", "dihedral types"],
["BondBond13 Coeffs", "dihedral types"],
["AngleAngle Coeffs", "improper types"],
["Molecules", "atoms"],
["Tinker Types", "atoms"],
]
# ===================================================
# main()
# ===================================================
# for debugging purposes (code called as a script)
# the code is called from here
# ===================================================
if __name__ == '__main__':
    import sys
    # Example usage
    try:
        datafile = "../data/play_data/data.play.lmp"
        X = data(datafile)
        Y = dump("../data/play_data/dump.play.restartme")
        step = 2000
        R = data(Y, step)
        R.write("../tmp/data.myfirstrestart.lmp")
    except Exception as e:
        logger.error(f"An error occurred during execution: {e}")
        sys.exit(1)
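The header loop in `write()` above emits each header as its value(s) followed by the keyword, in the fixed order of `HKEYWORDS`. The following is a minimal standalone sketch of that formatting rule, not the library code: `format_headers` and the shortened `HEADER_ORDER` list are hypothetical names introduced for illustration only.

```python
from typing import Dict, List, Tuple, Union

HeaderValue = Union[int, Tuple[float, ...]]

# Hypothetical, shortened stand-in for the HKEYWORDS ordering (illustration only).
HEADER_ORDER = ["atoms", "atom types", "xlo xhi", "ylo yhi", "zlo zhi", "xy xz yz"]


def format_headers(headers: Dict[str, HeaderValue]) -> List[str]:
    """Return header lines as 'value(s) keyword', following HEADER_ORDER."""
    out = []
    for keyword in HEADER_ORDER:
        if keyword not in headers:
            continue
        value = headers[keyword]
        if isinstance(value, tuple):
            # Box bounds (2 values) and tilt factors (3 values) precede the keyword.
            out.append(" ".join(str(v) for v in value) + f" {keyword}")
        else:
            # Counts are written as a single integer before the keyword.
            out.append(f"{value} {keyword}")
    return out


# The insertion order of the dict does not matter; HEADER_ORDER decides.
header_lines = format_headers({"xlo xhi": (0.0, 10.0), "atoms": 3, "atom types": 1})
print(header_lines)
```

Because the output order comes from the keyword list rather than from the dictionary, headers set in any order should still be written in a layout LAMMPS accepts.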
Classes
class data (*args: Any)
-
The `data` class provides tools to read, write, and manipulate LAMMPS data files, enabling seamless integration with the `dump` class for restart generation and simulation data management.
Initialize a data object.
Parameters
*args: Variable length argument list.
- No arguments: Creates an empty data object.
- One argument (filename or dump object): Initializes from a file or dump object.
- Two arguments (dump object, timestep): Initializes from a dump object at a specific timestep.
Class variables
var HKEYWORDS
var SKEYWORDS
Methods
def append(self, section: str, vector: Union[List[float], numpy.ndarray, float], force_integer: bool = False, property_name: Optional[str] = None) ‑> NoneType
-
Append a new column to a named section.
Parameters
section (str): The name of the section.
vector (Union[List[float], np.ndarray, float]): The values to append.
force_integer (bool): If True, values are converted to integers.
property_name (Optional[str]): The name of the property being appended.
Raises
ValueError
- If vector length mismatch occurs.
Source code
def append(self, section: str, vector: Union[List[float], np.ndarray, float], force_integer: bool = False, property_name: Optional[str] = None) -> None:
    """
    Append a new column to a named section.
    Parameters:
        section (str): The name of the section.
        vector (Union[List[float], np.ndarray, float]): The values to append.
        force_integer (bool): If True, values are converted to integers.
        property_name (Optional[str]): The name of the property being appended.
    Raises:
        ValueError: If vector length mismatch occurs.
    """
    if section not in self.sections:
        self.sections[section] = []
        logger.info(f'Added new section [{section}] - file="{self.title}".')
    lines = self.sections[section]
    num_lines = len(lines)
    if not isinstance(vector, (list, np.ndarray)):
        vector = [vector]
    if property_name:
        logger.info(f'\t> Adding property "{property_name}" with {len(vector)} values to [{section}].')
    else:
        logger.info(f'\t> Adding {len(vector)} values to [{section}] (no name).')
    new_lines = []
    if num_lines == 0:
        # Empty section, create initial lines
        num_entries = len(vector)
        for i in range(num_entries):
            value = int(vector[i]) if force_integer else vector[i]
            new_line = f"{int(value) if force_integer else value}\n"
            new_lines.append(new_line)
        logger.debug(f"Initialized empty section '{section}' with new column.")
    else:
        if len(vector) == 1:
            vector = vector * num_lines
        if len(vector) != num_lines:
            raise ValueError(f'The length of new data ({len(vector)}) in section "{section}" does not match the number of rows {num_lines}.')
        for i, line in enumerate(lines):
            value = int(vector[i]) if force_integer else vector[i]
            new_word = str(value)
            new_line = line.rstrip('\n') + f" {new_word}\n"
            new_lines.append(new_line)
    self.sections[section] = new_lines
    logger.debug(f"Appended new column to section '{section}'.")
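As the source above suggests, a section is stored as a list of text lines and `append` grows it one column at a time: the first call creates the rows, later calls add a word to each row. The sketch below reproduces that idea on a plain list of strings. It is a simplified illustration under the assumption of list inputs, and `append_column` is a hypothetical helper, not part of the module.

```python
from typing import List, Sequence, Union

Number = Union[int, float]


def append_column(lines: List[str], vector: Sequence[Number],
                  force_integer: bool = False) -> List[str]:
    """Return new section lines with one extra column taken from `vector`."""
    values = [int(v) if force_integer else v for v in vector]
    if not lines:
        # Empty section: every value starts a new row.
        return [f"{v}\n" for v in values]
    if len(values) == 1:
        # A single value is repeated for every existing row.
        values = values * len(lines)
    if len(values) != len(lines):
        raise ValueError(
            f"The length of new data ({len(values)}) does not match "
            f"the number of rows {len(lines)}."
        )
    return [line.rstrip("\n") + f" {v}\n" for line, v in zip(lines, values)]


# Build a two-atom section column by column: id, type, x.
atoms = append_column([], [1.0, 2.0], force_integer=True)  # ids written as integers
atoms = append_column(atoms, [1], force_integer=True)      # one type for all rows
atoms = append_column(atoms, [0.5, 1.5])                   # x coordinates
print(atoms)
```

This mirrors how the constructor from a `dump` object fills `Atoms` and `Velocities`, calling `append` once per column with `force_integer` set for identifier-like columns.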
def delete(self, keyword: str) ‑> NoneType
-
Delete a header value or section from the data object.
Parameters
keyword (str): The header or section name to delete.
Raises
ValueError
- If the keyword is not found.
Source code
def delete(self, keyword: str) -> None:
    """
    Delete a header value or section from the data object.
    Parameters:
        keyword (str): The header or section name to delete.
    Raises:
        ValueError: If the keyword is not found.
    """
    if keyword in self.headers:
        del self.headers[keyword]
        logger.debug(f"Deleted header '{keyword}'.")
    elif keyword in self.sections:
        del self.sections[keyword]
        logger.debug(f"Deleted section '{keyword}'.")
    else:
        raise ValueError("Keyword not found in data object.")
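Header keywords are lowercase (for example `bonds`) while section names are capitalized (for example `Bonds`), so removing a section does not remove the matching count from the headers. The following sketch illustrates this on two plain dictionaries. `delete_keyword` is a hypothetical standalone function that copies the lookup order shown above, and the sample values are made up for the example.

```python
from typing import Dict, List


def delete_keyword(headers: Dict[str, int], sections: Dict[str, List[str]],
                   keyword: str) -> None:
    """Remove a header or a section; header names are checked first."""
    if keyword in headers:
        del headers[keyword]
    elif keyword in sections:
        del sections[keyword]
    else:
        raise ValueError("Keyword not found in data object.")


headers = {"atoms": 2, "bonds": 1, "bond types": 1}
sections = {
    "Atoms": ["1 1 0.0 0.0 0.0\n", "2 1 1.0 0.0 0.0\n"],
    "Bonds": ["1 1 1 2\n"],
}

delete_keyword(headers, sections, "Bonds")    # removes only the section
remaining_headers = sorted(headers)           # the "bonds" count is still there
for name in ("bonds", "bond types"):          # drop the matching counts explicitly
    delete_keyword(headers, sections, name)
print(remaining_headers, headers)
```

When a section is dropped before writing a file, it is probably worth deleting the related header counts as well, so that the headers and sections stay consistent.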
def dispsection(self, section: str, include_header: bool = True) ‑> str
-
Display information about a section.
Parameters
section (str): The name of the section.
include_header (bool): Whether to include "LAMMPS data section" in the output.
Returns
str
- Description of the section.
Source code
def dispsection(self, section: str, include_header: bool = True) -> str:
    """
    Display information about a section.
    Parameters:
        section (str): The name of the section.
        include_header (bool): Whether to include "LAMMPS data section" in the output.
    Returns:
        str: Description of the section.
    """
    if section not in self.sections:
        raise ValueError(f"Section '{section}' not found in data object.")
    lines = self.sections[section]
    num_lines = len(lines)
    num_columns = len(lines[0].split()) if lines else 0
    ret = f'"{section}": {num_lines} x {num_columns} values'
    if include_header:
        ret = f"LAMMPS data section {ret}"
    return ret
def findtime(self, n: int) ‑> int
-
Find the index of a given timestep.
Parameters
n (int): The timestep to find.
Returns
int
- The index of the timestep.
Raises
ValueError
- If the timestep does not exist.
Source code
def findtime(self, n: int) -> int:
    """
    Find the index of a given timestep.
    Parameters:
        n (int): The timestep to find.
    Returns:
        int: The index of the timestep.
    Raises:
        ValueError: If the timestep does not exist.
    """
    if n == 0:
        return 0
    raise ValueError(f"No step {n} exists.")
def get(self, *args: Any) ‑> Union[List[List[float]], List[float]]
-
Extract information from data file fields.
Parameters
*args: Variable length argument list.
- One argument (section name): Returns all columns of that section as a 2D list of floats.
- Two arguments (section name, 1-based column index): Returns the specified column as a list of floats.
Returns
Union[List[List[float]], List[float]]
- Extracted data.
Raises
ValueError
- If invalid number of arguments is provided.
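KeyError
- Listed in the docstring. Note that the code reads the section with self.sections.get(field, []), so an unknown field name returns an empty list instead of raising.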
Source code
def get(self, *args: Any) -> Union[List[List[float]], List[float]]:
    """
    Extract information from data file fields.
    Parameters:
        *args: Variable length argument list.
            - One argument: Returns all columns as a 2D list of floats.
            - Two arguments: Returns the specified column as a list of floats.
    Returns:
        Union[List[List[float]], List[float]]: Extracted data.
    Raises:
        ValueError: If invalid number of arguments is provided.
        KeyError: If the specified field is not found.
    """
    if len(args) == 1:
        field = args[0]
        array = []
        lines = self.sections.get(field, [])
        for line in lines:
            words = line.split()
            values = [float(word) for word in words]
            array.append(values)
        logger.debug(f"Extracted all columns from field '{field}'.")
        return array
    elif len(args) == 2:
        field, column = args
        column_index = column - 1
        vec = []
        lines = self.sections.get(field, [])
        for line in lines:
            words = line.split()
            vec.append(float(words[column_index]))
        logger.debug(f"Extracted column {column} from field '{field}'.")
        return vec
    else:
        raise ValueError("Invalid arguments for data.get().")
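Both forms of `get` parse the stored text lines into floats, and the column argument is 1-based. The sketch below shows the two extraction modes on a plain list of lines. `get_all` and `get_column` are hypothetical names for this illustration, and the sample `Atoms` lines are invented.

```python
from typing import List


def get_all(lines: List[str]) -> List[List[float]]:
    """Parse every whitespace-separated word of every line as a float."""
    return [[float(word) for word in line.split()] for line in lines]


def get_column(lines: List[str], column: int) -> List[float]:
    """Extract one column; `column` is 1-based, as in the documented method."""
    return [float(line.split()[column - 1]) for line in lines]


atom_lines = ["1 1 0.0 0.5 1.0\n", "2 2 1.0 1.5 2.0\n"]

table = get_all(atom_lines)
x_values = get_column(atom_lines, 3)  # third column holds x in this sample
print(table, x_values)
```

Note that identifiers and types also come back as floats (for example `1.0`), so callers that need integers have to convert them.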
def iterator(self, flag: int) ‑> Tuple[int, int, int]
-
Iterator method compatible with other tools.
Parameters
flag (int): 0 for the first call, 1 for subsequent calls.
Returns
Tuple[int, int, int]
- (index, time, flag)
Source code
def iterator(self, flag: int) -> Tuple[int, int, int]:
    """
    Iterator method compatible with other tools.
    Parameters:
        flag (int): 0 for the first call, 1 for subsequent calls.
    Returns:
        Tuple[int, int, int]: (index, time, flag)
    """
    if flag == 0:
        return 0, 0, 1
    return 0, 0, -1
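The returned flag drives the calling loop: it is 1 after the first call and -1 once the single snapshot has been consumed. A minimal sketch of such a loop follows. `SingleSnapshot` is a hypothetical stand-in that copies only the `iterator` logic shown above, and the loop shape is an assumption about how snapshot-based tools consume the method.

```python
from typing import List, Tuple


class SingleSnapshot:
    """Stand-in object exposing the same iterator contract as shown above."""

    def iterator(self, flag: int) -> Tuple[int, int, int]:
        if flag == 0:
            return 0, 0, 1   # first call: snapshot index 0, time 0, keep going
        return 0, 0, -1      # later calls: signal that nothing is left


source = SingleSnapshot()
visited: List[Tuple[int, int]] = []

flag = 0
while True:
    index, time, flag = source.iterator(flag)
    if flag == -1:
        break                # end of iteration
    visited.append((index, time))

print(visited)
```

The loop body runs exactly once, which is what lets a data object be passed where a multi-snapshot object is expected.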
def map(self, *pairs: Any) ‑> NoneType
-
Assign names to atom columns.
Parameters
*pairs (Any): Alternating 1-based column indices and names, for example 1, "id", 3, "x". Names are stored internally as zero-based indices.
Raises
ValueError
- If an odd number of arguments is provided.
Source code
def map(self, *pairs: Any) -> None:
    """
    Assign names to atom columns.
    Parameters:
        *pairs (Any): Pairs of column indices and names.
    Raises:
        ValueError: If an odd number of arguments is provided.
    """
    if len(pairs) % 2 != 0:
        raise ValueError("data.map() requires pairs of mappings.")
    for i in range(0, len(pairs), 2):
        column_index = pairs[i] - 1
        name = pairs[i + 1]
        self.names[name] = column_index
        logger.debug(f"Mapped column '{name}' to index {column_index + 1}.")
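The mapping converts the 1-based column numbers given by the caller into zero-based indices, which other methods such as `viz` then use directly on the split words of each `Atoms` line. The sketch below illustrates that conversion. `map_columns` is a hypothetical standalone function and the sample line is invented.

```python
from typing import Any, Dict


def map_columns(*pairs: Any) -> Dict[str, int]:
    """Turn (1-based index, name) pairs into a name -> zero-based index dict."""
    if len(pairs) % 2 != 0:
        raise ValueError("map_columns() requires pairs of mappings.")
    return {pairs[i + 1]: pairs[i] - 1 for i in range(0, len(pairs), 2)}


names = map_columns(1, "id", 2, "type", 3, "x", 4, "y", 5, "z")

# Look up values by name on one line of an Atoms section.
words = "7 2 0.5 1.5 2.5".split()
atom_id = int(words[names["id"]])
x = float(words[names["x"]])
print(names, atom_id, x)
```

The mapping has to match the atom style of the file, since the column holding `x` differs between styles.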
def maxbox(self) ‑> List[float]
-
Return the box dimensions.
Returns
List[float]
- [xlo, ylo, zlo, xhi, yhi, zhi]
Source code
def maxbox(self) -> List[float]:
    """
    Return the box dimensions.
    Returns:
        List[float]: [xlo, ylo, zlo, xhi, yhi, zhi]
    """
    xlohi = self.headers.get("xlo xhi", (0.0, 0.0))
    ylohi = self.headers.get("ylo yhi", (0.0, 0.0))
    zlohi = self.headers.get("zlo zhi", (0.0, 0.0))
    box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]
    logger.debug(f"Box dimensions: {box}")
    return box
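The headers store the box as per-axis `(lo, hi)` pairs, while the returned list groups the three lower bounds first and the three upper bounds last. The following sketch shows that regrouping, including the `(0.0, 0.0)` fallback for a missing axis. `box_from_headers` is a hypothetical name and the header values are sample data.

```python
from typing import Dict, List, Tuple


def box_from_headers(headers: Dict[str, Tuple[float, float]]) -> List[float]:
    """Regroup per-axis (lo, hi) pairs into [xlo, ylo, zlo, xhi, yhi, zhi]."""
    xlo, xhi = headers.get("xlo xhi", (0.0, 0.0))
    ylo, yhi = headers.get("ylo yhi", (0.0, 0.0))
    zlo, zhi = headers.get("zlo zhi", (0.0, 0.0))
    return [xlo, ylo, zlo, xhi, yhi, zhi]


# The z bounds are deliberately missing to show the fallback.
box = box_from_headers({"xlo xhi": (0.0, 10.0), "ylo yhi": (-1.0, 1.0)})
lengths = [box[i + 3] - box[i] for i in range(3)]  # edge length per axis
print(box, lengths)
```

A missing axis therefore yields a zero-length edge rather than an error, which is worth checking before using the box for plotting or scaling.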
def maxtype(self) ‑> int
Return the number of atom types.
Returns
int
- Number of atom types.
def maxtype(self) -> int:
    """
    Return the number of atom types.

    Returns:
        int: Number of atom types.
    """
    maxtype = self.headers.get("atom types", 0)
    logger.debug(f"Number of atom types: {maxtype}")
    return maxtype
def newxyz(self, dm: pizza.dump3.dump, ntime: int) ‑> NoneType
Replace x, y, z coordinates in the Atoms section with those from a dump object.
Parameters
dm (dump): The dump object containing new coordinates.
ntime (int): The timestep to extract coordinates from.
Raises
ValueError
- If required columns are not defined.
def newxyz(self, dm: dump, ntime: int) -> None:
    """
    Replace x, y, z coordinates in the Atoms section with those from a dump object.

    Parameters:
        dm (dump): The dump object containing new coordinates.
        ntime (int): The timestep to extract coordinates from.

    Raises:
        ValueError: If required columns are not defined.
    """
    # Fail early instead of silently overwriting column 1 when a name is unmapped.
    missing = [name for name in ("x", "y", "z") if name not in self.names]
    if missing:
        raise ValueError(f"Columns {missing} are not defined; call map() first.")

    nsnap = dm.findtime(ntime)
    logger.info(f">> Replacing XYZ from snapshot index {nsnap} (timestep {ntime}).")
    dm.sort(ntime)
    x, y, z = dm.vecs(ntime, "x", "y", "z")

    self.replace("Atoms", self.names["x"] + 1, x)
    self.replace("Atoms", self.names["y"] + 1, y)
    self.replace("Atoms", self.names["z"] + 1, z)

    # Image flags are replaced only when all three are mapped on both sides.
    image_flags = ("ix", "iy", "iz")
    if all(name in dm.names and name in self.names for name in image_flags):
        ix, iy, iz = dm.vecs(ntime, *image_flags)
        self.replace("Atoms", self.names["ix"] + 1, ix)
        self.replace("Atoms", self.names["iy"] + 1, iy)
        self.replace("Atoms", self.names["iz"] + 1, iz)

    logger.debug(f"Replaced XYZ coordinates at timestep {ntime}.")
def reorder(self, section: str, *order: int) ‑> NoneType
Reorder columns in a data file section.
Parameters
section (str): The name of the section to reorder.
*order (int): The new order of column indices (1-based).
Raises
ValueError
- If the section name is invalid.
def reorder(self, section: str, *order: int) -> None:
    """
    Reorder columns in a data file section.

    Parameters:
        section (str): The name of the section to reorder.
        *order (int): The new order of column indices.

    Raises:
        ValueError: If the section name is invalid.
    """
    if section not in self.sections:
        raise ValueError(f'"{section}" is not a valid section name.')

    num_columns = len(order)
    logger.info(f">> Reordering {num_columns} columns in section '{section}'.")

    old_lines = self.sections[section]
    new_lines = []

    for line in old_lines:
        words = line.split()
        try:
            reordered = " ".join(words[i - 1] for i in order) + "\n"
        except IndexError:
            raise ValueError("Column index out of range during reorder.")
        new_lines.append(reordered)

    self.sections[section] = new_lines
    logger.debug(f"Reordered columns in section '{section}'.")
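Reordering works line by line: each line is split, the words are picked in the requested 1-based order, and the result is rejoined with single spaces. The sketch below reproduces that step on a plain list of strings; `reorder_lines` is a hypothetical helper, and the sample rows are made up for illustration.

```python
from typing import List


def reorder_lines(lines: List[str], *order: int) -> List[str]:
    """Hypothetical stand-in for data.reorder() acting on a list of lines."""
    reordered = []
    for line in lines:
        words = line.split()
        # Pick words in the requested 1-based order and rejoin them.
        reordered.append(" ".join(words[i - 1] for i in order) + "\n")
    return reordered


atoms = ["1 2 0.0 0.5\n", "2 1 1.5 2.0\n"]
swapped = reorder_lines(atoms, 1, 3, 2, 4)   # swap columns 2 and 3
dropped = reorder_lines(atoms, 1, 2)         # columns left out are discarded
```

Two consequences of this indexing are worth keeping in mind: any column not listed in the order is dropped from the section, and an index of 0 becomes `words[-1]` in Python, which selects the last column instead of raising an error.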
def replace(self, section: str, column: int, vector: Union[List[float], float]) ‑> NoneType
Replace a column in a named section with a vector of values.
Parameters
section (str): The name of the section.
column (int): The column index to replace (1-based).
vector (Union[List[float], float]): The new values or a single scalar value.
Raises
ValueError
- If the section name is invalid, the vector length does not match the number of rows, or the column index is out of range.
def replace(self, section: str, column: int, vector: Union[List[float], float]) -> None:
    """
    Replace a column in a named section with a vector of values.

    Parameters:
        section (str): The name of the section.
        column (int): The column index to replace (1-based).
        vector (Union[List[float], float]): The new values or a single scalar value.

    Raises:
        ValueError: If the section is invalid or vector length mismatch.
    """
    if section not in self.sections:
        raise ValueError(f'"{section}" is not a valid section name.')

    lines = self.sections[section]
    num_lines = len(lines)

    if not isinstance(vector, list):
        vector = [vector]
    if len(vector) == 1:
        vector = vector * num_lines

    if len(vector) != num_lines:
        raise ValueError(f'The length of new data ({len(vector)}) in section "{section}" does not match the number of rows {num_lines}.')

    new_lines = []
    column_index = column - 1
    for i, line in enumerate(lines):
        words = line.split()
        if column_index >= len(words):
            raise ValueError(f"Column index {column} out of range for section '{section}'.")
        words[column_index] = str(vector[i])
        new_line = " ".join(words) + "\n"
        new_lines.append(new_line)

    self.sections[section] = new_lines
    logger.debug(f"Replaced column {column} in section '{section}' with new data.")
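The scalar case relies on broadcasting: a single value is wrapped in a list and repeated once per row before the column is overwritten. A standalone sketch of that behaviour, assuming the same line format as the listing; `replace_column` is a hypothetical helper written for illustration.

```python
from typing import List, Union


def replace_column(
    lines: List[str], column: int, vector: Union[List[float], float]
) -> List[str]:
    """Hypothetical stand-in for data.replace() acting on a list of lines."""
    if not isinstance(vector, list):
        vector = [vector]                 # wrap a scalar
    if len(vector) == 1:
        vector = vector * len(lines)      # broadcast one value to every row
    if len(vector) != len(lines):
        raise ValueError("vector length does not match the number of rows")
    out = []
    for value, line in zip(vector, lines):
        words = line.split()
        words[column - 1] = str(value)    # column is 1-based
        out.append(" ".join(words) + "\n")
    return out


atoms = ["1 1 0.0\n", "2 1 1.5\n"]
retyped = replace_column(atoms, 2, 3)           # scalar applied to every row
moved = replace_column(atoms, 3, [9.0, 8.5])    # one value per row
```

As the listing is written, only a Python `list` counts as a vector. A NumPy array would be wrapped as a single element and repeated on every row, so converting it with `.tolist()` first is the safer call pattern.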
def viz(self, isnap: int) ‑> Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]
Return visualization data for a specified snapshot.
Parameters
isnap (int): Snapshot index (must be 0 for data object).
Returns
Tuple containing time, box dimensions, atoms, bonds, tris, and lines.
Raises
ValueError
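- If isnap is not 0, if a required column (id, type, x, y, z) is not mapped, or if a bond refers to an atom index out of range.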
def viz(self, isnap: int) -> Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]:
    """
    Return visualization data for a specified snapshot.

    Parameters:
        isnap (int): Snapshot index (must be 0 for data object).

    Returns:
        Tuple containing time, box dimensions, atoms, bonds, tris, and lines.

    Raises:
        ValueError: If isnap is not 0.
    """
    if isnap:
        raise ValueError("Cannot call data.viz() with isnap != 0.")

    id_idx = self.names.get("id")
    type_idx = self.names.get("type")
    x_idx = self.names.get("x")
    y_idx = self.names.get("y")
    z_idx = self.names.get("z")

    if None in [id_idx, type_idx, x_idx, y_idx, z_idx]:
        raise ValueError("One or more required columns (id, type, x, y, z) are not defined.")

    xlohi = self.headers.get("xlo xhi", (0.0, 0.0))
    ylohi = self.headers.get("ylo yhi", (0.0, 0.0))
    zlohi = self.headers.get("zlo zhi", (0.0, 0.0))
    box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]

    # Create atom list needed by viz from id, type, x, y, z
    atoms = []
    atom_lines = self.sections.get("Atoms", [])
    for line in atom_lines:
        words = line.split()
        atoms.append([
            int(words[id_idx]),
            int(words[type_idx]),
            float(words[x_idx]),
            float(words[y_idx]),
            float(words[z_idx]),
        ])

    # Create list of current bond coords from list of bonds
    bonds = []
    if "Bonds" in self.sections:
        bond_lines = self.sections["Bonds"]
        for line in bond_lines:
            words = line.split()
            bid = int(words[0])
            btype = int(words[1])
            atom1 = int(words[2])
            atom2 = int(words[3])
            if atom1 - 1 >= len(atom_lines) or atom2 - 1 >= len(atom_lines):
                raise ValueError("Atom index in Bonds section out of range.")
            atom1_words = self.sections["Atoms"][atom1 - 1].split()
            atom2_words = self.sections["Atoms"][atom2 - 1].split()
            bonds.append([
                bid,
                btype,
                float(atom1_words[x_idx]),
                float(atom1_words[y_idx]),
                float(atom1_words[z_idx]),
                float(atom2_words[x_idx]),
                float(atom2_words[y_idx]),
                float(atom2_words[z_idx]),
                int(atom1_words[type_idx]),
                int(atom2_words[type_idx]),
            ])

    tris = []
    lines = []

    logger.debug("Prepared visualization data.")
    return 0, box, atoms, bonds, tris, lines
def write(self, filename: str) ‑> NoneType
Write the data object to a LAMMPS data file.
Parameters
filename (str): The output file path.
def write(self, filename: str) -> None:
    """
    Write the data object to a LAMMPS data file.

    Parameters:
        filename (str): The output file path.
    """
    try:
        with open(filename, "w") as f:
            f.write(f"{self.title}\n")
            logger.debug(f"Wrote title to file '{filename}'.")

            # Write headers
            for keyword in self.HKEYWORDS:
                if keyword in self.headers:
                    value = self.headers[keyword]
                    if keyword in ["xlo xhi", "ylo yhi", "zlo zhi"]:
                        f.write(f"{value[0]} {value[1]} {keyword}\n")
                    elif keyword == "xy xz yz":
                        f.write(f"{value[0]} {value[1]} {value[2]} {keyword}\n")
                    else:
                        f.write(f"{value} {keyword}\n")
                    logger.debug(f"Wrote header '{keyword}' to file.")

            # Write sections
            for pair in self.SKEYWORDS:
                keyword = pair[0]
                if keyword in self.sections:
                    f.write(f"\n{keyword}\n\n")
                    for line in self.sections[keyword]:
                        f.write(line)
                    logger.debug(f"Wrote section '{keyword}' to file.")

        logger.info(f"Data object written to '{filename}'.")
    except IOError as e:
        logger.error(f"Error writing to file '{filename}': {e}")
        raise
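Header lines follow the LAMMPS convention of value(s) first and keyword last, with box bounds and tilt factors written as two and three values respectively. The sketch below isolates that formatting rule; `format_header` is a hypothetical helper, and the header values are illustrative.

```python
from typing import Any


def format_header(keyword: str, value: Any) -> str:
    """Hypothetical helper mirroring the header branch of data.write()."""
    if keyword in ("xlo xhi", "ylo yhi", "zlo zhi"):
        return f"{value[0]} {value[1]} {keyword}\n"             # two bounds
    if keyword == "xy xz yz":
        return f"{value[0]} {value[1]} {value[2]} {keyword}\n"  # three tilt factors
    return f"{value} {keyword}\n"                               # plain scalar header


box_line = format_header("xlo xhi", (0.0, 10.0))
count_line = format_header("atoms", 1500)
tilt_line = format_header("xy xz yz", (0.0, 0.0, 0.0))
```

In the listing, headers and sections are emitted in the order of `HKEYWORDS` and `SKEYWORDS`, not in the order they were set on the object, so entries whose keyword is absent from those tables are not written.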
class dump (*file_list: str, read_files: bool = True)
The `dump` class provides comprehensive tools for reading, writing, and manipulating LAMMPS dump files and particle attributes. It handles both static and dynamic properties of snapshots with robust methods for data selection, transformation, and visualization.
Initialize a dump object.
Parameters
*file_list (str): Variable length argument list of dump file paths. Can include wildcards.
read_files (bool): If False, store filenames without reading. Default is True.
class dump: """ The `dump` class provides comprehensive tools for reading, writing, and manipulating LAMMPS dump files and particle attributes. It handles both static and dynamic properties of snapshots with robust methods for data selection, transformation, and visualization. """ def __init__(self, *file_list: str, read_files: bool = True): """ Initialize a dump object. Parameters: *file_list (str): Variable length argument list of dump file paths. Can include wildcards. read_files (bool): If False, store filenames without reading. Default is True. """ self.snaps: List[Snap] = [] self.nsnaps: int = 0 self.nselect: int = 0 self.names: Dict[str, int] = {} self.tselect = tselect(self) self.aselect = aselect(self) self.atype: str = "type" self.bondflag: int = 0 self.bondlist: List[List[int]] = [] self.triflag: int = 0 self.trilist: List[List[float]] = [] self.lineflag: int = 0 self.linelist: List[List[float]] = [] self.objextra: Optional[Any] = None # flist = list of all dump file names raw_filenames = ' '.join(file_list) self.flist: List[str] = glob.glob(raw_filenames) if read_files else list(file_list) if not self.flist and read_files: logger.error("No dump file specified.") raise ValueError("No dump file specified.") if read_files: self.increment: int = 0 self.read_all() else: self.increment = 1 self.nextfile = 0 self.eof = 0 def __repr__(self) -> str: """ Return a string representation of the dump object. Returns: str: Description of the dump object. """ times = self.time() ntimes = len(times) lastime = times[-1] if ntimes > 0 else 0 fields = self.names field_names = ", ".join(sorted(fields.keys(), key=lambda k: fields[k])) representation = (f'Dump object from file "{self.flist[0]}" ' f'with {ntimes} frames (last timestep={lastime}) ' f'and fields: {field_names}') logger.info(representation) return representation def read_all(self) -> None: """ Read all snapshots from each file in the file list. 
""" for file in self.flist: is_gzipped = file.endswith(".gz") try: if is_gzipped: with subprocess.Popen([PIZZA_GUNZIP, "-c", file], stdout=subprocess.PIPE, text=True) as proc: file_handle = proc.stdout logger.debug(f"Opened gzipped file: {file}") else: file_handle = open(file, 'r') logger.debug(f"Opened file: {file}") with file_handle: snap = self.read_snapshot(file_handle) while snap: self.snaps.append(snap) logger.info(f"Read snapshot at time {snap.time}") snap = self.read_snapshot(file_handle) except subprocess.CalledProcessError as e: logger.error(f"Error decompressing file '{file}': {e}") raise except FileNotFoundError: logger.error(f"File '{file}' not found.") raise except Exception as e: logger.error(f"Error reading file '{file}': {e}") raise self.snaps.sort() self.cull() self.nsnaps = len(self.snaps) logger.info(f"Read {self.nsnaps} snapshots.") # Select all timesteps and atoms by default self.tselect.all() # Log column assignments if self.names: logger.info(f"Assigned columns: {', '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}") else: logger.warning("No column assignments made.") # Unscale if necessary if self.nsnaps > 0: if getattr(self, 'scale_original', -1) == 1: self.unscale() elif getattr(self, 'scale_original', -1) == 0: logger.info("Dump is already unscaled.") else: logger.warning("Dump scaling status is unknown.") def read_snapshot(self, f) -> Optional['Snap']: """ Read a single snapshot from a file. Parameters: f (file object): File handle to read from. Returns: Optional[Snap]: Snapshot object or None if failed. 
""" try: snap = Snap() # Read and assign ITEMS while True: item = f.readline() if not item: break if not item.startswith("ITEM:"): continue item_type = item.split("ITEM:")[1].strip() if item_type == "TIME": snap.realtime = float(f.readline().strip()) elif item_type == "TIMESTEP": snap.time = int(f.readline().strip()) elif item_type == "NUMBER OF ATOMS": snap.natoms = int(f.readline().strip()) elif item_type.startswith("BOX BOUNDS"): snap.boxstr = item_type.split("BOX BOUNDS")[1].strip() box_bounds = [] for _ in range(3): bounds = f.readline().strip().split() box_bounds.append(tuple(map(float, bounds[:2]))) if len(bounds) > 2: setattr(snap, bounds[2], float(bounds[2])) else: setattr(snap, bounds[2] if len(bounds) > 2 else 'xy', 0.0) snap.xlo, snap.xhi = box_bounds[0] snap.ylo, snap.yhi = box_bounds[1] snap.zlo, snap.zhi = box_bounds[2] snap.triclinic = 1 if len(box_bounds[0]) > 2 else 0 elif item_type == "ATOMS": if not self.names: self.assign_column_names(f.readline()) snap.aselect = np.ones(snap.natoms, dtype=bool) atoms = [] for _ in range(snap.natoms): line = f.readline() if not line: break atoms.append(list(map(float, line.strip().split()))) snap.atoms = np.array(atoms) break if not hasattr(snap, 'time'): return None return snap except Exception as e: logger.error(f"Error reading snapshot: {e}") return None def assign_column_names(self, line: str) -> None: """ Assign column names based on the ATOMS section header. Parameters: line (str): The header line containing column names. 
""" try: columns = line.strip().split()[1:] # Skip the first word (e.g., "id") for idx, col in enumerate(columns): self.names[col] = idx logger.debug(f"Assigned column names: {self.names}") # Determine scaling status based on column names x_scaled = "xs" in self.names y_scaled = "ys" in self.names z_scaled = "zs" in self.names self.scale_original = 1 if x_scaled and y_scaled and z_scaled else 0 logger.info(f"Coordinate scaling status: {'scaled' if self.scale_original else 'unscaled'}") except Exception as e: logger.error(f"Error assigning column names: {e}") raise def __add__(self, other: 'dump') -> 'dump': """ Merge two dump objects of the same type. Parameters: other (dump): Another dump object to merge with. Returns: dump: A new dump object containing snapshots from both dumps. Raises: ValueError: If the dump types do not match or other is not a dump instance. """ if not isinstance(other, dump): raise ValueError("The second operand is not a dump object.") if self.type != other.type: raise ValueError("The dumps are not of the same type.") combined_files = self.flist + other.flist new_dump = dump(*combined_files) return new_dump def cull(self) -> None: """ Remove duplicate snapshots based on timestep. """ unique_snaps = {} culled_snaps = [] for snap in self.snaps: if snap.time not in unique_snaps: unique_snaps[snap.time] = snap culled_snaps.append(snap) else: logger.warning(f"Duplicate timestep {snap.time} found. Culling duplicate.") self.snaps = culled_snaps logger.info(f"Culled duplicates. Total snapshots: {len(self.snaps)}") def sort(self, key: Union[str, int] = "id") -> None: """ Sort atoms or snapshots. Parameters: key (Union[str, int]): The key to sort by. If str, sorts snapshots by that column. If int, sorts atoms in a specific timestep. 
""" if isinstance(key, str): if key not in self.names: raise ValueError(f"Column '{key}' not found for sorting.") logger.info(f"Sorting snapshots by column '{key}'.") icol = self.names[key] for snap in self.snaps: if not snap.tselect: continue snap.atoms = snap.atoms[snap.atoms[:, icol].argsort()] elif isinstance(key, int): try: snap = self.snaps[self.findtime(key)] logger.info(f"Sorting atoms in snapshot at timestep {key}.") if "id" in self.names: id_col = self.names["id"] snap.atoms = snap.atoms[snap.atoms[:, id_col].argsort()] else: logger.warning("No 'id' column found for sorting atoms.") except ValueError as e: logger.error(e) raise else: logger.error("Invalid key type for sort().") raise TypeError("Key must be a string or integer.") def write(self, filename: str, head: int = 1, app: int = 0) -> None: """ Write the dump object to a LAMMPS dump file. Parameters: filename (str): The output file path. head (int): Whether to include the snapshot header (1 for yes, 0 for no). app (int): Whether to append to the file (1 for yes, 0 for no). 
""" try: mode = "a" if app else "w" with open(filename, mode) as f: for snap in self.snaps: if not snap.tselect: continue if head: f.write("ITEM: TIMESTEP\n") f.write(f"{snap.time}\n") f.write("ITEM: NUMBER OF ATOMS\n") f.write(f"{snap.nselect}\n") f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n") f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n") f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n") f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n") f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n") for atom in snap.atoms[snap.aselect]: atom_str = " ".join([f"{int(atom[self.names['id']])}" if key in ["id", "type"] else f"{atom[self.names[key]]}" for key in sorted(self.names.keys(), key=lambda k: self.names[k])]) f.write(f"{atom_str}\n") logger.info(f"Dump object written to '{filename}'.") except IOError as e: logger.error(f"Error writing to file '{filename}': {e}") raise def scatter(self, root: str) -> None: """ Write each selected snapshot to a separate dump file with timestep suffix. Parameters: root (str): The root name for output files. Suffix will be added based on timestep. 
""" try: for snap in self.snaps: if not snap.tselect: continue filename = f"{root}.{snap.time}" with open(filename, "w") as f: f.write("ITEM: TIMESTEP\n") f.write(f"{snap.time}\n") f.write("ITEM: NUMBER OF ATOMS\n") f.write(f"{snap.nselect}\n") f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n") f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n") f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n") f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n") f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n") for atom in snap.atoms[snap.aselect]: atom_str = " ".join([f"{int(atom[self.names['id']])}" if key in ["id", "type"] else f"{atom[self.names[key]]}" for key in sorted(self.names.keys(), key=lambda k: self.names[k])]) f.write(f"{atom_str}\n") logger.info(f"Scatter write completed with root '{root}'.") except IOError as e: logger.error(f"Error writing scatter files: {e}") raise def minmax(self, colname: str) -> Tuple[float, float]: """ Find the minimum and maximum values for a specified column across all selected snapshots and atoms. Parameters: colname (str): The column name to find min and max for. Returns: Tuple[float, float]: The minimum and maximum values. Raises: KeyError: If the column name does not exist. 
""" if colname not in self.names: raise KeyError(f"Column '{colname}' not found.") icol = self.names[colname] min_val = np.inf max_val = -np.inf for snap in self.snaps: if not snap.tselect: continue selected_atoms = snap.atoms[snap.aselect] if selected_atoms.size == 0: continue current_min = selected_atoms[:, icol].min() current_max = selected_atoms[:, icol].max() if current_min < min_val: min_val = current_min if current_max > max_val: max_val = current_max logger.info(f"minmax for column '{colname}': min={min_val}, max={max_val}") return min_val, max_val def set(self, eq: str) -> None: """ Set a column value using an equation for all selected snapshots and atoms. Parameters: eq (str): The equation to compute the new column values. Use $<column_name> for variables. Example: d.set("$ke = $vx * $vx + $vy * $vy") """ logger.info(f"Setting column using equation: {eq}") pattern = r"\$\w+" variables = re.findall(pattern, eq) if not variables: logger.warning("No variables found in equation.") return lhs = variables[0][1:] if lhs not in self.names: self.newcolumn(lhs) try: # Replace $var with appropriate array accesses for var in variables: var_name = var[1:] if var_name not in self.names: raise KeyError(f"Variable '{var_name}' not found in columns.") col_index = self.names[var_name] eq = eq.replace(var, f"snap.atoms[i][{col_index}]") compiled_eq = compile(eq, "<string>", "exec") for snap in self.snaps: if not snap.tselect: continue for i in range(snap.natoms): if not snap.aselect[i]: continue exec(compiled_eq) logger.info("Column values set successfully.") except Exception as e: logger.error(f"Error setting column values: {e}") raise def setv(self, colname: str, vector: List[float]) -> None: """ Set a column value using a vector of values for all selected snapshots and atoms. Parameters: colname (str): The column name to set. vector (List[float]): The values to assign to the column. Raises: KeyError: If the column name does not exist. 
ValueError: If the length of the vector does not match the number of selected atoms. """ logger.info(f"Setting column '{colname}' using a vector of values.") if colname not in self.names: self.newcolumn(colname) icol = self.names[colname] for snap in self.snaps: if not snap.tselect: continue if len(vector) != snap.nselect: raise ValueError("Vector length does not match the number of selected atoms.") selected_indices = np.where(snap.aselect)[0] snap.atoms[selected_indices, icol] = vector logger.info(f"Column '{colname}' set successfully.") def spread(self, old: str, n: int, new: str) -> None: """ Spread values from an old column into a new column as integers from 1 to n based on their relative positions. Parameters: old (str): The column name to spread. n (int): The number of spread values. new (str): The new column name to create. Raises: KeyError: If the old column does not exist. """ logger.info(f"Spreading column '{old}' into new column '{new}' with {n} spread values.") if old not in self.names: raise KeyError(f"Column '{old}' not found.") if new not in self.names: self.newcolumn(new) iold = self.names[old] inew = self.names[new] min_val, max_val = self.minmax(old) gap = max_val - min_val if gap == 0: gap = 1.0 # Prevent division by zero invdelta = n / gap for snap in self.snaps: if not snap.tselect: continue selected_atoms = snap.atoms[snap.aselect] snap.atoms[snap.aselect, inew] = np.clip(((selected_atoms[:, iold] - min_val) * invdelta).astype(int) + 1, 1, n) logger.info(f"Column '{new}' spread successfully.") def clone(self, nstep: int, col: str) -> None: """ Clone the value from a specific timestep's column to all selected snapshots for atoms with the same ID. Parameters: nstep (int): The timestep to clone from. col (str): The column name to clone. Raises: KeyError: If the column or ID column does not exist. ValueError: If the specified timestep does not exist. 
""" logger.info(f"Cloning column '{col}' from timestep {nstep} to all selected snapshots.") if "id" not in self.names: raise KeyError("Column 'id' not found.") if col not in self.names: raise KeyError(f"Column '{col}' not found.") istep = self.findtime(nstep) icol = self.names[col] id_col = self.names["id"] id_to_index = {atom[id_col]: idx for idx, atom in enumerate(self.snaps[istep].atoms)} for snap in self.snaps: if not snap.tselect: continue for i, atom in enumerate(snap.atoms): if not snap.aselect[i]: continue atom_id = atom[id_col] if atom_id in id_to_index: snap.atoms[i, icol] = self.snaps[istep].atoms[id_to_index[atom_id], icol] logger.info("Cloning completed successfully.") def time(self) -> List[int]: """ Return a list of selected snapshot timesteps. Returns: List[int]: List of timestep values. """ times = [snap.time for snap in self.snaps if snap.tselect] logger.debug(f"Selected timesteps: {times}") return times def realtime(self) -> List[float]: """ Return a list of selected snapshot real-time values. Returns: List[float]: List of real-time values. """ times = [snap.realtime for snap in self.snaps if snap.tselect and hasattr(snap, 'realtime')] logger.debug(f"Selected real-time values: {times}") return times def atom(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]: """ Extract values for a specific atom ID across all selected snapshots. Parameters: n (int): The atom ID to extract. *columns (str): The column names to extract. Returns: Union[List[float], List[List[float]]]: The extracted values. Raises: KeyError: If any specified column does not exist. ValueError: If the atom ID is not found in any snapshot. 
""" logger.info(f"Extracting atom ID {n} values for columns {columns}.") if not columns: raise ValueError("No columns specified for extraction.") column_indices = [] for col in columns: if col not in self.names: raise KeyError(f"Column '{col}' not found.") column_indices.append(self.names[col]) extracted = [[] for _ in columns] for snap in self.snaps: if not snap.tselect: continue atom_rows = snap.atoms[snap.aselect] id_column = self.names["id"] matching_atoms = atom_rows[atom_rows[:, id_column] == n] if matching_atoms.size == 0: raise ValueError(f"Atom ID {n} not found in snapshot at timestep {snap.time}.") atom = matching_atoms[0] for idx, col_idx in enumerate(column_indices): extracted[idx].append(atom[col_idx]) if len(columns) == 1: return extracted[0] return extracted def vecs(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]: """ Extract values for selected atoms at a specific timestep. Parameters: n (int): The timestep to extract from. *columns (str): The column names to extract. Returns: Union[List[float], List[List[float]]]: The extracted values. Raises: KeyError: If any specified column does not exist. ValueError: If the specified timestep does not exist. """ logger.info(f"Extracting columns {columns} for timestep {n}.") if not columns: raise ValueError("No columns specified for extraction.") try: snap = self.snaps[self.findtime(n)] except ValueError as e: logger.error(e) raise column_indices = [] for col in columns: if col not in self.names: raise KeyError(f"Column '{col}' not found.") column_indices.append(self.names[col]) extracted = [[] for _ in columns] selected_atoms = snap.atoms[snap.aselect] for atom in selected_atoms: for idx, col_idx in enumerate(column_indices): extracted[idx].append(atom[col_idx]) if len(columns) == 1: return extracted[0] return extracted def newcolumn(self, colname: str) -> None: """ Add a new column to every snapshot and initialize it to zero. Parameters: colname (str): The name of the new column. 
""" logger.info(f"Adding new column '{colname}' with default value 0.") if colname in self.names: logger.warning(f"Column '{colname}' already exists.") return new_col_index = len(self.names) self.names[colname] = new_col_index for snap in self.snaps: if snap.atoms is not None: new_column = np.zeros((snap.atoms.shape[0], 1)) snap.atoms = np.hstack((snap.atoms, new_column)) logger.info(f"New column '{colname}' added successfully.") def kind(self, listtypes: Optional[Dict[str, List[str]]] = None) -> Optional[str]: """ Guess the kind of dump file based on column names. Parameters: listtypes (Optional[Dict[str, List[str]]]): A dictionary defining possible types. Returns: Optional[str]: The kind of dump file if matched, else None. """ if listtypes is None: listtypes = { 'vxyz': ["id", "type", "x", "y", "z", "vx", "vy", "vz"], 'xyz': ["id", "type", "x", "y", "z"] } internaltypes = True else: listtypes = {"user_type": listtypes} internaltypes = False for kind, columns in listtypes.items(): if all(col in self.names for col in columns): logger.info(f"Dump kind identified as '{kind}'.") return kind logger.warning("Dump kind could not be identified.") return None @property def type(self) -> int: """ Get the type of dump file defined as a hash of column names. Returns: int: Hash value representing the dump type. """ type_hash = hash(self.names2str()) logger.debug(f"Dump type hash: {type_hash}") return type_hash def names2str(self) -> str: """ Convert column names to a sorted string based on their indices. Returns: str: A string of column names sorted by their column index. """ sorted_columns = sorted(self.names.items(), key=lambda item: item[1]) names_str = " ".join([col for col, _ in sorted_columns]) logger.debug(f"Column names string: {names_str}") return names_str def __add__(self, other: 'dump') -> 'dump': """ Merge two dump objects of the same type. Parameters: other (dump): Another dump object to merge with. 
Returns: dump: A new dump object containing snapshots from both dumps. Raises: ValueError: If the dump types do not match or other is not a dump instance. """ if not isinstance(other, dump): raise ValueError("The second operand is not a dump object.") if self.type != other.type: raise ValueError("The dumps are not of the same type.") return dump(*(self.flist + other.flist)) def iterator(self, flag: int) -> Tuple[int, int, int]: """ Iterator method to loop over selected snapshots. Parameters: flag (int): 0 for the first call, 1 for subsequent calls. Returns: Tuple[int, int, int]: (index, time, flag) """ if not hasattr(self, 'iterate'): self.iterate = -1 if flag == 0: self.iterate = 0 else: self.iterate += 1 while self.iterate < self.nsnaps: snap = self.snaps[self.iterate] if snap.tselect: logger.debug(f"Iterator returning snapshot {self.iterate} at time {snap.time}.") return self.iterate, snap.time, 1 self.iterate += 1 return 0, 0, -1 def viz(self, index: int, flag: int = 0) -> Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]: """ Return visualization data for a specified snapshot. Parameters: index (int): Snapshot index or timestep value. flag (int): If 1, treat index as timestep value. Default is 0. Returns: Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]: (time, box, atoms, bonds, tris, lines) Raises: ValueError: If the snapshot index is invalid. 
""" if flag: try: isnap = self.findtime(index) except ValueError as e: logger.error(e) raise else: isnap = index if isnap < 0 or isnap >= self.nsnaps: raise ValueError("Snapshot index out of range.") snap = self.snaps[isnap] time = snap.time box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi] id_idx = self.names.get("id") type_idx = self.names.get(self.atype) x_idx = self.names.get("x") y_idx = self.names.get("y") z_idx = self.names.get("z") if None in [id_idx, type_idx, x_idx, y_idx, z_idx]: raise ValueError("One or more required columns (id, type, x, y, z) are not defined.") # Create atom list for visualization atoms = snap.atoms[snap.aselect][:, [id_idx, type_idx, x_idx, y_idx, z_idx]].astype(object).tolist() # Create bonds list if bonds are defined bonds = [] if self.bondflag: if self.bondflag == 1: bondlist = self.bondlist elif self.bondflag == 2 and self.objextra: _, _, _, bondlist, _, _ = self.objextra.viz(time, 1) else: bondlist = [] if bondlist: id_to_atom = {atom[0]: atom for atom in atoms} for bond in bondlist: try: atom1 = id_to_atom[bond[2]] atom2 = id_to_atom[bond[3]] bonds.append([ bond[0], bond[1], atom1[2], atom1[3], atom1[4], atom2[2], atom2[3], atom2[4], atom1[1], atom2[1] ]) except KeyError: logger.warning(f"Bond with atom IDs {bond[2]}, {bond[3]} not found in selected atoms.") continue # Create tris list if tris are defined tris = [] if self.triflag: if self.triflag == 1: tris = self.trilist elif self.triflag == 2 and self.objextra: _, _, _, _, tris, _ = self.objextra.viz(time, 1) # Create lines list if lines are defined lines = [] if self.lineflag: if self.lineflag == 1: lines = self.linelist elif self.lineflag == 2 and self.objextra: _, _, _, _, _, lines = self.objextra.viz(time, 1) logger.debug(f"Visualization data prepared for snapshot {isnap} at time {time}.") return time, box, atoms, bonds, tris, lines def findtime(self, n: int) -> int: """ Find the index of a given timestep. Parameters: n (int): The timestep to find. 
Returns: int: The index of the timestep. Raises: ValueError: If the timestep does not exist. """ for i, snap in enumerate(self.snaps): if snap.time == n: return i raise ValueError(f"No step {n} exists.") def maxbox(self) -> List[float]: """ Return the maximum box dimensions across all selected snapshots. Returns: List[float]: [xlo, ylo, zlo, xhi, yhi, zhi] """ xlo = ylo = zlo = np.inf xhi = yhi = zhi = -np.inf for snap in self.snaps: if not snap.tselect: continue xlo = min(xlo, snap.xlo) ylo = min(ylo, snap.ylo) zlo = min(zlo, snap.zlo) xhi = max(xhi, snap.xhi) yhi = max(yhi, snap.yhi) zhi = max(zhi, snap.zhi) box = [xlo, ylo, zlo, xhi, yhi, zhi] logger.debug(f"Maximum box dimensions: {box}") return box def maxtype(self) -> int: """ Return the maximum atom type across all selected snapshots and atoms. Returns: int: Maximum atom type. """ if "type" not in self.names: logger.warning("Column 'type' not found.") return 0 icol = self.names["type"] max_type = 0 for snap in self.snaps: if not snap.tselect: continue selected_atoms = snap.atoms[snap.aselect] if selected_atoms.size == 0: continue current_max = int(selected_atoms[:, icol].max()) if current_max > max_type: max_type = current_max logger.info(f"Maximum atom type: {max_type}") return max_type def extra(self, obj: Any) -> None: """ Extract bonds, tris, or lines from another object. Parameters: obj (Any): The object to extract from. Can be a data object, cdata, bdump, etc. Raises: ValueError: If the argument type is unrecognized. 
""" from pizza.data3 import data from pizza.converted.cdata3 import cdata from pizza.converted.bdump3 import bdump from pizza.converted.ldump3 import ldump from pizza.converted.tdump3 import tdump logger.info(f"Extracting extra information from object of type '{type(obj)}'.") if isinstance(obj, data) and "Bonds" in obj.sections: self.bondflag = 1 self.bondlist = [ [int(line.split()[0]), int(line.split()[1]), int(line.split()[2]), int(line.split()[3])] for line in obj.sections["Bonds"] ] logger.debug(f"Extracted {len(self.bondlist)} bonds from data object.") elif hasattr(obj, 'viz'): if isinstance(obj, cdata): tris, lines = obj.viz() if tris: self.triflag = 1 self.trilist = tris if lines: self.lineflag = 1 self.linelist = lines logger.debug(f"Extracted tris and lines from cdata object.") elif isinstance(obj, bdump): self.bondflag = 2 self.objextra = obj logger.debug(f"Configured dynamic bond extraction from bdump object.") elif isinstance(obj, tdump): self.triflag = 2 self.objextra = obj logger.debug(f"Configured dynamic tri extraction from tdump object.") elif isinstance(obj, ldump): self.lineflag = 2 self.objextra = obj logger.debug(f"Configured dynamic line extraction from ldump object.") else: logger.error("Unrecognized object type for extra extraction.") raise ValueError("Unrecognized argument to dump.extra().") else: logger.error("Unrecognized argument type for extra extraction.") raise ValueError("Unrecognized argument to dump.extra().")
Instance variables
var type : int
-
Get the type of dump file defined as a hash of column names.
Returns
int
- Hash value representing the dump type.
Expand source code
@property
def type(self) -> int:
    """
    Get the type of dump file defined as a hash of column names.

    Returns:
        int: Hash value representing the dump type.
    """
    type_hash = hash(self.names2str())
    logger.debug(f"Dump type hash: {type_hash}")
    return type_hash
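Python salts the built-in hash of strings per interpreter process unless PYTHONHASHSEED is fixed, so the value returned by type is only comparable within a single run. If a fingerprint has to be stored or compared across runs, a digest is one possible alternative. The sketch below is an illustration only: column_fingerprint is a hypothetical helper, not part of pizza, and it simply reuses the ordering rule of names2str().

```python
import hashlib


def column_fingerprint(names):
    """Return a run-independent fingerprint of a {column name: index} mapping."""
    # Same ordering rule as names2str(): names sorted by their column index.
    ordered = " ".join(name for name, _ in sorted(names.items(), key=lambda item: item[1]))
    return hashlib.sha256(ordered.encode("utf-8")).hexdigest()


a = column_fingerprint({"id": 0, "type": 1, "x": 2})
b = column_fingerprint({"x": 2, "type": 1, "id": 0})  # same layout, different dict order
c = column_fingerprint({"id": 0, "x": 1, "type": 2})  # different column order
```

Here a and b are equal because only the column order matters, while c differs.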
Methods
def assign_column_names(self, line: str) ‑> NoneType
-
Assign column names based on the ATOMS section header.
Parameters
line (str): The header line containing column names.
Expand source code
def assign_column_names(self, line: str) -> None:
    """
    Assign column names based on the ATOMS section header.

    Parameters:
        line (str): The header line containing column names.
    """
    try:
        columns = line.strip().split()[1:]  # Skip the first word (e.g., "id")
        for idx, col in enumerate(columns):
            self.names[col] = idx
        logger.debug(f"Assigned column names: {self.names}")
        # Determine scaling status based on column names
        x_scaled = "xs" in self.names
        y_scaled = "ys" in self.names
        z_scaled = "zs" in self.names
        self.scale_original = 1 if x_scaled and y_scaled and z_scaled else 0
        logger.info(f"Coordinate scaling status: {'scaled' if self.scale_original else 'unscaled'}")
    except Exception as e:
        logger.error(f"Error assigning column names: {e}")
        raise
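The name-to-index mapping and the scaled-coordinate test can be tried in isolation. The sketch below is a standalone illustration and not the library code: map_columns is a hypothetical helper, and it assumes the header has the usual LAMMPS form "ITEM: ATOMS id type ...", so it strips both leading words, whereas the method above skips only one.

```python
def map_columns(header):
    """Map column names in an 'ITEM: ATOMS ...' header to zero-based indices."""
    tokens = header.split()
    # Assumption: a full LAMMPS header line; drop the "ITEM:" and "ATOMS" words.
    if tokens[:2] == ["ITEM:", "ATOMS"]:
        tokens = tokens[2:]
    names = {name: idx for idx, name in enumerate(tokens)}
    # Coordinates count as scaled only when xs, ys and zs are all present.
    scaled = all(key in names for key in ("xs", "ys", "zs"))
    return names, scaled


names, scaled = map_columns("ITEM: ATOMS id type xs ys zs")
unscaled_names, unscaled = map_columns("ITEM: ATOMS id type x y z")
```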
def atom(self, n: int, *columns: str) ‑> Union[List[float], List[List[float]]]
-
Extract values for a specific atom ID across all selected snapshots.
Parameters
n (int): The atom ID to extract.
*columns (str): The column names to extract.
Returns
Union[List[float], List[List[float]]]
- The extracted values.
Raises
KeyError
- If any specified column does not exist.
ValueError
- If the atom ID is not found in any snapshot.
Expand source code
def atom(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]:
    """
    Extract values for a specific atom ID across all selected snapshots.

    Parameters:
        n (int): The atom ID to extract.
        *columns (str): The column names to extract.

    Returns:
        Union[List[float], List[List[float]]]: The extracted values.

    Raises:
        KeyError: If any specified column does not exist.
        ValueError: If the atom ID is not found in any snapshot.
    """
    logger.info(f"Extracting atom ID {n} values for columns {columns}.")
    if not columns:
        raise ValueError("No columns specified for extraction.")
    column_indices = []
    for col in columns:
        if col not in self.names:
            raise KeyError(f"Column '{col}' not found.")
        column_indices.append(self.names[col])

    extracted = [[] for _ in columns]
    for snap in self.snaps:
        if not snap.tselect:
            continue
        atom_rows = snap.atoms[snap.aselect]
        id_column = self.names["id"]
        matching_atoms = atom_rows[atom_rows[:, id_column] == n]
        if matching_atoms.size == 0:
            raise ValueError(f"Atom ID {n} not found in snapshot at timestep {snap.time}.")
        atom = matching_atoms[0]
        for idx, col_idx in enumerate(column_indices):
            extracted[idx].append(atom[col_idx])
    if len(columns) == 1:
        return extracted[0]
    return extracted
def clone(self, nstep: int, col: str) ‑> NoneType
-
Clone the value from a specific timestep's column to all selected snapshots for atoms with the same ID.
Parameters
nstep (int): The timestep to clone from.
col (str): The column name to clone.
Raises
KeyError
- If the column or ID column does not exist.
ValueError
- If the specified timestep does not exist.
Expand source code
def clone(self, nstep: int, col: str) -> None:
    """
    Clone the value from a specific timestep's column to all selected snapshots for atoms with the same ID.

    Parameters:
        nstep (int): The timestep to clone from.
        col (str): The column name to clone.

    Raises:
        KeyError: If the column or ID column does not exist.
        ValueError: If the specified timestep does not exist.
    """
    logger.info(f"Cloning column '{col}' from timestep {nstep} to all selected snapshots.")
    if "id" not in self.names:
        raise KeyError("Column 'id' not found.")
    if col not in self.names:
        raise KeyError(f"Column '{col}' not found.")
    istep = self.findtime(nstep)
    icol = self.names[col]
    id_col = self.names["id"]
    id_to_index = {atom[id_col]: idx for idx, atom in enumerate(self.snaps[istep].atoms)}
    for snap in self.snaps:
        if not snap.tselect:
            continue
        for i, atom in enumerate(snap.atoms):
            if not snap.aselect[i]:
                continue
            atom_id = atom[id_col]
            if atom_id in id_to_index:
                snap.atoms[i, icol] = self.snaps[istep].atoms[id_to_index[atom_id], icol]
    logger.info("Cloning completed successfully.")
def cull(self) ‑> NoneType
-
Remove duplicate snapshots based on timestep.
Expand source code
def cull(self) -> None:
    """
    Remove duplicate snapshots based on timestep.
    """
    unique_snaps = {}
    culled_snaps = []
    for snap in self.snaps:
        if snap.time not in unique_snaps:
            unique_snaps[snap.time] = snap
            culled_snaps.append(snap)
        else:
            logger.warning(f"Duplicate timestep {snap.time} found. Culling duplicate.")
    self.snaps = culled_snaps
    logger.info(f"Culled duplicates. Total snapshots: {len(self.snaps)}")
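The rule applied here is "keep the first snapshot seen for each timestep". A minimal standalone sketch of that rule follows; the Snap namedtuple and the free function cull_by_time are hypothetical stand-ins used only for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for a snapshot: a timestep plus a label for its origin.
Snap = namedtuple("Snap", ["time", "source"])


def cull_by_time(snaps):
    """Keep the first snapshot of each timestep, preserving input order."""
    seen = set()
    kept = []
    for snap in snaps:
        if snap.time in seen:
            continue  # a snapshot with this timestep was already kept
        seen.add(snap.time)
        kept.append(snap)
    return kept


snaps = [Snap(0, "a"), Snap(100, "a"), Snap(100, "b"), Snap(200, "b")]
kept = cull_by_time(snaps)
```

With overlapping files, the copy of timestep 100 that comes first in the list is the one that survives.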
def extra(self, obj: Any) ‑> NoneType
-
Extract bonds, tris, or lines from another object.
Parameters
obj (Any): The object to extract from. Can be a data object, cdata, bdump, etc.
Raises
ValueError
- If the argument type is unrecognized.
Expand source code
def extra(self, obj: Any) -> None: """ Extract bonds, tris, or lines from another object. Parameters: obj (Any): The object to extract from. Can be a data object, cdata, bdump, etc. Raises: ValueError: If the argument type is unrecognized. """ from pizza.data3 import data from pizza.converted.cdata3 import cdata from pizza.converted.bdump3 import bdump from pizza.converted.ldump3 import ldump from pizza.converted.tdump3 import tdump logger.info(f"Extracting extra information from object of type '{type(obj)}'.") if isinstance(obj, data) and "Bonds" in obj.sections: self.bondflag = 1 self.bondlist = [ [int(line.split()[0]), int(line.split()[1]), int(line.split()[2]), int(line.split()[3])] for line in obj.sections["Bonds"] ] logger.debug(f"Extracted {len(self.bondlist)} bonds from data object.") elif hasattr(obj, 'viz'): if isinstance(obj, cdata): tris, lines = obj.viz() if tris: self.triflag = 1 self.trilist = tris if lines: self.lineflag = 1 self.linelist = lines logger.debug(f"Extracted tris and lines from cdata object.") elif isinstance(obj, bdump): self.bondflag = 2 self.objextra = obj logger.debug(f"Configured dynamic bond extraction from bdump object.") elif isinstance(obj, tdump): self.triflag = 2 self.objextra = obj logger.debug(f"Configured dynamic tri extraction from tdump object.") elif isinstance(obj, ldump): self.lineflag = 2 self.objextra = obj logger.debug(f"Configured dynamic line extraction from ldump object.") else: logger.error("Unrecognized object type for extra extraction.") raise ValueError("Unrecognized argument to dump.extra().") else: logger.error("Unrecognized argument type for extra extraction.") raise ValueError("Unrecognized argument to dump.extra().")
def findtime(self, n: int) ‑> int
-
Find the index of a given timestep.
Parameters
n (int): The timestep to find.
Returns
int
- The index of the timestep.
Raises
ValueError
- If the timestep does not exist.
Expand source code
def findtime(self, n: int) -> int:
    """
    Find the index of a given timestep.

    Parameters:
        n (int): The timestep to find.

    Returns:
        int: The index of the timestep.

    Raises:
        ValueError: If the timestep does not exist.
    """
    for i, snap in enumerate(self.snaps):
        if snap.time == n:
            return i
    raise ValueError(f"No step {n} exists.")
def iterator(self, flag: int) ‑> Tuple[int, int, int]
-
Iterator method to loop over selected snapshots.
Parameters
flag (int): 0 for the first call, 1 for subsequent calls.
Returns
Tuple[int, int, int]
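- (index, time, flag). The returned flag is 1 when a selected snapshot is found and -1 (with index and time set to 0) once no selected snapshots remain.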
Expand source code
def iterator(self, flag: int) -> Tuple[int, int, int]:
    """
    Iterator method to loop over selected snapshots.

    Parameters:
        flag (int): 0 for the first call, 1 for subsequent calls.

    Returns:
        Tuple[int, int, int]: (index, time, flag)
    """
    if not hasattr(self, 'iterate'):
        self.iterate = -1
    if flag == 0:
        self.iterate = 0
    else:
        self.iterate += 1
    while self.iterate < self.nsnaps:
        snap = self.snaps[self.iterate]
        if snap.tselect:
            logger.debug(f"Iterator returning snapshot {self.iterate} at time {snap.time}.")
            return self.iterate, snap.time, 1
        self.iterate += 1
    return 0, 0, -1
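The calling protocol is easiest to see in a loop: pass 0 on the first call, feed the returned flag back in, and stop when it becomes -1. The sketch below uses MiniDump, a hypothetical stand-in that mirrors the control flow of the method above on plain tuples; it is not the dump class itself.

```python
class MiniDump:
    """Hypothetical stand-in exposing only what iterator() needs."""

    def __init__(self, times, selected):
        self.snaps = list(zip(times, selected))  # (time, tselect) pairs
        self.nsnaps = len(self.snaps)
        self.iterate = -1

    def iterator(self, flag):
        # Restart on flag == 0, otherwise advance past the last returned snapshot.
        self.iterate = 0 if flag == 0 else self.iterate + 1
        while self.iterate < self.nsnaps:
            time, tselect = self.snaps[self.iterate]
            if tselect:
                return self.iterate, time, 1
            self.iterate += 1
        return 0, 0, -1


d = MiniDump(times=[0, 100, 200, 300], selected=[True, False, True, True])
visited = []
flag = 0
while True:
    index, time, flag = d.iterator(flag)
    if flag == -1:
        break
    visited.append((index, time))
```

The unselected snapshot at timestep 100 is skipped, so the loop visits indices 0, 2 and 3.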
def kind(self, listtypes: Optional[Dict[str, List[str]]] = None) ‑> Optional[str]
-
Guess the kind of dump file based on column names.
Parameters
listtypes (Optional[Dict[str, List[str]]]): A dictionary defining possible types.
Returns
Optional[str]
- The kind of dump file if matched, else None.
Expand source code
def kind(self, listtypes: Optional[Dict[str, List[str]]] = None) -> Optional[str]: """ Guess the kind of dump file based on column names. Parameters: listtypes (Optional[Dict[str, List[str]]]): A dictionary defining possible types. Returns: Optional[str]: The kind of dump file if matched, else None. """ if listtypes is None: listtypes = { 'vxyz': ["id", "type", "x", "y", "z", "vx", "vy", "vz"], 'xyz': ["id", "type", "x", "y", "z"] } internaltypes = True else: listtypes = {"user_type": listtypes} internaltypes = False for kind, columns in listtypes.items(): if all(col in self.names for col in columns): logger.info(f"Dump kind identified as '{kind}'.") return kind logger.warning("Dump kind could not be identified.") return None
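With the built-in table, the order of the entries matters: every dump that has the 'vxyz' columns also has the 'xyz' columns, so the more specific kind has to be tested first. The standalone sketch below illustrates this; guess_kind is a hypothetical free function written for this example, not the method itself.

```python
def guess_kind(columns, kinds=None):
    """Return the first kind whose required columns are all present, else None."""
    if kinds is None:
        # Most specific entry first: dicts preserve insertion order.
        kinds = {
            "vxyz": ["id", "type", "x", "y", "z", "vx", "vy", "vz"],
            "xyz": ["id", "type", "x", "y", "z"],
        }
    for kind, required in kinds.items():
        if all(name in columns for name in required):
            return kind
    return None


with_velocities = guess_kind(["id", "type", "x", "y", "z", "vx", "vy", "vz"])
positions_only = guess_kind(["id", "type", "x", "y", "z"])
unknown = guess_kind(["id", "x"])
```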
def maxbox(self) ‑> List[float]
-
Return the maximum box dimensions across all selected snapshots.
Returns
List[float]
- [xlo, ylo, zlo, xhi, yhi, zhi]
Expand source code
def maxbox(self) -> List[float]:
    """
    Return the maximum box dimensions across all selected snapshots.

    Returns:
        List[float]: [xlo, ylo, zlo, xhi, yhi, zhi]
    """
    xlo = ylo = zlo = np.inf
    xhi = yhi = zhi = -np.inf
    for snap in self.snaps:
        if not snap.tselect:
            continue
        xlo = min(xlo, snap.xlo)
        ylo = min(ylo, snap.ylo)
        zlo = min(zlo, snap.zlo)
        xhi = max(xhi, snap.xhi)
        yhi = max(yhi, snap.yhi)
        zhi = max(zhi, snap.zhi)
    box = [xlo, ylo, zlo, xhi, yhi, zhi]
    logger.debug(f"Maximum box dimensions: {box}")
    return box
def maxtype(self) ‑> int
-
Return the maximum atom type across all selected snapshots and atoms.
Returns
int
- Maximum atom type.
Expand source code
def maxtype(self) -> int:
    """
    Return the maximum atom type across all selected snapshots and atoms.

    Returns:
        int: Maximum atom type.
    """
    if "type" not in self.names:
        logger.warning("Column 'type' not found.")
        return 0
    icol = self.names["type"]
    max_type = 0
    for snap in self.snaps:
        if not snap.tselect:
            continue
        selected_atoms = snap.atoms[snap.aselect]
        if selected_atoms.size == 0:
            continue
        current_max = int(selected_atoms[:, icol].max())
        if current_max > max_type:
            max_type = current_max
    logger.info(f"Maximum atom type: {max_type}")
    return max_type
def minmax(self, colname: str) ‑> Tuple[float, float]
-
Find the minimum and maximum values for a specified column across all selected snapshots and atoms.
Parameters
colname (str): The column name to find min and max for.
Returns
Tuple[float, float]
- The minimum and maximum values.
Raises
KeyError
- If the column name does not exist.
Expand source code
def minmax(self, colname: str) -> Tuple[float, float]:
    """
    Find the minimum and maximum values for a specified column across all selected snapshots and atoms.

    Parameters:
        colname (str): The column name to find min and max for.

    Returns:
        Tuple[float, float]: The minimum and maximum values.

    Raises:
        KeyError: If the column name does not exist.
    """
    if colname not in self.names:
        raise KeyError(f"Column '{colname}' not found.")
    icol = self.names[colname]
    min_val = np.inf
    max_val = -np.inf
    for snap in self.snaps:
        if not snap.tselect:
            continue
        selected_atoms = snap.atoms[snap.aselect]
        if selected_atoms.size == 0:
            continue
        current_min = selected_atoms[:, icol].min()
        current_max = selected_atoms[:, icol].max()
        if current_min < min_val:
            min_val = current_min
        if current_max > max_val:
            max_val = current_max
    logger.info(f"minmax for column '{colname}': min={min_val}, max={max_val}")
    return min_val, max_val
def names2str(self) ‑> str
-
Convert column names to a sorted string based on their indices.
Returns
str
- A string of column names sorted by their column index.
Expand source code
def names2str(self) -> str:
    """
    Convert column names to a sorted string based on their indices.

    Returns:
        str: A string of column names sorted by their column index.
    """
    sorted_columns = sorted(self.names.items(), key=lambda item: item[1])
    names_str = " ".join([col for col, _ in sorted_columns])
    logger.debug(f"Column names string: {names_str}")
    return names_str
def newcolumn(self, colname: str) ‑> NoneType
-
Add a new column to every snapshot and initialize it to zero.
Parameters
colname (str): The name of the new column.
Expand source code
def newcolumn(self, colname: str) -> None:
    """
    Add a new column to every snapshot and initialize it to zero.

    Parameters:
        colname (str): The name of the new column.
    """
    logger.info(f"Adding new column '{colname}' with default value 0.")
    if colname in self.names:
        logger.warning(f"Column '{colname}' already exists.")
        return
    new_col_index = len(self.names)
    self.names[colname] = new_col_index
    for snap in self.snaps:
        if snap.atoms is not None:
            new_column = np.zeros((snap.atoms.shape[0], 1))
            snap.atoms = np.hstack((snap.atoms, new_column))
    logger.info(f"New column '{colname}' added successfully.")
def read_all(self) ‑> NoneType
-
Read all snapshots from each file in the file list.
Expand source code
def read_all(self) -> None: """ Read all snapshots from each file in the file list. """ for file in self.flist: is_gzipped = file.endswith(".gz") try: if is_gzipped: with subprocess.Popen([PIZZA_GUNZIP, "-c", file], stdout=subprocess.PIPE, text=True) as proc: file_handle = proc.stdout logger.debug(f"Opened gzipped file: {file}") else: file_handle = open(file, 'r') logger.debug(f"Opened file: {file}") with file_handle: snap = self.read_snapshot(file_handle) while snap: self.snaps.append(snap) logger.info(f"Read snapshot at time {snap.time}") snap = self.read_snapshot(file_handle) except subprocess.CalledProcessError as e: logger.error(f"Error decompressing file '{file}': {e}") raise except FileNotFoundError: logger.error(f"File '{file}' not found.") raise except Exception as e: logger.error(f"Error reading file '{file}': {e}") raise self.snaps.sort() self.cull() self.nsnaps = len(self.snaps) logger.info(f"Read {self.nsnaps} snapshots.") # Select all timesteps and atoms by default self.tselect.all() # Log column assignments if self.names: logger.info(f"Assigned columns: {', '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}") else: logger.warning("No column assignments made.") # Unscale if necessary if self.nsnaps > 0: if getattr(self, 'scale_original', -1) == 1: self.unscale() elif getattr(self, 'scale_original', -1) == 0: logger.info("Dump is already unscaled.") else: logger.warning("Dump scaling status is unknown.")
def read_snapshot(self, f) ‑> Optional[pizza.dump3.Snap]
-
Read a single snapshot from a file.
Parameters
f (file object): File handle to read from.
Returns
Optional[Snap]
- Snapshot object or None if failed.
Expand source code
def read_snapshot(self, f) -> Optional['Snap']: """ Read a single snapshot from a file. Parameters: f (file object): File handle to read from. Returns: Optional[Snap]: Snapshot object or None if failed. """ try: snap = Snap() # Read and assign ITEMS while True: item = f.readline() if not item: break if not item.startswith("ITEM:"): continue item_type = item.split("ITEM:")[1].strip() if item_type == "TIME": snap.realtime = float(f.readline().strip()) elif item_type == "TIMESTEP": snap.time = int(f.readline().strip()) elif item_type == "NUMBER OF ATOMS": snap.natoms = int(f.readline().strip()) elif item_type.startswith("BOX BOUNDS"): snap.boxstr = item_type.split("BOX BOUNDS")[1].strip() box_bounds = [] for _ in range(3): bounds = f.readline().strip().split() box_bounds.append(tuple(map(float, bounds[:2]))) if len(bounds) > 2: setattr(snap, bounds[2], float(bounds[2])) else: setattr(snap, bounds[2] if len(bounds) > 2 else 'xy', 0.0) snap.xlo, snap.xhi = box_bounds[0] snap.ylo, snap.yhi = box_bounds[1] snap.zlo, snap.zhi = box_bounds[2] snap.triclinic = 1 if len(box_bounds[0]) > 2 else 0 elif item_type == "ATOMS": if not self.names: self.assign_column_names(f.readline()) snap.aselect = np.ones(snap.natoms, dtype=bool) atoms = [] for _ in range(snap.natoms): line = f.readline() if not line: break atoms.append(list(map(float, line.strip().split()))) snap.atoms = np.array(atoms) break if not hasattr(snap, 'time'): return None return snap except Exception as e: logger.error(f"Error reading snapshot: {e}") return None
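For orientation, the text layout being parsed can be exercised with a small standalone reader. The sketch below is an illustration and not the method above: parse_snapshot is a hypothetical function that returns a plain dict, assumes an orthogonal box (two numbers per BOX BOUNDS line), and takes the column names from the words that follow ATOMS on the item line.

```python
import io


def parse_snapshot(handle):
    """Parse one orthogonal-box snapshot; return a dict, or None at end of input."""
    snap = {}
    while True:
        line = handle.readline()
        if not line:
            return None  # end of input before an ATOMS section was seen
        if not line.startswith("ITEM:"):
            continue
        item = line[len("ITEM:"):].strip()
        if item == "TIMESTEP":
            snap["time"] = int(handle.readline())
        elif item == "NUMBER OF ATOMS":
            snap["natoms"] = int(handle.readline())
        elif item.startswith("BOX BOUNDS"):
            # Assumption: orthogonal box, so each line holds (lo, hi).
            snap["box"] = [tuple(map(float, handle.readline().split()[:2])) for _ in range(3)]
        elif item.startswith("ATOMS"):
            # Column names follow the ATOMS keyword on the same line.
            snap["names"] = item.split()[1:]
            snap["atoms"] = [
                [float(value) for value in handle.readline().split()]
                for _ in range(snap["natoms"])
            ]
            return snap


text = """ITEM: TIMESTEP
100
ITEM: NUMBER OF ATOMS
2
ITEM: BOX BOUNDS pp pp pp
0.0 10.0
0.0 10.0
0.0 5.0
ITEM: ATOMS id type x y z
1 1 1.0 2.0 3.0
2 2 4.0 5.0 1.5
"""
handle = io.StringIO(text)
first = parse_snapshot(handle)
second = parse_snapshot(handle)  # nothing left to read
```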
def realtime(self) ‑> List[float]
-
Return a list of selected snapshot real-time values.
Returns
List[float]
- List of real-time values.
Expand source code
def realtime(self) -> List[float]:
    """
    Return a list of selected snapshot real-time values.

    Returns:
        List[float]: List of real-time values.
    """
    times = [snap.realtime for snap in self.snaps if snap.tselect and hasattr(snap, 'realtime')]
    logger.debug(f"Selected real-time values: {times}")
    return times
def scatter(self, root: str) ‑> NoneType
-
Write each selected snapshot to a separate dump file with timestep suffix.
Parameters
root (str): The root name for output files. Suffix will be added based on timestep.
Expand source code
def scatter(self, root: str) -> None:
    """
    Write each selected snapshot to a separate dump file with timestep suffix.

    Parameters:
        root (str): The root name for output files. Suffix will be added based on timestep.
    """
    try:
        for snap in self.snaps:
            if not snap.tselect:
                continue
            filename = f"{root}.{snap.time}"
            with open(filename, "w") as f:
                f.write("ITEM: TIMESTEP\n")
                f.write(f"{snap.time}\n")
                f.write("ITEM: NUMBER OF ATOMS\n")
                f.write(f"{snap.nselect}\n")
                f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n")
                f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n")
                f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n")
                f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n")
                f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n")
                for atom in snap.atoms[snap.aselect]:
                    # Integer columns (id, type) are read from their own column;
                    # indexing with 'id' for both would write the id in the type column.
                    atom_str = " ".join([
                        f"{int(atom[self.names[key]])}" if key in ["id", "type"] else f"{atom[self.names[key]]}"
                        for key in sorted(self.names.keys(), key=lambda k: self.names[k])
                    ])
                    f.write(f"{atom_str}\n")
        logger.info(f"Scatter write completed with root '{root}'.")
    except IOError as e:
        logger.error(f"Error writing scatter files: {e}")
        raise
def set(self, eq: str) ‑> NoneType
-
Set a column value using an equation for all selected snapshots and atoms.
Parameters
eq (str): The equation to compute the new column values. Use $<column_name> for variables. The left-hand column is created if it does not exist.
Example
d.set("$ke = $vx * $vx + $vy * $vy")
Expand source code
def set(self, eq: str) -> None:
    """
    Set a column value using an equation for all selected snapshots and atoms.

    Parameters:
        eq (str): The equation to compute the new column values. Use $<column_name> for variables.

    Example:
        d.set("$ke = $vx * $vx + $vy * $vy")
    """
    logger.info(f"Setting column using equation: {eq}")
    pattern = r"\$\w+"
    variables = re.findall(pattern, eq)
    if not variables:
        logger.warning("No variables found in equation.")
        return
    lhs = variables[0][1:]
    if lhs not in self.names:
        self.newcolumn(lhs)
    try:
        # Replace $var with appropriate array accesses
        for var in variables:
            var_name = var[1:]
            if var_name not in self.names:
                raise KeyError(f"Variable '{var_name}' not found in columns.")
            col_index = self.names[var_name]
            eq = eq.replace(var, f"snap.atoms[i][{col_index}]")
        compiled_eq = compile(eq, "<string>", "exec")
        for snap in self.snaps:
            if not snap.tselect:
                continue
            for i in range(snap.natoms):
                if not snap.aselect[i]:
                    continue
                exec(compiled_eq)
        logger.info("Column values set successfully.")
    except Exception as e:
        logger.error(f"Error setting column values: {e}")
        raise
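The substitution step can be illustrated on plain Python lists. The sketch below is not the method above: compile_equation is a hypothetical helper, and it swaps in a single re.sub pass with a callback (instead of repeated str.replace) so that a name such as $v cannot clobber part of $vx. Each row is a list standing in for one atom.

```python
import re


def compile_equation(eq, names):
    """Rewrite '$name' tokens as 'row[index]' and compile the statement."""
    def to_index(match):
        # Raises KeyError for a column name that is not in the mapping.
        return f"row[{names[match.group(1)]}]"

    return compile(re.sub(r"\$(\w+)", to_index, eq), "<equation>", "exec")


names = {"id": 0, "vx": 1, "vy": 2, "ke": 3}
rows = [[1, 3.0, 4.0, 0.0], [2, 1.0, 2.0, 0.0]]
code = compile_equation("$ke = $vx * $vx + $vy * $vy", names)
for row in rows:
    # Builtins are withheld so the equation can only touch the row it is given.
    exec(code, {"__builtins__": {}}, {"row": row})
```

After the loop, the last entry of each row holds vx*vx + vy*vy for that row.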
def setv(self, colname: str, vector: List[float]) ‑> NoneType
-
Set a column value using a vector of values for all selected snapshots and atoms.
Parameters
colname (str): The column name to set. The column is created (initialized to zero) if it does not exist.
vector (List[float]): The values to assign to the column.
Raises
ValueError
- If the length of the vector does not match the number of selected atoms.
Expand source code
def setv(self, colname: str, vector: List[float]) -> None:
    """
    Set a column value using a vector of values for all selected snapshots and atoms.

    Parameters:
        colname (str): The column name to set. Created if it does not exist.
        vector (List[float]): The values to assign to the column.

    Raises:
        ValueError: If the length of the vector does not match the number of selected atoms.
    """
    logger.info(f"Setting column '{colname}' using a vector of values.")
    if colname not in self.names:
        self.newcolumn(colname)
    icol = self.names[colname]
    for snap in self.snaps:
        if not snap.tselect:
            continue
        if len(vector) != snap.nselect:
            raise ValueError("Vector length does not match the number of selected atoms.")
        selected_indices = np.where(snap.aselect)[0]
        snap.atoms[selected_indices, icol] = vector
    logger.info(f"Column '{colname}' set successfully.")
def sort(self, key: Union[str, int] = 'id') ‑> NoneType
-
Sort the atoms within snapshots.
Parameters
key (Union[str, int]): The key to sort by. If str, sorts the atoms of every selected snapshot by that column. If int, sorts the atoms of the snapshot at that timestep by the id column.
Raises
ValueError
- If the column name or the timestep does not exist.
TypeError
- If the key is neither a string nor an integer.
Expand source code
def sort(self, key: Union[str, int] = "id") -> None:
    """
    Sort the atoms within snapshots.

    Parameters:
        key (Union[str, int]): The key to sort by. If str, sorts the atoms of every
            selected snapshot by that column. If int, sorts the atoms of the snapshot
            at that timestep by the id column.

    Raises:
        ValueError: If the column name or the timestep does not exist.
        TypeError: If the key is neither a string nor an integer.
    """
    if isinstance(key, str):
        if key not in self.names:
            raise ValueError(f"Column '{key}' not found for sorting.")
        logger.info(f"Sorting snapshots by column '{key}'.")
        icol = self.names[key]
        for snap in self.snaps:
            if not snap.tselect:
                continue
            snap.atoms = snap.atoms[snap.atoms[:, icol].argsort()]
    elif isinstance(key, int):
        try:
            snap = self.snaps[self.findtime(key)]
            logger.info(f"Sorting atoms in snapshot at timestep {key}.")
            if "id" in self.names:
                id_col = self.names["id"]
                snap.atoms = snap.atoms[snap.atoms[:, id_col].argsort()]
            else:
                logger.warning("No 'id' column found for sorting atoms.")
        except ValueError as e:
            logger.error(e)
            raise
    else:
        logger.error("Invalid key type for sort().")
        raise TypeError("Key must be a string or integer.")
def spread(self, old: str, n: int, new: str) ‑> NoneType
-
Spread values from an old column into a new column as integers from 1 to n based on their relative positions.
Parameters
old (str): The column name to spread.
n (int): The number of spread values.
new (str): The new column name to create.
Raises
KeyError
- If the old column does not exist.
Expand source code
def spread(self, old: str, n: int, new: str) -> None:
    """
    Spread values from an old column into a new column as integers from 1 to n based on their relative positions.

    Parameters:
        old (str): The column name to spread.
        n (int): The number of spread values.
        new (str): The new column name to create.

    Raises:
        KeyError: If the old column does not exist.
    """
    logger.info(f"Spreading column '{old}' into new column '{new}' with {n} spread values.")
    if old not in self.names:
        raise KeyError(f"Column '{old}' not found.")
    if new not in self.names:
        self.newcolumn(new)
    iold = self.names[old]
    inew = self.names[new]
    min_val, max_val = self.minmax(old)
    gap = max_val - min_val
    if gap == 0:
        gap = 1.0  # Prevent division by zero
    invdelta = n / gap
    for snap in self.snaps:
        if not snap.tselect:
            continue
        selected_atoms = snap.atoms[snap.aselect]
        snap.atoms[snap.aselect, inew] = np.clip(((selected_atoms[:, iold] - min_val) * invdelta).astype(int) + 1, 1, n)
    logger.info(f"Column '{new}' spread successfully.")
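The binning rule is bin = clip(int((value - min) * n / (max - min)) + 1, 1, n). A small pure-Python sketch of that rule is given below; spread_values is a hypothetical free function operating on a flat list, used here only to make the arithmetic concrete.

```python
def spread_values(values, n):
    """Map each value to an integer bin from 1 to n by its position in [min, max]."""
    lo, hi = min(values), max(values)
    gap = (hi - lo) or 1.0  # avoid dividing by zero when all values are equal
    bins = []
    for value in values:
        raw = int((value - lo) * n / gap) + 1
        # Clip so the maximum value lands in bin n rather than n + 1.
        bins.append(min(max(raw, 1), n))
    return bins


bins = spread_values([0.0, 2.5, 5.0, 7.5, 10.0], 4)
flat = spread_values([3.0, 3.0], 4)
```

With n = 4 the five values above fall into bins 1, 2, 3, 4 and 4: without the clip, the maximum value 10.0 would be assigned bin 5.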
def time(self) ‑> List[int]
-
Return a list of selected snapshot timesteps.
Returns
List[int]
- List of timestep values.
Expand source code
def time(self) -> List[int]:
    """
    Return a list of selected snapshot timesteps.

    Returns:
        List[int]: List of timestep values.
    """
    times = [snap.time for snap in self.snaps if snap.tselect]
    logger.debug(f"Selected timesteps: {times}")
    return times
def vecs(self, n: int, *columns: str) ‑> Union[List[float], List[List[float]]]
-
Extract values for selected atoms at a specific timestep.
Parameters
n (int): The timestep to extract from.
*columns (str): The column names to extract.
Returns
Union[List[float], List[List[float]]]
- The extracted values.
Raises
KeyError
- If any specified column does not exist.
ValueError
- If the specified timestep does not exist.
Expand source code
def vecs(self, n: int, *columns: str) -> Union[List[float], List[List[float]]]:
    """
    Extract values for selected atoms at a specific timestep.

    Parameters:
        n (int): The timestep to extract from.
        *columns (str): The column names to extract.

    Returns:
        Union[List[float], List[List[float]]]: The extracted values.

    Raises:
        KeyError: If any specified column does not exist.
        ValueError: If the specified timestep does not exist.
    """
    logger.info(f"Extracting columns {columns} for timestep {n}.")
    if not columns:
        raise ValueError("No columns specified for extraction.")
    try:
        snap = self.snaps[self.findtime(n)]
    except ValueError as e:
        logger.error(e)
        raise
    column_indices = []
    for col in columns:
        if col not in self.names:
            raise KeyError(f"Column '{col}' not found.")
        column_indices.append(self.names[col])
    extracted = [[] for _ in columns]
    selected_atoms = snap.atoms[snap.aselect]
    for atom in selected_atoms:
        for idx, col_idx in enumerate(column_indices):
            extracted[idx].append(atom[col_idx])
    if len(columns) == 1:
        return extracted[0]
    return extracted
def viz(self, index: int, flag: int = 0) ‑> Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]
-
Return visualization data for a specified snapshot.
Parameters
index (int): Snapshot index or timestep value.
flag (int): If 1, treat index as timestep value. Default is 0.
Returns
Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]: (time, box, atoms, bonds, tris, lines)
Raises
ValueError
- If the snapshot index is invalid.
Expand source code
def viz(self, index: int, flag: int = 0) -> Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]: """ Return visualization data for a specified snapshot. Parameters: index (int): Snapshot index or timestep value. flag (int): If 1, treat index as timestep value. Default is 0. Returns: Tuple[int, List[float], List[List[Union[int, float]]], List[List[Union[int, float]]], List[Any], List[Any]]: (time, box, atoms, bonds, tris, lines) Raises: ValueError: If the snapshot index is invalid. """ if flag: try: isnap = self.findtime(index) except ValueError as e: logger.error(e) raise else: isnap = index if isnap < 0 or isnap >= self.nsnaps: raise ValueError("Snapshot index out of range.") snap = self.snaps[isnap] time = snap.time box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi] id_idx = self.names.get("id") type_idx = self.names.get(self.atype) x_idx = self.names.get("x") y_idx = self.names.get("y") z_idx = self.names.get("z") if None in [id_idx, type_idx, x_idx, y_idx, z_idx]: raise ValueError("One or more required columns (id, type, x, y, z) are not defined.") # Create atom list for visualization atoms = snap.atoms[snap.aselect][:, [id_idx, type_idx, x_idx, y_idx, z_idx]].astype(object).tolist() # Create bonds list if bonds are defined bonds = [] if self.bondflag: if self.bondflag == 1: bondlist = self.bondlist elif self.bondflag == 2 and self.objextra: _, _, _, bondlist, _, _ = self.objextra.viz(time, 1) else: bondlist = [] if bondlist: id_to_atom = {atom[0]: atom for atom in atoms} for bond in bondlist: try: atom1 = id_to_atom[bond[2]] atom2 = id_to_atom[bond[3]] bonds.append([ bond[0], bond[1], atom1[2], atom1[3], atom1[4], atom2[2], atom2[3], atom2[4], atom1[1], atom2[1] ]) except KeyError: logger.warning(f"Bond with atom IDs {bond[2]}, {bond[3]} not found in selected atoms.") continue # Create tris list if tris are defined tris = [] if self.triflag: if self.triflag == 1: tris = self.trilist elif 
self.triflag == 2 and self.objextra: _, _, _, _, tris, _ = self.objextra.viz(time, 1) # Create lines list if lines are defined lines = [] if self.lineflag: if self.lineflag == 1: lines = self.linelist elif self.lineflag == 2 and self.objextra: _, _, _, _, _, lines = self.objextra.viz(time, 1) logger.debug(f"Visualization data prepared for snapshot {isnap} at time {time}.") return time, box, atoms, bonds, tris, lines
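Each entry of the returned bonds list joins one bond record with the coordinates and types of its two endpoint atoms, in the order [bond_id, bond_type, x1, y1, z1, x2, y2, z2, type1, type2]. The sketch below reproduces that join on plain lists; build_bonds is a hypothetical helper written for this illustration, and bonds with a missing endpoint are dropped silently here rather than logged.

```python
def build_bonds(atoms, bondlist):
    """Join bonds with endpoint data.

    atoms:    rows of [id, type, x, y, z]
    bondlist: rows of [bond_id, bond_type, atom1_id, atom2_id]
    """
    by_id = {atom[0]: atom for atom in atoms}
    bonds = []
    for bond_id, bond_type, id1, id2 in bondlist:
        if id1 not in by_id or id2 not in by_id:
            continue  # an endpoint is not among the selected atoms
        a1, a2 = by_id[id1], by_id[id2]
        bonds.append([bond_id, bond_type, *a1[2:5], *a2[2:5], a1[1], a2[1]])
    return bonds


atoms = [[1, 1, 0.0, 0.0, 0.0], [2, 2, 1.0, 0.0, 0.0]]
bondlist = [[1, 1, 1, 2], [2, 1, 2, 3]]  # the second bond references a missing atom
bonds = build_bonds(atoms, bondlist)
```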
def write(self, filename: str, head: int = 1, app: int = 0) ‑> NoneType
-
Write the dump object to a LAMMPS dump file.
Parameters
filename (str): The output file path.
head (int): Whether to include the snapshot header (1 for yes, 0 for no).
app (int): Whether to append to the file (1 for yes, 0 for no).
Expand source code
def write(self, filename: str, head: int = 1, app: int = 0) -> None:
    """
    Write the dump object to a LAMMPS dump file.

    Parameters:
        filename (str): The output file path.
        head (int): Whether to include the snapshot header (1 for yes, 0 for no).
        app (int): Whether to append to the file (1 for yes, 0 for no).
    """
    try:
        mode = "a" if app else "w"
        with open(filename, mode) as f:
            for snap in self.snaps:
                if not snap.tselect:
                    continue
                if head:
                    f.write("ITEM: TIMESTEP\n")
                    f.write(f"{snap.time}\n")
                    f.write("ITEM: NUMBER OF ATOMS\n")
                    f.write(f"{snap.nselect}\n")
                    f.write("ITEM: BOX BOUNDS xy xz yz\n" if snap.triclinic else "ITEM: BOX BOUNDS pp pp pp\n")
                    f.write(f"{snap.xlo} {snap.xhi} {getattr(snap, 'xy', 0.0)}\n")
                    f.write(f"{snap.ylo} {snap.yhi} {getattr(snap, 'xz', 0.0)}\n")
                    f.write(f"{snap.zlo} {snap.zhi} {getattr(snap, 'yz', 0.0)}\n")
                    f.write(f"ITEM: ATOMS {' '.join(sorted(self.names.keys(), key=lambda k: self.names[k]))}\n")
                for atom in snap.atoms[snap.aselect]:
                    # Integer columns (id, type) are read from their own column;
                    # indexing with 'id' for both would write the id in the type column.
                    atom_str = " ".join([
                        f"{int(atom[self.names[key]])}" if key in ["id", "type"] else f"{atom[self.names[key]]}"
                        for key in sorted(self.names.keys(), key=lambda k: self.names[k])
                    ])
                    f.write(f"{atom_str}\n")
        logger.info(f"Dump object written to '{filename}'.")
    except IOError as e:
        logger.error(f"Error writing to file '{filename}': {e}")
        raise
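For reference, the text layout of a single snapshot with an orthogonal box can be sketched as follows. This is a standalone illustration under stated assumptions, not the method above: format_snapshot is a hypothetical helper, the box is orthogonal with periodic boundaries ("pp pp pp"), and each bounds line therefore carries only the lo and hi values.

```python
import io


def format_snapshot(time, box, names, atoms):
    """Format one orthogonal-box snapshot in the LAMMPS text dump layout."""
    out = io.StringIO()
    out.write("ITEM: TIMESTEP\n")
    out.write(f"{time}\n")
    out.write("ITEM: NUMBER OF ATOMS\n")
    out.write(f"{len(atoms)}\n")
    out.write("ITEM: BOX BOUNDS pp pp pp\n")  # assumption: periodic in x, y and z
    for lo, hi in box:
        out.write(f"{lo} {hi}\n")
    out.write("ITEM: ATOMS " + " ".join(names) + "\n")
    for atom in atoms:
        # id and type are written as integers, every other column as stored.
        fields = [str(int(v)) if name in ("id", "type") else str(v) for name, v in zip(names, atom)]
        out.write(" ".join(fields) + "\n")
    return out.getvalue()


text = format_snapshot(
    time=100,
    box=[(0.0, 10.0), (0.0, 10.0), (0.0, 5.0)],
    names=["id", "type", "x", "y", "z"],
    atoms=[[1.0, 2.0, 0.5, 1.5, 2.5]],
)
lines = text.splitlines()
```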