Module data3
data
Class
The data
class provides tools to read, write, and manipulate LAMMPS data files, enabling seamless integration with the dump
class for restart generation and simulation data management.
Features
- Input Handling:
- Supports single or multiple data files, including gzipped files.
-
Create empty data objects or initialize from an existing
dump
object. -
Headers and Sections:
- Access and modify headers, including atom counts and box dimensions.
-
Define, reorder, append, and replace columns in data file sections.
-
Integration with
dump
: - Generate restart files from
dump
snapshots. -
Replace atomic positions and velocities in
Atoms
andVelocities
sections. -
Visualization:
- Extract atoms and bonds for visualization tools.
- Iterate over single data file snapshots (compatible with
dump
).
Usage
Initialization
-
From a File:
python d = data("data.poly") # Read a LAMMPS data file
-
Create an Empty Object:
python d = data() # Create an empty data object
-
From a
dump
Object:python d = data(dump_obj, timestep) # Generate data object from dump snapshot
Accessing Data
-
Headers:
python d.headers["atoms"] = 1500 # Set atom count in header
-
Sections:
python d.sections["Atoms"] = lines # Define the <code>Atoms</code> section
Manipulation
-
Column Mapping:
python d.map(1, "id", 3, "x") # Assign names to columns
-
Reorder Columns:
python d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in a section
-
Replace or Append Data:
python d.replace("Atoms", 5, vec) # Replace a column in <code>Atoms</code> d.append("Atoms", vec) # Append a new column to <code>Atoms</code>
-
Delete Headers or Sections:
python d.delete("Bonds") # Remove the <code>Bonds</code> section
Output
- Write to a File:
python d.write("data.new") # Write the data object to a file
Visualization
- Extract Data for Visualization:
python time, box, atoms, bonds, tris, lines = d.viz(0)
Integration with dump
- Replace Atomic Positions:
python d.newxyz(dump_obj, timestep) # Replace atomic positions with <code><a title="data3.dump" href="#data3.dump">dump</a></code> data
Examples
Basic Usage
d = data("data.poly") # Load a LAMMPS data file
d.headers["atoms"] = 2000 # Update atom count
d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in `Atoms`
d.write("data.new") # Save to a new file
Restart Generation
dump_obj = dump("dump.poly")
d = data(dump_obj, 1000) # Create data object from dump
d.write("data.restart") # Write restart file
Visualization
time, box, atoms, bonds, tris, lines = d.viz(0)
Properties
- Headers:
atoms
: Number of atoms in the data file.atom types
: Number of atom types.-
xlo xhi
,ylo yhi
,zlo zhi
: Box dimensions. -
Sections:
Atoms
: Atomic data (e.g., ID, type, coordinates).Velocities
: Atomic velocities (optional).- Additional sections for bonds, angles, etc.
Notes
- Compatibility: Fully compatible with
dump
for restart and visualization tasks. - Error Handling: Automatically validates headers and sections for consistency.
- Extensibility: Easily add or modify headers, sections, and attributes.
Expand source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# `data` Class
The `data` class provides tools to read, write, and manipulate LAMMPS data files, enabling seamless integration with the `dump` class for restart generation and simulation data management.
---
## Features
- **Input Handling**:
- Supports single or multiple data files, including gzipped files.
- Create empty data objects or initialize from an existing `dump` object.
- **Headers and Sections**:
- Access and modify headers, including atom counts and box dimensions.
- Define, reorder, append, and replace columns in data file sections.
- **Integration with `dump`**:
- Generate restart files from `dump` snapshots.
- Replace atomic positions and velocities in `Atoms` and `Velocities` sections.
- **Visualization**:
- Extract atoms and bonds for visualization tools.
- Iterate over single data file snapshots (compatible with `dump`).
---
## Usage
### Initialization
- **From a File**:
```python
d = data("data.poly") # Read a LAMMPS data file
```
- **Create an Empty Object**:
```python
d = data() # Create an empty data object
```
- **From a `dump` Object**:
```python
d = data(dump_obj, timestep) # Generate data object from dump snapshot
```
### Accessing Data
- **Headers**:
```python
d.headers["atoms"] = 1500 # Set atom count in header
```
- **Sections**:
```python
d.sections["Atoms"] = lines # Define the `Atoms` section
```
### Manipulation
- **Column Mapping**:
```python
d.map(1, "id", 3, "x") # Assign names to columns
```
- **Reorder Columns**:
```python
d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in a section
```
- **Replace or Append Data**:
```python
d.replace("Atoms", 5, vec) # Replace a column in `Atoms`
d.append("Atoms", vec) # Append a new column to `Atoms`
```
- **Delete Headers or Sections**:
```python
d.delete("Bonds") # Remove the `Bonds` section
```
### Output
- **Write to a File**:
```python
d.write("data.new") # Write the data object to a file
```
### Visualization
- **Extract Data for Visualization**:
```python
time, box, atoms, bonds, tris, lines = d.viz(0)
```
### Integration with `dump`
- **Replace Atomic Positions**:
```python
d.newxyz(dump_obj, timestep) # Replace atomic positions with `dump` data
```
---
## Examples
### Basic Usage
```python
d = data("data.poly") # Load a LAMMPS data file
d.headers["atoms"] = 2000 # Update atom count
d.reorder("Atoms", 1, 3, 2, 4) # Reorder columns in `Atoms`
d.write("data.new") # Save to a new file
```
### Restart Generation
```python
dump_obj = dump("dump.poly")
d = data(dump_obj, 1000) # Create data object from dump
d.write("data.restart") # Write restart file
```
### Visualization
```python
time, box, atoms, bonds, tris, lines = d.viz(0)
```
---
## Properties
- **Headers**:
- `atoms`: Number of atoms in the data file.
- `atom types`: Number of atom types.
- `xlo xhi`, `ylo yhi`, `zlo zhi`: Box dimensions.
- **Sections**:
- `Atoms`: Atomic data (e.g., ID, type, coordinates).
- `Velocities`: Atomic velocities (optional).
- Additional sections for bonds, angles, etc.
---
## Notes
- **Compatibility**: Fully compatible with `dump` for restart and visualization tasks.
- **Error Handling**: Automatically validates headers and sections for consistency.
- **Extensibility**: Easily add or modify headers, sections, and attributes.
---
"""
__project__ = "Pizza3"
__author__ = "Olivier Vitrac"
__copyright__ = "Copyright 2022"
__credits__ = ["Steve Plimpton", "Olivier Vitrac"]
__license__ = "GPLv3"
__maintainer__ = "Olivier Vitrac"
__email__ = "olivier.vitrac@agroparistech.fr"
__version__ = "0.99991"
# Pizza.py toolkit, www.cs.sandia.gov/~sjplimp/pizza.html
# Steve Plimpton, sjplimp@sandia.gov, Sandia National Laboratories
#
# Copyright (2005) Sandia Corporation. Under the terms of Contract
# DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
# certain rights in this software. This software is distributed under
# the GNU General Public License.
# data tool
# Code converted to pyton 3.x
# INRAE\olivier.vitrac@agroparistech.fr
#
# last release
# 2022-02-03 - add flist, __repr__
# 2022-02-04 - add append and start to add comments
# 2022-02-10 - first implementation of a full restart object from a dump object
# 2022-02-12 - revised append method, more robust, more verbose
# 2024-12-08 - updated help
oneline = "Read, write, manipulate LAMMPS data files"
docstr = """
d = data("data.poly") read a LAMMPS data file, can be gzipped
d = data() create an empty data file
d.map(1,"id",3,"x") assign names to atom columns (1-N)
coeffs = d.get("Pair Coeffs") extract info from data file section
q = d.get("Atoms",4)
1 arg = all columns returned as 2d array of floats
2 args = Nth column returned as vector of floats
d.reorder("Atoms",1,3,2,4,5) reorder columns (1-N) in a data file section
1,3,2,4,5 = new order of previous columns, can delete columns this way
d.title = "My LAMMPS data file" set title of the data file
d.headers["atoms"] = 1500 set a header value
d.sections["Bonds"] = lines set a section to list of lines (with newlines)
d.delete("bonds") delete a keyword or section of data file
d.delete("Bonds")
d.replace("Atoms",5,vec) replace Nth column of section with vector
d.newxyz(dmp,1000) replace xyz in Atoms with xyz of snapshot N
newxyz assumes id,x,y,z are defined in both data and dump files
also replaces ix,iy,iz if they are defined
index,time,flag = d.iterator(0/1) loop over single data file snapshot
time,box,atoms,bonds,tris,lines = d.viz(index) return list of viz objects
iterator() and viz() are compatible with equivalent dump calls
iterator() called with arg = 0 first time, with arg = 1 on subsequent calls
index = timestep index within dump object (only 0 for data file)
time = timestep value (only 0 for data file)
flag = -1 when iteration is done, 1 otherwise
viz() returns info for specified timestep index (must be 0)
time = 0
box = [xlo,ylo,zlo,xhi,yhi,zhi]
atoms = id,type,x,y,z for each atom as 2d array
bonds = id,type,x1,y1,z1,x2,y2,z2,t1,t2 for each bond as 2d array
NULL if bonds do not exist
tris = NULL
lines = NULL
d.write("data.new") write a LAMMPS data file
"""
# History
# 8/05, Steve Plimpton (SNL): original version
# 11/07, added triclinic box support
# ToDo list
# Variables
# title = 1st line of data file
# names = dictionary with atom attributes as keys, col #s as values
# headers = dictionary with header name as key, value or tuple as values
# sections = dictionary with section name as key, array of lines as values
# nselect = 1 = # of snapshots
# Imports and external programs
# External dependency
from os import popen
import numpy as np
# Dependecy for the creation of DATA restart object from a DUMP object
from pizza.dump3 import dump
__all__ = ['data', 'dump']
#try:
# tmp = PIZZA_GUNZIP
#except:
PIZZA_GUNZIP = "gunzip"
# Class definition
class data:
# --------------------------------------------------------------------
def __init__(self, *list):
self.nselect = 1
if len(list) == 0:
# ========================================
# Default Constructor (empty object)
# ========================================
self.title = "LAMMPS data file"
self.names = {}
self.headers = {}
self.sections = {}
self.flist = []
self.restart = False
return
elif isinstance(list[0],dump):
# ========================================
# Constructor from an existing DUMP object
# ========================================
X = list[0] # supplied dump object
t = X.time()
nt = len(t)
if len(list)>1:
tselect = list[1]
if tselect not in t:
raise ValueError("the input time is not available in the dump object")
else:
tselect = t[-1]
itselect = next(i for i in range(nt) if t[i]==tselect)
# set object title
self.title = 'LAMMPS data file (restart from "%s" t = %0.5g (frame %d of %d))' % \
(X.flist[0],tselect,itselect,nt)
# set default names ------------- HEADERS SECTION -------------
self.names = {}
X.tselect.one(tselect)
snap = X.snaps[itselect]
# set headers
self.headers = { 'atoms': snap.natoms,
'atom types': X.minmax("type")[1],
'xlo xhi': (snap.xlo, snap.xhi),
'ylo yhi': (snap.ylo, snap.yhi),
'zlo zhi': (snap.zlo, snap.zhi)}
# Default sections
self.sections = {}
# set atoms (specific to your style/kind) ------------- ATOMS SUBSECTION -------------
template_atoms = {"smd": ["id","type","mol","c_vol", "mass", "radius",
"c_contact_radius", "x", "y", "z", "f_1[1]", "f_1[2]", "f_1[3]"] }
if X.kind(template_atoms["smd"]):
for col in template_atoms["smd"]:
self.append("Atoms", X.vecs(tselect, col), col in ["id","type","mol"],col)
else:
raise ValueError("Please add your ATOMS section in the constructor")
# set velocities (if required) ------------- VELOCITIES SUBSECTION -------------
template_velocities = {"smd": ["id","vx","vy","vz"]}
if X.kind(template_atoms["smd"]):
if X.kind(template_velocities["smd"]):
for col in template_velocities["smd"]:
self.append("Velocities", X.vecs(tselect, col), col == "id",col)
else:
raise ValueError("the velocities are missing for the style SMD")
# store filename
self.flist = list[0].flist
self.restart = True
return
# ===========================================================
# Regular constructor from DATA file (supplied as filename)
# ===========================================================
flist = list
file = list[0]
if file[-3:] == ".gz":
f = popen("%s -c %s" % (PIZZA_GUNZIP, file), "r")
else:
f = open(file)
self.title = f.readline()
self.names = {}
headers = {}
while 1:
line = f.readline()
line = line.strip()
if len(line) == 0:
continue
found = 0
for keyword in hkeywords:
if line.find(keyword) >= 0:
found = 1
words = line.split()
if (
keyword == "xlo xhi"
or keyword == "ylo yhi"
or keyword == "zlo zhi"
):
headers[keyword] = (float(words[0]), float(words[1]))
elif keyword == "xy xz yz":
headers[keyword] = (
float(words[0]),
float(words[1]),
float(words[2]),
)
else:
headers[keyword] = int(words[0])
if not found:
break
sections = {}
while 1:
found = 0
for pair in skeywords:
keyword, length = pair[0], pair[1]
if keyword == line:
found = 1
if length not in headers: #if not headers.has_key(length):
raise ValueError("data section %s has no matching header value" % line)
f.readline()
list = []
for i in range(headers[length]):
list.append(f.readline())
sections[keyword] = list
if not found:
raise ValueError("invalid section %s in data file" % line)
f.readline()
line = f.readline()
if not line:
break
line = line.strip()
f.close()
self.headers = headers
self.sections = sections
self.flist = flist
self.restart = False
# --------------------------------------------------------------------
# Display method (added OV - 2022-02-03)
def __repr__(self):
if self.sections == {} or self.headers == {}:
ret = "empty %s" % self.title
print(ret)
return ret
if self.restart:
kind = "restart"
else:
kind = "source"
print("Data file: %s\n\tcontains %d atoms from %d atom types\n\twith box = [%0.4g %0.4g %0.4g %0.4g %0.4g %0.4g]"
% (self.flist[0],
self.headers['atoms'],
self.headers['atom types'],
self.headers['xlo xhi'][0],
self.headers['xlo xhi'][1],
self.headers['ylo yhi'][0],
self.headers['ylo yhi'][1],
self.headers['zlo zhi'][0],
self.headers['zlo zhi'][1],
) )
print("\twith the following sections:")
for sectionname in self.sections.keys():
print("\t\t" +self.dispsection(sectionname,False))
ret = 'LAMMPS data object including %d atoms (%d types, %s="%s")' \
% (self.headers['atoms'],self.maxtype(),kind,self.flist[0])
return ret
# --------------------------------------------------------------------
# assign names to atom columns
def map(self, *pairs):
if len(pairs) % 2 != 0:
raise ValueError ("data map() requires pairs of mappings")
for i in range(0, len(pairs), 2):
j = i + 1
self.names[pairs[j]] = pairs[i] - 1
# --------------------------------------------------------------------
# extract info from data file fields
def get(self, *list):
if len(list) == 1:
field = list[0]
array = []
lines = self.sections[field]
for line in lines:
words = line.split()
values = map(float, words)
array.append(values)
return array
elif len(list) == 2:
field = list[0]
n = list[1] - 1
vec = []
lines = self.sections[field]
for line in lines:
words = line.split()
vec.append(float(words[n]))
return vec
else:
raise ValueError("invalid arguments for data.get()")
# --------------------------------------------------------------------
# reorder columns in a data file field
def reorder(self, name, *order):
""" reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,...) """
if name not in self.sections:
raise ValueError('"%s" is not a valid section name' % name)
n = len(order)
print(">> reorder for %d columns" % n)
natoms = len(self.sections[name])
oldlines = self.sections[name]
newlines = natoms * [""]
for index in order:
for i in range(len(newlines)):
words = oldlines[i].split()
newlines[i] += words[index - 1] + " "
for i in range(len(newlines)):
newlines[i] += "\n"
self.sections[name] = newlines
# --------------------------------------------------------------------
# replace a column of named section with vector of values
# the number of data is checked and repeated for scalars (added 2022-02-04)
def replace(self, name, icol, vector):
""" replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns """
lines = self.sections[name]
nlines = len(lines)
if name not in self.sections:
raise ValueError('"%s" is not a valid section name' % name)
if not isinstance(vector,list): vector = [vector]
if len(vector)==1: vector = vector * nlines
if len(vector) != nlines:
raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
(len(vector),name,nlines))
newlines = []
j = icol - 1
for i in range(nlines):
line = lines[i]
words = line.split()
words[j] = str(vector[i])
newline = " ".join(words) + "\n"
newlines.append(newline)
self.sections[name] = newlines
# --------------------------------------------------------------------
# append a column of named section with vector of values (added 2022-02-04)
def append(self, name, vector, forceinteger = False, propertyname=None):
""" append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None) """
if name not in self.sections:
self.sections[name] = []
print('Add section [%s] - file="%s"' % (name,self.title) )
lines = self.sections[name]
nlines = len(lines)
if not isinstance(vector,list) and not isinstance(vector,np.ndarray):
vector = [vector]
if propertyname != None:
print('\t> Add "%s" (%d values) to [%s]' % (propertyname,len(vector),name))
else:
print('\t> Add %d values (no name) to [%s]' % (len(vector),name))
newlines = []
if nlines == 0: # empty atoms section, create first column
nlines = len(vector) # new column length = input column length
for i in range(nlines):
if forceinteger:
line = str(int(vector[i]))
else:
line = str(vector[i])
newlines.append(line)
else:
if len(vector)==1: vector = vector * nlines
if len(vector) != nlines:
raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \
(len(vector),name,nlines))
for i in range(nlines):
line = lines[i]
words = line.split()
if forceinteger:
words.append(str(int(vector[i])))
else:
words.append(str(vector[i]))
newline = " ".join(words) + "\n"
newlines.append(newline)
self.sections[name] = newlines
# --------------------------------------------------------------------
# disp section info (added 2022-02-04)
def dispsection(self, name,flaghead=True):
""" display section info: X.dispsection("sectionname") """
lines = self.sections[name]
nlines = len(lines)
line = lines[0]
words = line.split()
ret = '"%s": %d x %d values' % (name,nlines,len(words))
if flaghead: ret = "LAMMPS data section "+ret
return ret
# --------------------------------------------------------------------
# replace x,y,z in Atoms with x,y,z values from snapshot ntime of dump object
# assumes id,x,y,z are defined in both data and dump files
# also replaces ix,iy,iz if they are defined
def newxyz(self, dm, ntime):
nsnap = dm.findtime(ntime)
print(">> newxyz for %d snaps" % nsnap)
dm.sort(ntime)
x, y, z = dm.vecs(ntime, "x", "y", "z")
self.replace("Atoms", self.names["x"] + 1, x)
self.replace("Atoms", self.names["y"] + 1, y)
self.replace("Atoms", self.names["z"] + 1, z)
if "ix" in dm.names and "ix" in self.names: #if dm.names.has_key("ix") and self.names.has_key("ix"):
ix, iy, iz = dm.vecs(ntime, "ix", "iy", "iz")
self.replace("Atoms", self.names["ix"] + 1, ix)
self.replace("Atoms", self.names["iy"] + 1, iy)
self.replace("Atoms", self.names["iz"] + 1, iz)
# --------------------------------------------------------------------
# delete header value or section from data file
def delete(self, keyword):
if keyword in self.headers: # if self.headers.has_key(keyword):
del self.headers[keyword]
elif keyword in self.sections: # elif self.sections.has_key(keyword):
del self.sections[keyword]
else:
raise ValueError("keyword not found in data object")
# --------------------------------------------------------------------
# write out a LAMMPS data file
def write(self, file):
f = open(file, "w")
print(self.title,file=f)
for keyword in hkeywords:
if keyword in self.headers: # self.headers.has_key(keyword):
if keyword == "xlo xhi" or keyword == "ylo yhi" or keyword == "zlo zhi":
pair = self.headers[keyword]
print(pair[0], pair[1], keyword,file=f)
elif keyword == "xy xz yz":
triple = self.headers[keyword]
print(triple[0], triple[1], triple[2], keyword,file=f)
else:
print(self.headers[keyword], keyword,file=f)
for pair in skeywords:
keyword = pair[0]
if keyword in self.sections: #self.sections.has_key(keyword):
print("\n%s\n" % keyword,file=f)
for line in self.sections[keyword]:
print(line,file=f,end="")
f.close()
# --------------------------------------------------------------------
# iterator called from other tools
def iterator(self, flag):
if flag == 0:
return 0, 0, 1
return 0, 0, -1
# --------------------------------------------------------------------
# time query from other tools
def findtime(self, n):
if n == 0:
return 0
raise ValueError("no step %d exists" % n)
# --------------------------------------------------------------------
# return list of atoms and bonds to viz for data object
def viz(self, isnap):
if isnap:
raise ValueError("cannot call data.viz() with isnap != 0")
id = self.names["id"]
type = self.names["type"]
x = self.names["x"]
y = self.names["y"]
z = self.names["z"]
xlohi = self.headers["xlo xhi"]
ylohi = self.headers["ylo yhi"]
zlohi = self.headers["zlo zhi"]
box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]
# create atom list needed by viz from id,type,x,y,z
atoms = []
atomlines = self.sections["Atoms"]
for line in atomlines:
words = line.split()
atoms.append(
[
int(words[id]),
int(words[type]),
float(words[x]),
float(words[y]),
float(words[z]),
]
)
# create list of current bond coords from list of bonds
# assumes atoms are sorted so can lookup up the 2 atoms in each bond
bonds = []
if self.sections.has_key("Bonds"):
bondlines = self.sections["Bonds"]
for line in bondlines:
words = line.split()
bid, btype = int(words[0]), int(words[1])
atom1, atom2 = int(words[2]), int(words[3])
atom1words = atomlines[atom1 - 1].split()
atom2words = atomlines[atom2 - 1].split()
bonds.append(
[
bid,
btype,
float(atom1words[x]),
float(atom1words[y]),
float(atom1words[z]),
float(atom2words[x]),
float(atom2words[y]),
float(atom2words[z]),
float(atom1words[type]),
float(atom2words[type]),
]
)
tris = []
lines = []
return 0, box, atoms, bonds, tris, lines
# --------------------------------------------------------------------
# return box size
def maxbox(self):
xlohi = self.headers["xlo xhi"]
ylohi = self.headers["ylo yhi"]
zlohi = self.headers["zlo zhi"]
return [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]
# --------------------------------------------------------------------
# return number of atom types
def maxtype(self):
return self.headers["atom types"]
# --------------------------------------------------------------------
# data file keywords, both header and main sections
hkeywords = [
"atoms",
"ellipsoids",
"lines",
"triangles",
"bodies",
"bonds",
"angles",
"dihedrals",
"impropers",
"atom types",
"bond types",
"angle types",
"dihedral types",
"improper types",
"xlo xhi",
"ylo yhi",
"zlo zhi",
"xy xz yz",
]
skeywords = [
["Masses", "atom types"],
["Atoms", "atoms"],
["Ellipsoids", "ellipsoids"],
["Lines", "lines"],
["Triangles", "triangles"],
["Bodies", "bodies"],
["Bonds", "bonds"],
["Angles", "angles"],
["Dihedrals", "dihedrals"],
["Impropers", "impropers"],
["Velocities", "atoms"],
["Pair Coeffs", "atom types"],
["Bond Coeffs", "bond types"],
["Angle Coeffs", "angle types"],
["Dihedral Coeffs", "dihedral types"],
["Improper Coeffs", "improper types"],
["BondBond Coeffs", "angle types"],
["BondAngle Coeffs", "angle types"],
["MiddleBondTorsion Coeffs", "dihedral types"],
["EndBondTorsion Coeffs", "dihedral types"],
["AngleTorsion Coeffs", "dihedral types"],
["AngleAngleTorsion Coeffs", "dihedral types"],
["BondBond13 Coeffs", "dihedral types"],
["AngleAngle Coeffs", "improper types"],
["Molecules", "atoms"],
["Tinker Types", "atoms"],
]
# %%
# ===================================================
# main()
# ===================================================
# for debugging purposes (code called as a script)
# the code is called from here
# ===================================================
if __name__ == '__main__':
datafile = "../data/play_data/data.play.lmp"
X = data(datafile)
Y = dump("../data/play_data/dump.play.restartme")
t = Y.time()
step = 2000
R = data(Y,step)
R.write("../tmp/data.myfirstrestart.lmp")
Classes
class data (*list)
-
Expand source code
class data: # -------------------------------------------------------------------- def __init__(self, *list): self.nselect = 1 if len(list) == 0: # ======================================== # Default Constructor (empty object) # ======================================== self.title = "LAMMPS data file" self.names = {} self.headers = {} self.sections = {} self.flist = [] self.restart = False return elif isinstance(list[0],dump): # ======================================== # Constructor from an existing DUMP object # ======================================== X = list[0] # supplied dump object t = X.time() nt = len(t) if len(list)>1: tselect = list[1] if tselect not in t: raise ValueError("the input time is not available in the dump object") else: tselect = t[-1] itselect = next(i for i in range(nt) if t[i]==tselect) # set object title self.title = 'LAMMPS data file (restart from "%s" t = %0.5g (frame %d of %d))' % \ (X.flist[0],tselect,itselect,nt) # set default names ------------- HEADERS SECTION ------------- self.names = {} X.tselect.one(tselect) snap = X.snaps[itselect] # set headers self.headers = { 'atoms': snap.natoms, 'atom types': X.minmax("type")[1], 'xlo xhi': (snap.xlo, snap.xhi), 'ylo yhi': (snap.ylo, snap.yhi), 'zlo zhi': (snap.zlo, snap.zhi)} # Default sections self.sections = {} # set atoms (specific to your style/kind) ------------- ATOMS SUBSECTION ------------- template_atoms = {"smd": ["id","type","mol","c_vol", "mass", "radius", "c_contact_radius", "x", "y", "z", "f_1[1]", "f_1[2]", "f_1[3]"] } if X.kind(template_atoms["smd"]): for col in template_atoms["smd"]: self.append("Atoms", X.vecs(tselect, col), col in ["id","type","mol"],col) else: raise ValueError("Please add your ATOMS section in the constructor") # set velocities (if required) ------------- VELOCITIES SUBSECTION ------------- template_velocities = {"smd": ["id","vx","vy","vz"]} if X.kind(template_atoms["smd"]): if X.kind(template_velocities["smd"]): for col in template_velocities["smd"]: self.append("Velocities", X.vecs(tselect, col), col == "id",col) else: raise ValueError("the velocities are missing for the style SMD") # store filename self.flist = list[0].flist self.restart = True return # =========================================================== # Regular constructor from DATA file (supplied as filename) # =========================================================== flist = list file = list[0] if file[-3:] == ".gz": f = popen("%s -c %s" % (PIZZA_GUNZIP, file), "r") else: f = open(file) self.title = f.readline() self.names = {} headers = {} while 1: line = f.readline() line = line.strip() if len(line) == 0: continue found = 0 for keyword in hkeywords: if line.find(keyword) >= 0: found = 1 words = line.split() if ( keyword == "xlo xhi" or keyword == "ylo yhi" or keyword == "zlo zhi" ): headers[keyword] = (float(words[0]), float(words[1])) elif keyword == "xy xz yz": headers[keyword] = ( float(words[0]), float(words[1]), float(words[2]), ) else: headers[keyword] = int(words[0]) if not found: break sections = {} while 1: found = 0 for pair in skeywords: keyword, length = pair[0], pair[1] if keyword == line: found = 1 if length not in headers: #if not headers.has_key(length): raise ValueError("data section %s has no matching header value" % line) f.readline() list = [] for i in range(headers[length]): list.append(f.readline()) sections[keyword] = list if not found: raise ValueError("invalid section %s in data file" % line) f.readline() line = f.readline() if not line: break line = line.strip() f.close() self.headers = headers self.sections = sections self.flist = flist self.restart = False # -------------------------------------------------------------------- # Display method (added OV - 2022-02-03) def __repr__(self): if self.sections == {} or self.headers == {}: ret = "empty %s" % self.title print(ret) return ret if self.restart: kind = "restart" else: kind = "source" print("Data file: %s\n\tcontains %d atoms from %d atom types\n\twith box = [%0.4g %0.4g %0.4g %0.4g %0.4g %0.4g]" % (self.flist[0], self.headers['atoms'], self.headers['atom types'], self.headers['xlo xhi'][0], self.headers['xlo xhi'][1], self.headers['ylo yhi'][0], self.headers['ylo yhi'][1], self.headers['zlo zhi'][0], self.headers['zlo zhi'][1], ) ) print("\twith the following sections:") for sectionname in self.sections.keys(): print("\t\t" +self.dispsection(sectionname,False)) ret = 'LAMMPS data object including %d atoms (%d types, %s="%s")' \ % (self.headers['atoms'],self.maxtype(),kind,self.flist[0]) return ret # -------------------------------------------------------------------- # assign names to atom columns def map(self, *pairs): if len(pairs) % 2 != 0: raise ValueError ("data map() requires pairs of mappings") for i in range(0, len(pairs), 2): j = i + 1 self.names[pairs[j]] = pairs[i] - 1 # -------------------------------------------------------------------- # extract info from data file fields def get(self, *list): if len(list) == 1: field = list[0] array = [] lines = self.sections[field] for line in lines: words = line.split() values = map(float, words) array.append(values) return array elif len(list) == 2: field = list[0] n = list[1] - 1 vec = [] lines = self.sections[field] for line in lines: words = line.split() vec.append(float(words[n])) return vec else: raise ValueError("invalid arguments for data.get()") # -------------------------------------------------------------------- # reorder columns in a data file field def reorder(self, name, *order): """ reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,...) """ if name not in self.sections: raise ValueError('"%s" is not a valid section name' % name) n = len(order) print(">> reorder for %d columns" % n) natoms = len(self.sections[name]) oldlines = self.sections[name] newlines = natoms * [""] for index in order: for i in range(len(newlines)): words = oldlines[i].split() newlines[i] += words[index - 1] + " " for i in range(len(newlines)): newlines[i] += "\n" self.sections[name] = newlines # -------------------------------------------------------------------- # replace a column of named section with vector of values # the number of data is checked and repeated for scalars (added 2022-02-04) def replace(self, name, icol, vector): """ replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns """ lines = self.sections[name] nlines = len(lines) if name not in self.sections: raise ValueError('"%s" is not a valid section name' % name) if not isinstance(vector,list): vector = [vector] if len(vector)==1: vector = vector * nlines if len(vector) != nlines: raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \ (len(vector),name,nlines)) newlines = [] j = icol - 1 for i in range(nlines): line = lines[i] words = line.split() words[j] = str(vector[i]) newline = " ".join(words) + "\n" newlines.append(newline) self.sections[name] = newlines # -------------------------------------------------------------------- # append a column of named section with vector of values (added 2022-02-04) def append(self, name, vector, forceinteger = False, propertyname=None): """ append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None) """ if name not in self.sections: self.sections[name] = [] print('Add section [%s] - file="%s"' % (name,self.title) ) lines = self.sections[name] nlines = len(lines) if not isinstance(vector,list) and not isinstance(vector,np.ndarray): vector = [vector] if propertyname != None: print('\t> Add "%s" (%d values) to [%s]' % (propertyname,len(vector),name)) else: print('\t> Add %d values (no name) to [%s]' % (len(vector),name)) newlines = [] if nlines == 0: # empty atoms section, create first column nlines = len(vector) # new column length = input column length for i in range(nlines): if forceinteger: line = str(int(vector[i])) else: line = str(vector[i]) newlines.append(line) else: if len(vector)==1: vector = vector * nlines if len(vector) != nlines: raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \ (len(vector),name,nlines)) for i in range(nlines): line = lines[i] words = line.split() if forceinteger: words.append(str(int(vector[i]))) else: words.append(str(vector[i])) newline = " ".join(words) + "\n" newlines.append(newline) self.sections[name] = newlines # -------------------------------------------------------------------- # disp section info (added 2022-02-04) def dispsection(self, name,flaghead=True): """ display section info: X.dispsection("sectionname") """ lines = self.sections[name] nlines = len(lines) line = lines[0] words = line.split() ret = '"%s": %d x %d values' % (name,nlines,len(words)) if flaghead: ret = "LAMMPS data section "+ret return ret # -------------------------------------------------------------------- # replace x,y,z in Atoms with x,y,z values from snapshot ntime of dump object # assumes id,x,y,z are defined in both data and dump files # also replaces ix,iy,iz if they are defined def newxyz(self, dm, ntime): nsnap = dm.findtime(ntime) print(">> newxyz for %d snaps" % nsnap) dm.sort(ntime) x, y, z = dm.vecs(ntime, "x", "y", "z") self.replace("Atoms", self.names["x"] + 1, x) self.replace("Atoms", self.names["y"] + 1, y) self.replace("Atoms", self.names["z"] + 1, z) if "ix" in dm.names and "ix" in self.names: #if dm.names.has_key("ix") and self.names.has_key("ix"): ix, iy, iz = dm.vecs(ntime, "ix", "iy", "iz") self.replace("Atoms", self.names["ix"] + 1, ix) self.replace("Atoms", self.names["iy"] + 1, iy) self.replace("Atoms", self.names["iz"] + 1, iz) # -------------------------------------------------------------------- # delete header value or section from data file def delete(self, keyword): if keyword in self.headers: # if self.headers.has_key(keyword): del self.headers[keyword] elif keyword in self.sections: # elif self.sections.has_key(keyword): del self.sections[keyword] else: raise ValueError("keyword not found in data object") # -------------------------------------------------------------------- # write out a LAMMPS data file def write(self, file): f = open(file, "w") print(self.title,file=f) for keyword in hkeywords: if keyword in self.headers: # self.headers.has_key(keyword): if keyword == "xlo xhi" or keyword == "ylo yhi" or keyword == "zlo zhi": pair = self.headers[keyword] print(pair[0], pair[1], keyword,file=f) elif keyword == "xy xz yz": triple = self.headers[keyword] print(triple[0], triple[1], triple[2], keyword,file=f) else: print(self.headers[keyword], keyword,file=f) for pair in skeywords: keyword = pair[0] if keyword in self.sections: #self.sections.has_key(keyword): print("\n%s\n" % keyword,file=f) for line in self.sections[keyword]: print(line,file=f,end="") f.close() # -------------------------------------------------------------------- # iterator called from other tools def iterator(self, flag): if flag == 0: return 0, 0, 1 return 0, 0, -1 # -------------------------------------------------------------------- # time query from other tools def findtime(self, n): if n == 0: return 0 raise ValueError("no step %d exists" % n) # -------------------------------------------------------------------- # return list of atoms and bonds to viz for data object def viz(self, isnap): if isnap: raise ValueError("cannot call data.viz() with isnap != 0") id = self.names["id"] type = self.names["type"] x = self.names["x"] y = self.names["y"] z = self.names["z"] xlohi = self.headers["xlo xhi"] ylohi = self.headers["ylo yhi"] zlohi = self.headers["zlo zhi"] box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]] # create atom list needed by viz from id,type,x,y,z atoms = [] atomlines = self.sections["Atoms"] for line in atomlines: words = line.split() atoms.append( [ int(words[id]), int(words[type]), float(words[x]), float(words[y]), float(words[z]), ] ) # create list of current bond coords from list of bonds # assumes atoms are sorted so can lookup up the 2 atoms in each bond bonds = [] if self.sections.has_key("Bonds"): bondlines = self.sections["Bonds"] for line in bondlines: words = line.split() bid, btype = int(words[0]), int(words[1]) atom1, atom2 = int(words[2]), int(words[3]) atom1words = atomlines[atom1 - 1].split() atom2words = atomlines[atom2 - 1].split() bonds.append( [ bid, btype, float(atom1words[x]), float(atom1words[y]), float(atom1words[z]), float(atom2words[x]), float(atom2words[y]), float(atom2words[z]), float(atom1words[type]), float(atom2words[type]), ] ) tris = [] lines = [] return 0, box, atoms, bonds, tris, lines # -------------------------------------------------------------------- # return box size def maxbox(self): xlohi = self.headers["xlo xhi"] ylohi = self.headers["ylo yhi"] zlohi = self.headers["zlo zhi"] return [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]] # -------------------------------------------------------------------- # return number of atom types def maxtype(self): return self.headers["atom types"]
Methods
def append(self, name, vector, forceinteger=False, propertyname=None)
-
append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None)
Expand source code
def append(self, name, vector, forceinteger = False, propertyname=None): """ append a new column: X.append("section",vectorofvalues,forceinteger=False,propertyname=None) """ if name not in self.sections: self.sections[name] = [] print('Add section [%s] - file="%s"' % (name,self.title) ) lines = self.sections[name] nlines = len(lines) if not isinstance(vector,list) and not isinstance(vector,np.ndarray): vector = [vector] if propertyname != None: print('\t> Add "%s" (%d values) to [%s]' % (propertyname,len(vector),name)) else: print('\t> Add %d values (no name) to [%s]' % (len(vector),name)) newlines = [] if nlines == 0: # empty atoms section, create first column nlines = len(vector) # new column length = input column length for i in range(nlines): if forceinteger: line = str(int(vector[i])) else: line = str(vector[i]) newlines.append(line) else: if len(vector)==1: vector = vector * nlines if len(vector) != nlines: raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \ (len(vector),name,nlines)) for i in range(nlines): line = lines[i] words = line.split() if forceinteger: words.append(str(int(vector[i]))) else: words.append(str(vector[i])) newline = " ".join(words) + "\n" newlines.append(newline) self.sections[name] = newlines
def delete(self, keyword)
-
Expand source code
def delete(self, keyword): if keyword in self.headers: # if self.headers.has_key(keyword): del self.headers[keyword] elif keyword in self.sections: # elif self.sections.has_key(keyword): del self.sections[keyword] else: raise ValueError("keyword not found in data object")
def dispsection(self, name, flaghead=True)
-
display section info: X.dispsection("sectionname")
Expand source code
def dispsection(self, name,flaghead=True): """ display section info: X.dispsection("sectionname") """ lines = self.sections[name] nlines = len(lines) line = lines[0] words = line.split() ret = '"%s": %d x %d values' % (name,nlines,len(words)) if flaghead: ret = "LAMMPS data section "+ret return ret
def findtime(self, n)
-
Expand source code
def findtime(self, n): if n == 0: return 0 raise ValueError("no step %d exists" % n)
def get(self, *list)
-
Expand source code
def get(self, *list): if len(list) == 1: field = list[0] array = [] lines = self.sections[field] for line in lines: words = line.split() values = map(float, words) array.append(values) return array elif len(list) == 2: field = list[0] n = list[1] - 1 vec = [] lines = self.sections[field] for line in lines: words = line.split() vec.append(float(words[n])) return vec else: raise ValueError("invalid arguments for data.get()")
def iterator(self, flag)
-
Expand source code
def iterator(self, flag): if flag == 0: return 0, 0, 1 return 0, 0, -1
def map(self, *pairs)
-
Expand source code
def map(self, *pairs): if len(pairs) % 2 != 0: raise ValueError ("data map() requires pairs of mappings") for i in range(0, len(pairs), 2): j = i + 1 self.names[pairs[j]] = pairs[i] - 1
def maxbox(self)
-
Expand source code
def maxbox(self): xlohi = self.headers["xlo xhi"] ylohi = self.headers["ylo yhi"] zlohi = self.headers["zlo zhi"] return [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]]
def maxtype(self)
-
Expand source code
def maxtype(self): return self.headers["atom types"]
def newxyz(self, dm, ntime)
-
Expand source code
def newxyz(self, dm, ntime): nsnap = dm.findtime(ntime) print(">> newxyz for %d snaps" % nsnap) dm.sort(ntime) x, y, z = dm.vecs(ntime, "x", "y", "z") self.replace("Atoms", self.names["x"] + 1, x) self.replace("Atoms", self.names["y"] + 1, y) self.replace("Atoms", self.names["z"] + 1, z) if "ix" in dm.names and "ix" in self.names: #if dm.names.has_key("ix") and self.names.has_key("ix"): ix, iy, iz = dm.vecs(ntime, "ix", "iy", "iz") self.replace("Atoms", self.names["ix"] + 1, ix) self.replace("Atoms", self.names["iy"] + 1, iy) self.replace("Atoms", self.names["iz"] + 1, iz)
def reorder(self, name, *order)
-
reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,…)
Expand source code
def reorder(self, name, *order): """ reorder columns: reorder("section",colidxfirst,colidxsecond,colidxthird,...) """ if name not in self.sections: raise ValueError('"%s" is not a valid section name' % name) n = len(order) print(">> reorder for %d columns" % n) natoms = len(self.sections[name]) oldlines = self.sections[name] newlines = natoms * [""] for index in order: for i in range(len(newlines)): words = oldlines[i].split() newlines[i] += words[index - 1] + " " for i in range(len(newlines)): newlines[i] += "\n" self.sections[name] = newlines
def replace(self, name, icol, vector)
-
replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns
Expand source code
def replace(self, name, icol, vector): """ replace column values: replace("section",columnindex,vectorofvalues) with columnindex=1..ncolumns """ lines = self.sections[name] nlines = len(lines) if name not in self.sections: raise ValueError('"%s" is not a valid section name' % name) if not isinstance(vector,list): vector = [vector] if len(vector)==1: vector = vector * nlines if len(vector) != nlines: raise ValueError('the length of new data (%d) in section "%s" does not match the number of rows %d' % \ (len(vector),name,nlines)) newlines = [] j = icol - 1 for i in range(nlines): line = lines[i] words = line.split() words[j] = str(vector[i]) newline = " ".join(words) + "\n" newlines.append(newline) self.sections[name] = newlines
def viz(self, isnap)
-
Expand source code
def viz(self, isnap): if isnap: raise ValueError("cannot call data.viz() with isnap != 0") id = self.names["id"] type = self.names["type"] x = self.names["x"] y = self.names["y"] z = self.names["z"] xlohi = self.headers["xlo xhi"] ylohi = self.headers["ylo yhi"] zlohi = self.headers["zlo zhi"] box = [xlohi[0], ylohi[0], zlohi[0], xlohi[1], ylohi[1], zlohi[1]] # create atom list needed by viz from id,type,x,y,z atoms = [] atomlines = self.sections["Atoms"] for line in atomlines: words = line.split() atoms.append( [ int(words[id]), int(words[type]), float(words[x]), float(words[y]), float(words[z]), ] ) # create list of current bond coords from list of bonds # assumes atoms are sorted so can lookup up the 2 atoms in each bond bonds = [] if self.sections.has_key("Bonds"): bondlines = self.sections["Bonds"] for line in bondlines: words = line.split() bid, btype = int(words[0]), int(words[1]) atom1, atom2 = int(words[2]), int(words[3]) atom1words = atomlines[atom1 - 1].split() atom2words = atomlines[atom2 - 1].split() bonds.append( [ bid, btype, float(atom1words[x]), float(atom1words[y]), float(atom1words[z]), float(atom2words[x]), float(atom2words[y]), float(atom2words[z]), float(atom1words[type]), float(atom2words[type]), ] ) tris = [] lines = [] return 0, box, atoms, bonds, tris, lines
def write(self, file)
-
Expand source code
def write(self, file): f = open(file, "w") print(self.title,file=f) for keyword in hkeywords: if keyword in self.headers: # self.headers.has_key(keyword): if keyword == "xlo xhi" or keyword == "ylo yhi" or keyword == "zlo zhi": pair = self.headers[keyword] print(pair[0], pair[1], keyword,file=f) elif keyword == "xy xz yz": triple = self.headers[keyword] print(triple[0], triple[1], triple[2], keyword,file=f) else: print(self.headers[keyword], keyword,file=f) for pair in skeywords: keyword = pair[0] if keyword in self.sections: #self.sections.has_key(keyword): print("\n%s\n" % keyword,file=f) for line in self.sections[keyword]: print(line,file=f,end="") f.close()
class dump (*list)
-
Expand source code
class dump: # -------------------------------------------------------------------- def __init__(self, *list): self.snaps = [] self.nsnaps = self.nselect = 0 self.names = {} self.tselect = tselect(self) self.aselect = aselect(self) self.atype = "type" self.bondflag = 0 self.bondlist = [] self.triflag = 0 self.trilist = [] self.lineflag = 0 self.linelist = [] self.objextra = None # flist = list of all dump file names words = list[0].split() self.flist = [] for word in words: self.flist += glob.glob(word) if len(self.flist) == 0 and len(list) == 1: raise ValueError("no dump file specified") if len(list) == 1: self.increment = 0 self.read_all() else: self.increment = 1 self.nextfile = 0 self.eof = 0 # -------------------------------------------------------------------- def __repr__(self): times = self.time(); ntimes = len(times) lastime = times[-1]; fields = self.names; print("Dump file: %s\ncontains %d frames (tend=%0.4g)\nwith fields" % \ (self.flist[0],ntimes,lastime) ) for k in sorted(fields,key=fields.get,reverse=False): print("\t%02d: %s" % (fields[k],k) ) ret = 'LAMMPS dump object with %d properties and %d frames (tend=%0.4g, - source="%s"' % \ (len(fields),ntimes,lastime,self.flist[0]) return ret # -------------------------------------------------------------------- def read_all(self): # read all snapshots from each file # test for gzipped files for file in self.flist: if file[-3:] == ".gz": f = popen("%s -c %s" % (PIZZA_GUNZIP, file), "r") else: f = open(file) snap = self.read_snapshot(f) while snap: self.snaps.append(snap) print(snap.time, file=sys.stdout, flush=True, end=" ") snap = self.read_snapshot(f) f.close() print # sort entries by timestep, cull duplicates self.snaps.sort() # self.snaps.sort(self.compare_time) #%% to be fixed in the future (OV) self.cull() self.nsnaps = len(self.snaps) print("read %d snapshots" % (self.nsnaps)) # select all timesteps and atoms self.tselect.all() # print column assignments if len(self.names): print("assigned columns:", ",".join(list(self.names.keys()))) else: print("no column assignments made") # if snapshots are scaled, unscale them if ( (not "x" in self.names) or (not "y" in self.names) or (not "z" in self.names) ): print("dump scaling status is unknown") elif self.nsnaps > 0: if self.scale_original == 1: self.unscale() elif self.scale_original == 0: print("dump is already unscaled") else: print("dump scaling status is unknown") # -------------------------------------------------------------------- # read next snapshot from list of files def next(self): if not self.increment: raise ValueError("cannot read incrementally") # read next snapshot in current file using eof as pointer # if fail, try next file # if new snapshot time stamp already exists, read next snapshot while 1: f = open(self.flist[self.nextfile], "rb") f.seek(self.eof) snap = self.read_snapshot(f) if not snap: self.nextfile += 1 if self.nextfile == len(self.flist): return -1 f.close() self.eof = 0 continue self.eof = f.tell() f.close() try: self.findtime(snap.time) continue except: break # select the new snapshot with all its atoms self.snaps.append(snap) snap = self.snaps[self.nsnaps] snap.tselect = 1 snap.nselect = snap.natoms for i in range(snap.natoms): snap.aselect[i] = True self.nsnaps += 1 self.nselect += 1 return snap.time # -------------------------------------------------------------------- # read a single snapshot from file f # return snapshot or 0 if failed # for first snapshot only: # assign column names (file must be self-describing) # set scale_original to 0/1/-1 for unscaled/scaled/unknown # convert xs,xu to x in names def read_snapshot(self, f): """ low-level method to read a snapshot from a file identifier """ # expand the list of keywords if needed (INRAE\Olivier Vitrac) # "keyname": ["name in snap","type"] itemkeywords = {"TIME": ["realtime","float"], "TIMESTEP": ["time","int"], "NUMBER OF ATOMS": ["natoms","int"]} try: snap = Snap() # read and guess the first keywords based on itemkeywords found = True while found: item = f.readline() varitem = item.split("ITEM:")[1].strip() found = varitem in itemkeywords if found: tmp = f.readline().split()[0] # just grab 1st field if itemkeywords[varitem][1]=="int": valitem = int(tmp) else: valitem = float(tmp) setattr(snap,itemkeywords[varitem][0],valitem) # prefetch snap.aselect = np.zeros(snap.natoms,dtype=bool) # we assume that the next item is BOX BOUNDS (pp ff pp) words = item.split("BOUNDS ") if len(words) == 1: snap.boxstr = "" else: snap.boxstr = words[1].strip() if "xy" in snap.boxstr: snap.triclinic = 1 else: snap.triclinic = 0 words = f.readline().split() if len(words) == 2: snap.xlo, snap.xhi, snap.xy = float(words[0]), float(words[1]), 0.0 else: snap.xlo, snap.xhi, snap.xy = ( float(words[0]), float(words[1]), float(words[2]), ) words = f.readline().split() if len(words) == 2: snap.ylo, snap.yhi, snap.xz = float(words[0]), float(words[1]), 0.0 else: snap.ylo, snap.yhi, snap.xz = ( float(words[0]), float(words[1]), float(words[2]), ) words = f.readline().split() if len(words) == 2: snap.zlo, snap.zhi, snap.yz = float(words[0]), float(words[1]), 0.0 else: snap.zlo, snap.zhi, snap.yz = ( float(words[0]), float(words[1]), float(words[2]), ) item = f.readline() if len(self.names) == 0: self.scale_original = -1 xflag = yflag = zflag = -1 words = item.split()[2:] if len(words): for i in range(len(words)): if words[i] == "x" or words[i] == "xu": xflag = 0 self.names["x"] = i elif words[i] == "xs" or words[i] == "xsu": xflag = 1 self.names["x"] = i elif words[i] == "y" or words[i] == "yu": yflag = 0 self.names["y"] = i elif words[i] == "ys" or words[i] == "ysu": yflag = 1 self.names["y"] = i elif words[i] == "z" or words[i] == "zu": zflag = 0 self.names["z"] = i elif words[i] == "zs" or words[i] == "zsu": zflag = 1 self.names["z"] = i else: self.names[words[i]] = i if xflag == 0 and yflag == 0 and zflag == 0: self.scale_original = 0 if xflag == 1 and yflag == 1 and zflag == 1: self.scale_original = 1 if snap.natoms: words = f.readline().split() ncol = len(words) for i in range(1, snap.natoms): words += f.readline().split() floats = list(map(float, words)) if oldnumeric: atoms = np.zeros((snap.natoms, ncol), np.float64) else: atoms = np.zeros((snap.natoms, ncol), np.float64) start = 0 stop = ncol for i in range(snap.natoms): atoms[i] = floats[start:stop] start = stop stop += ncol else: atoms = None snap.atoms = atoms return snap except: return 0 # -------------------------------------------------------------------- # map atom column names def map(self, *pairs): if len(pairs) % 2 != 0: raise ValueError("dump map() requires pairs of mappings") for i in range(0, len(pairs), 2): j = i + 1 self.names[pairs[j]] = pairs[i] - 1 # -------------------------------------------------------------------- # delete unselected snapshots def delete(self): ndel = i = 0 while i < self.nsnaps: if not self.snaps[i].tselect: del self.snaps[i] self.nsnaps -= 1 ndel += 1 else: i += 1 print("%d snapshots deleted" % ndel) print("%d snapshots remaining" % self.nsnaps) # -------------------------------------------------------------------- # scale coords to 0-1 for all snapshots or just one # use 6 params as h-matrix to treat orthongonal or triclinic boxes def scale(self, *list): if len(list) == 0: print("Scaling dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] for snap in self.snaps: self.scale_one(snap, x, y, z) else: i = self.findtime(list[0]) x = self.names["x"] y = self.names["y"] z = self.names["z"] self.scale_one(self.snaps[i], x, y, z) # -------------------------------------------------------------------- def scale_one(self, snap, x, y, z): if snap.xy == 0.0 and snap.xz == 0.0 and snap.yz == 0.0: xprdinv = 1.0 / (snap.xhi - snap.xlo) yprdinv = 1.0 / (snap.yhi - snap.ylo) zprdinv = 1.0 / (snap.zhi - snap.zlo) atoms = snap.atoms if atoms != None: atoms[:, x] = (atoms[:, x] - snap.xlo) * xprdinv atoms[:, y] = (atoms[:, y] - snap.ylo) * yprdinv atoms[:, z] = (atoms[:, z] - snap.zlo) * zprdinv else: xlo_bound = snap.xlo xhi_bound = snap.xhi ylo_bound = snap.ylo yhi_bound = snap.yhi zlo_bound = snap.zlo zhi_bound = snap.zhi xy = snap.xy xz = snap.xz yz = snap.yz xlo = xlo_bound - min((0.0, xy, xz, xy + xz)) xhi = xhi_bound - max((0.0, xy, xz, xy + xz)) ylo = ylo_bound - min((0.0, yz)) yhi = yhi_bound - max((0.0, yz)) zlo = zlo_bound zhi = zhi_bound h0 = xhi - xlo h1 = yhi - ylo h2 = zhi - zlo h3 = yz h4 = xz h5 = xy h0inv = 1.0 / h0 h1inv = 1.0 / h1 h2inv = 1.0 / h2 h3inv = yz / (h1 * h2) h4inv = (h3 * h5 - h1 * h4) / (h0 * h1 * h2) h5inv = xy / (h0 * h1) atoms = snap.atoms if atoms != None: atoms[:, x] = ( (atoms[:, x] - snap.xlo) * h0inv + (atoms[:, y] - snap.ylo) * h5inv + (atoms[:, z] - snap.zlo) * h4inv ) atoms[:, y] = (atoms[:, y] - snap.ylo) * h1inv + ( atoms[:, z] - snap.zlo ) * h3inv atoms[:, z] = (atoms[:, z] - snap.zlo) * h2inv # -------------------------------------------------------------------- # unscale coords from 0-1 to box size for all snapshots or just one # use 6 params as h-matrix to treat orthongonal or triclinic boxes def unscale(self, *list): if len(list) == 0: print("Unscaling dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] for snap in self.snaps: self.unscale_one(snap, x, y, z) else: i = self.findtime(list[0]) x = self.names["x"] y = self.names["y"] z = self.names["z"] self.unscale_one(self.snaps[i], x, y, z) # -------------------------------------------------------------------- def unscale_one(self, snap, x, y, z): if snap.xy == 0.0 and snap.xz == 0.0 and snap.yz == 0.0: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms if atoms != None: atoms[:, x] = snap.xlo + atoms[:, x] * xprd atoms[:, y] = snap.ylo + atoms[:, y] * yprd atoms[:, z] = snap.zlo + atoms[:, z] * zprd else: xlo_bound = snap.xlo xhi_bound = snap.xhi ylo_bound = snap.ylo yhi_bound = snap.yhi zlo_bound = snap.zlo zhi_bound = snap.zhi xy = snap.xy xz = snap.xz yz = snap.yz xlo = xlo_bound - min((0.0, xy, xz, xy + xz)) xhi = xhi_bound - max((0.0, xy, xz, xy + xz)) ylo = ylo_bound - min((0.0, yz)) yhi = yhi_bound - max((0.0, yz)) zlo = zlo_bound zhi = zhi_bound h0 = xhi - xlo h1 = yhi - ylo h2 = zhi - zlo h3 = yz h4 = xz h5 = xy atoms = snap.atoms if atoms != None: atoms[:, x] = ( snap.xlo + atoms[:, x] * h0 + atoms[:, y] * h5 + atoms[:, z] * h4 ) atoms[:, y] = snap.ylo + atoms[:, y] * h1 + atoms[:, z] * h3 atoms[:, z] = snap.zlo + atoms[:, z] * h2 # -------------------------------------------------------------------- # wrap coords from outside box to inside def wrap(self): print("Wrapping dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] ix = self.names["ix"] iy = self.names["iy"] iz = self.names["iz"] for snap in self.snaps: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms atoms[:, x] -= atoms[:, ix] * xprd atoms[:, y] -= atoms[:, iy] * yprd atoms[:, z] -= atoms[:, iz] * zprd # -------------------------------------------------------------------- # unwrap coords from inside box to outside def unwrap(self): print("Unwrapping dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] ix = self.names["ix"] iy = self.names["iy"] iz = self.names["iz"] for snap in self.snaps: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms atoms[:, x] += atoms[:, ix] * xprd atoms[:, y] += atoms[:, iy] * yprd atoms[:, z] += atoms[:, iz] * zprd # -------------------------------------------------------------------- # wrap coords to same image as atom ID stored in "other" column # if dynamic extra lines or triangles defined, owrap them as well def owrap(self, other): print("Wrapping to other ...") id = self.names["id"] x = self.names["x"] y = self.names["y"] z = self.names["z"] ix = self.names["ix"] iy = self.names["iy"] iz = self.names["iz"] iother = self.names[other] for snap in self.snaps: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms ids = {} for i in range(snap.natoms): ids[atoms[i][id]] = i for i in range(snap.natoms): j = ids[atoms[i][iother]] atoms[i][x] += (atoms[i][ix] - atoms[j][ix]) * xprd atoms[i][y] += (atoms[i][iy] - atoms[j][iy]) * yprd atoms[i][z] += (atoms[i][iz] - atoms[j][iz]) * zprd # should bonds also be owrapped ? if self.lineflag == 2 or self.triflag == 2: self.objextra.owrap( snap.time, xprd, yprd, zprd, ids, atoms, iother, ix, iy, iz ) # -------------------------------------------------------------------- # convert column names assignment to a string, in column order def names2str(self): # <-- Python 2.x --> # pairs = self.names.items() # values = self.names.values() # ncol = len(pairs) # str = "" # for i in range(ncol): # if i in values: str += pairs[values.index(i)][0] + ' ' # <-- Python 3.x --> str = "" for k in sorted(self.names, key=self.names.get, reverse=False): str += k + " " return str # -------------------------------------------------------------------- # sort atoms by atom ID in all selected timesteps by default # if arg = string, sort all steps by that column # if arg = numeric, sort atoms in single step def sort(self, *listarg): if len(listarg) == 0: print("Sorting selected snapshots ...") id = self.names["id"] for snap in self.snaps: if snap.tselect: self.sort_one(snap, id) elif type(listarg[0]) is types.StringType: print("Sorting selected snapshots by %s ..." % listarg[0]) id = self.names[listarg[0]] for snap in self.snaps: if snap.tselect: self.sort_one(snap, id) else: i = self.findtime(listarg[0]) id = self.names["id"] self.sort_one(self.snaps[i], id) # -------------------------------------------------------------------- # sort a single snapshot by ID column def sort_one(self, snap, id): atoms = snap.atoms ids = atoms[:, id] ordering = np.argsort(ids) for i in range(len(atoms[0])): atoms[:, i] = np.take(atoms[:, i], ordering) # -------------------------------------------------------------------- # write a single dump file from current selection def write(self, file, header=1, append=0): if len(self.snaps): namestr = self.names2str() if not append: f = open(file, "w") else: f = open(file, "a") if "id" in self.names: id = self.names["id"] else: id = -1 if "type" in self.names: type = self.names["type"] else: type = -1 for snap in self.snaps: if not snap.tselect: continue print(snap.time, file=sys.stdout, flush=True) if header: print("ITEM: TIMESTEP", file=f) print(snap.time, file=f) print("ITEM: NUMBER OF ATOMS", file=f) print(snap.nselect, file=f) if snap.boxstr: print("ITEM: BOX BOUNDS", snap.boxstr, file=f) else: print("ITEM: BOX BOUNDS", file=f) if snap.triclinic: print(snap.xlo, snap.xhi, snap.xy, file=f) print(snap.ylo, snap.yhi, snap.xz, file=f) print(snap.zlo, snap.zhi, snap.yz, file=f) else: print(snap.xlo, snap.xhi, file=f) print(snap.ylo, snap.yhi, file=f) print(snap.zlo, snap.zhi, file=f) print("ITEM: ATOMS", namestr, file=f) atoms = snap.atoms nvalues = len(atoms[0]) for i in range(snap.natoms): if not snap.aselect[i]: continue line = "" for j in range(nvalues): if j == id or j == type: line += str(int(atoms[i][j])) + " " else: line += str(atoms[i][j]) + " " print(line, file=f) f.close() print("\n%d snapshots" % self.nselect) # -------------------------------------------------------------------- # write one dump file per snapshot from current selection def scatter(self, root): if len(self.snaps): namestr = self.names2str() for snap in self.snaps: if not snap.tselect: continue print(snap.time, file=sys.stdout, flush=True) file = root + "." + str(snap.time) f = open(file, "w") print("ITEM: TIMESTEP", file=f) print(snap.time, file=f) print("ITEM: NUMBER OF ATOMS", file=f) print(snap.nselect, file=f) if snap.boxstr: print("ITEM: BOX BOUNDS", snap.boxstr, file=f) else: print("ITEM: BOX BOUNDS", file=f) if snap.triclinic: print(snap.xlo, snap.xhi, snap.xy, file=f) print(snap.ylo, snap.yhi, snap.xz, file=f) print(snap.zlo, snap.zhi, snap.yz, file=f) else: print(snap.xlo, snap.xhi, file=f) print(snap.ylo, snap.yhi, file=f) print(snap.zlo, snap.zhi, file=f) print("ITEM: ATOMS", namestr, file=f) atoms = snap.atoms nvalues = len(atoms[0]) for i in range(snap.natoms): if not snap.aselect[i]: continue line = "" for j in range(nvalues): if j < 2: line += str(int(atoms[i][j])) + " " else: line += str(atoms[i][j]) + " " print(line, file=f) f.close() print("\n%d snapshots" % self.nselect) # -------------------------------------------------------------------- # find min/max across all selected snapshots/atoms for a particular column def minmax(self, colname): icol = self.names[colname] min = 1.0e20 max = -min for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue if atoms[i][icol] < min: min = atoms[i][icol] if atoms[i][icol] > max: max = atoms[i][icol] return (min, max) # -------------------------------------------------------------------- # set a column value via an equation for all selected snapshots def set(self, eq): print("Setting ...") pattern = "\$\w*" list = re.findall(pattern, eq) lhs = list[0][1:] if not self.names.has_key(lhs): self.newcolumn(lhs) for item in list: name = item[1:] column = self.names[name] insert = "snap.atoms[i][%d]" % (column) eq = eq.replace(item, insert) ceq = compile(eq, "", "single") for snap in self.snaps: if not snap.tselect: continue for i in range(snap.natoms): if snap.aselect[i]: exec(ceq) # -------------------------------------------------------------------- # set a column value via an input vec for all selected snapshots/atoms def setv(self, colname, vec): print("Setting ...") if not self.names.has_key(colname): self.newcolumn(colname) icol = self.names[colname] for snap in self.snaps: if not snap.tselect: continue if snap.nselect != len(vec): raise ValueError("vec length does not match # of selected atoms") atoms = snap.atoms m = 0 for i in range(snap.natoms): if snap.aselect[i]: atoms[i][icol] = vec[m] m += 1 # -------------------------------------------------------------------- # clone value in col across selected timesteps for atoms with same ID def clone(self, nstep, col): istep = self.findtime(nstep) icol = self.names[col] id = self.names["id"] ids = {} for i in range(self.snaps[istep].natoms): ids[self.snaps[istep].atoms[i][id]] = i for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue j = ids[atoms[i][id]] atoms[i][icol] = self.snaps[istep].atoms[j][icol] # -------------------------------------------------------------------- # values in old column are spread as ints from 1-N and assigned to new column def spread(self, old, n, new): iold = self.names[old] if not self.names.has_key(new): self.newcolumn(new) inew = self.names[new] min, max = self.minmax(old) print("min/max = ", min, max) gap = max - min invdelta = n / gap for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue ivalue = int((atoms[i][iold] - min) * invdelta) + 1 if ivalue > n: ivalue = n if ivalue < 1: ivalue = 1 atoms[i][inew] = ivalue # -------------------------------------------------------------------- # return vector of selected snapshot time stamps # time is based on TIMESTEP item # realtime is based on TIME item def time(self): """ timestep as stored: time()""" vec = self.nselect * [0] i = 0 for snap in self.snaps: if not snap.tselect: continue vec[i] = snap.time i += 1 return vec def realtime(self): """ time as simulated: realtime() """ vec = self.nselect * [0.0] i = 0 for snap in self.snaps: if not snap.tselect or not hasattr(snap,"realtime"): continue vec[i] = snap.realtime i += 1 return vec # -------------------------------------------------------------------- # extract vector(s) of values for atom ID n at each selected timestep def atom(self, n, *list): if len(list) == 0: raise ValueError("no columns specified") columns = [] values = [] for name in list: columns.append(self.names[name]) values.append(self.nselect * [0]) ncol = len(columns) id = self.names["id"] m = 0 for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if atoms[i][id] == n: break if atoms[i][id] != n: raise ValueError("could not find atom ID in snapshot") for j in range(ncol): values[j][m] = atoms[i][columns[j]] m += 1 if len(list) == 1: return values[0] else: return values # -------------------------------------------------------------------- # extract vector(s) of values for selected atoms at chosen timestep def vecs(self, n, *colname): """ vecs(timeste,columname1,columname2,...) Examples: tab = vecs(timestep,"id","x","y") tab = vecs(timestep,["id","x","y"],"z") X.vecs(X.time()[50],"vx","vy") """ snap = self.snaps[self.findtime(n)] if len(colname) == 0: raise ValueError("no columns specified") if isinstance(colname[0],tuple): colname = list(colname[0]) + list(colname[1:]) if isinstance(colname[0],list): colname = colname[0] + list(colname[1:]) columns = [] values = [] for name in colname: columns.append(self.names[name]) values.append(snap.nselect * [0]) ncol = len(columns) m = 0 for i in range(snap.natoms): if not snap.aselect[i]: continue for j in range(ncol): values[j][m] = snap.atoms[i][columns[j]] m += 1 if len(colname) == 1: return values[0] else: return values # -------------------------------------------------------------------- # add a new column to every snapshot and set value to 0 # set the name of the column to str def newcolumn(self, str): ncol = len(self.snaps[0].atoms[0]) self.map(ncol + 1, str) for snap in self.snaps: # commented because not used # atoms = snap.atoms if oldnumeric: newatoms = np.zeros((snap.natoms, ncol + 1), np.Float) else: newatoms = np.zeros((snap.natoms, ncol + 1), np.float) newatoms[:, 0:ncol] = snap.atoms snap.atoms = newatoms # -------------------------------------------------------------------- # sort snapshots on time stamp def compare_time(self, a, b): if a.time < b.time: return -1 elif a.time > b.time: return 1 else: return 0 # -------------------------------------------------------------------- # delete successive snapshots with duplicate time stamp def cull(self): i = 1 while i < len(self.snaps): if self.snaps[i].time == self.snaps[i - 1].time: del self.snaps[i] else: i += 1 # -------------------------------------------------------------------- # iterate over selected snapshots def iterator(self, flag): start = 0 if flag: start = self.iterate + 1 for i in range(start, self.nsnaps): if self.snaps[i].tselect: self.iterate = i return i, self.snaps[i].time, 1 return 0, 0, -1 # -------------------------------------------------------------------- # return list of atoms to viz for snapshot isnap # if called with flag, then index is timestep, so convert to snapshot index # augment with bonds, tris, lines if extra() was invoked def viz(self, index, flag=0): if not flag: isnap = index else: times = self.time() n = len(times) i = 0 while i < n: if times[i] > index: break i += 1 isnap = i - 1 snap = self.snaps[isnap] time = snap.time box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi] id = self.names["id"] type = self.names[self.atype] x = self.names["x"] y = self.names["y"] z = self.names["z"] # create atom list needed by viz from id,type,x,y,z # need Numeric/Numpy mode here atoms = [] for i in range(snap.natoms): if not snap.aselect[i]: continue atom = snap.atoms[i] atoms.append([atom[id], atom[type], atom[x], atom[y], atom[z]]) # create list of bonds from static or dynamic bond list # then generate bond coords from bondlist # alist = dictionary of atom IDs for atoms list # lookup bond atom IDs in alist and grab their coords # try is used since some atoms may be unselected # any bond with unselected atom is not added to bonds # need Numeric/Numpy mode here bonds = [] if self.bondflag: if self.bondflag == 1: bondlist = self.bondlist elif self.bondflag == 2: tmp1, tmp2, tmp3, bondlist, tmp4, tmp5 = self.objextra.viz(time, 1) alist = {} for i in range(len(atoms)): alist[int(atoms[i][0])] = i for bond in bondlist: try: i = alist[bond[2]] j = alist[bond[3]] atom1 = atoms[i] atom2 = atoms[j] bonds.append( [ bond[0], bond[1], atom1[2], atom1[3], atom1[4], atom2[2], atom2[3], atom2[4], atom1[1], atom2[1], ] ) except: continue # create list of tris from static or dynamic tri list # if dynamic, could eliminate tris for unselected atoms tris = [] if self.triflag: if self.triflag == 1: tris = self.trilist elif self.triflag == 2: tmp1, tmp2, tmp3, tmp4, tris, tmp5 = self.objextra.viz(time, 1) # create list of lines from static or dynamic tri list # if dynamic, could eliminate lines for unselected atoms lines = [] if self.lineflag: if self.lineflag == 1: lines = self.linelist elif self.lineflag == 2: tmp1, tmp2, tmp3, tmp4, tmp5, lines = self.objextra.viz(time, 1) return time, box, atoms, bonds, tris, lines # -------------------------------------------------------------------- def findtime(self, n): for i in range(self.nsnaps): if self.snaps[i].time == n: return i raise ValueError("no step %d exists" % n) # -------------------------------------------------------------------- # return maximum box size across all selected snapshots def maxbox(self): xlo = ylo = zlo = None xhi = yhi = zhi = None for snap in self.snaps: if not snap.tselect: continue if xlo == None or snap.xlo < xlo: xlo = snap.xlo if xhi == None or snap.xhi > xhi: xhi = snap.xhi if ylo == None or snap.ylo < ylo: ylo = snap.ylo if yhi == None or snap.yhi > yhi: yhi = snap.yhi if zlo == None or snap.zlo < zlo: zlo = snap.zlo if zhi == None or snap.zhi > zhi: zhi = snap.zhi return [xlo, ylo, zlo, xhi, yhi, zhi] # -------------------------------------------------------------------- # return maximum atom type across all selected snapshots and atoms def maxtype(self): icol = self.names["type"] max = 0 for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue if atoms[i][icol] > max: max = atoms[i][icol] return int(max) # -------------------------------------------------------------------- # grab bonds/tris/lines from another object # if static, grab once, else store obj to grab dynamically def extra(self, arg): # data object, grab bonds statically if type(arg) is types.InstanceType and ".data" in str(arg.__class__): self.bondflag = 0 try: bondlist = [] bondlines = arg.sections["Bonds"] for line in bondlines: words = line.split() bondlist.append( [int(words[0]), int(words[1]), int(words[2]), int(words[3])] ) if bondlist: self.bondflag = 1 self.bondlist = bondlist except: raise ValueError("could not extract bonds from data object") # cdata object, grab tris and lines statically elif type(arg) is types.InstanceType and ".cdata" in str(arg.__class__): self.triflag = self.lineflag = 0 try: tmp, tmp, tmp, tmp, tris, lines = arg.viz(0) if tris: self.triflag = 1 self.trilist = tris if lines: self.lineflag = 1 self.linelist = lines except: raise ValueError("could not extract tris/lines from cdata object") # mdump object, grab tris dynamically elif type(arg) is types.InstanceType and ".mdump" in str(arg.__class__): self.triflag = 2 self.objextra = arg # bdump object, grab bonds dynamically elif type(arg) is types.InstanceType and ".bdump" in str(arg.__class__): self.bondflag = 2 self.objextra = arg # ldump object, grab lines dynamically elif type(arg) is types.InstanceType and ".ldump" in str(arg.__class__): self.lineflag = 2 self.objextra = arg # tdump object, grab tris dynamically elif type(arg) is types.InstanceType and ".tdump" in str(arg.__class__): self.triflag = 2 self.objextra = arg else: raise ValueError("unrecognized argument to dump.extra()") # -------------------------------------------------------------------- def compare_atom(self, a, b): if a[0] < b[0]: return -1 elif a[0] > b[0]: return 1 else: return 0 # -------------------------------------------------------------------- def frame(self,iframe): """ simplified class to access properties of a snapshot (INRAE\Olivier Vitrac) """ nframes= len(self.time()); if iframe>=nframes: raise ValueError("the frame index should be ranged between 0 and %d" % nframes) elif iframe<0: iframe = iframe % nframes times = self.time() fields = self.names snap = self.snaps[iframe] frame = Frame() frame.dumpfile = self.flist[0] frame.time = times[iframe] frame.description = {"dumpfile": "dumpobject.flist[0]", "time": "dumpobject.times()[]"} for k in sorted(fields,key=fields.get,reverse=False): kvalid = k # valid key name for rep in ["[","]","#","~","-","_","(",")",",",".",";"]: kvalid = kvalid.replace(rep,"") frame.description[kvalid] = k frame.__dict__[kvalid] = snap.atoms[:,fields[k]] return frame # -------------------------------------------------------------------- def kind(self,listtypes=None): """ guessed kind of dump file based on column names (possibility to supply a personnalized list) (INRAE\Olivier Vitrac) """ if listtypes==None: listtypes = { 'vxyz': ["id","type","x","y","z","vx","vy","vz"], 'xyz': ["id","type","x","y","z"] } internaltypes = True else: listtypes = {"usertype":listtypes} internaltypes = False for t in listtypes: if len(listtypes[t])==0: ismatching = False else: ismatching = True for field in listtypes[t]: ismatching = ismatching and field in self.names if ismatching: break if ismatching: if internaltypes: return t else: return True else: if internaltypes: return None else: return False # -------------------------------------------------------------------- @property def type(self): """ type of dump file defined as a hash of column names """ return hash(self.names2str()) # -------------------------------------------------------------------- def __add__(self,o): """ merge dump objects of the same kind/type """ if not isinstance(o,dump): raise ValueError("the second operand is not a dump object") elif self.type != o.type: raise ValueError("the dumps are not of the same type") twofiles = self.flist[0] + " " + o.flist[0] return dump(twofiles) # --------------------------------------------------------------------
Instance variables
var type
-
type of dump file defined as a hash of column names
Expand source code
@property def type(self): """ type of dump file defined as a hash of column names """ return hash(self.names2str())
Methods
def atom(self, n, *list)
-
Expand source code
def atom(self, n, *list): if len(list) == 0: raise ValueError("no columns specified") columns = [] values = [] for name in list: columns.append(self.names[name]) values.append(self.nselect * [0]) ncol = len(columns) id = self.names["id"] m = 0 for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if atoms[i][id] == n: break if atoms[i][id] != n: raise ValueError("could not find atom ID in snapshot") for j in range(ncol): values[j][m] = atoms[i][columns[j]] m += 1 if len(list) == 1: return values[0] else: return values
def clone(self, nstep, col)
-
Expand source code
def clone(self, nstep, col): istep = self.findtime(nstep) icol = self.names[col] id = self.names["id"] ids = {} for i in range(self.snaps[istep].natoms): ids[self.snaps[istep].atoms[i][id]] = i for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue j = ids[atoms[i][id]] atoms[i][icol] = self.snaps[istep].atoms[j][icol]
def compare_atom(self, a, b)
-
Expand source code
def compare_atom(self, a, b): if a[0] < b[0]: return -1 elif a[0] > b[0]: return 1 else: return 0
def compare_time(self, a, b)
-
Expand source code
def compare_time(self, a, b): if a.time < b.time: return -1 elif a.time > b.time: return 1 else: return 0
def cull(self)
-
Expand source code
def cull(self): i = 1 while i < len(self.snaps): if self.snaps[i].time == self.snaps[i - 1].time: del self.snaps[i] else: i += 1
def delete(self)
-
Expand source code
def delete(self): ndel = i = 0 while i < self.nsnaps: if not self.snaps[i].tselect: del self.snaps[i] self.nsnaps -= 1 ndel += 1 else: i += 1 print("%d snapshots deleted" % ndel) print("%d snapshots remaining" % self.nsnaps)
def extra(self, arg)
-
Expand source code
def extra(self, arg): # data object, grab bonds statically if type(arg) is types.InstanceType and ".data" in str(arg.__class__): self.bondflag = 0 try: bondlist = [] bondlines = arg.sections["Bonds"] for line in bondlines: words = line.split() bondlist.append( [int(words[0]), int(words[1]), int(words[2]), int(words[3])] ) if bondlist: self.bondflag = 1 self.bondlist = bondlist except: raise ValueError("could not extract bonds from data object") # cdata object, grab tris and lines statically elif type(arg) is types.InstanceType and ".cdata" in str(arg.__class__): self.triflag = self.lineflag = 0 try: tmp, tmp, tmp, tmp, tris, lines = arg.viz(0) if tris: self.triflag = 1 self.trilist = tris if lines: self.lineflag = 1 self.linelist = lines except: raise ValueError("could not extract tris/lines from cdata object") # mdump object, grab tris dynamically elif type(arg) is types.InstanceType and ".mdump" in str(arg.__class__): self.triflag = 2 self.objextra = arg # bdump object, grab bonds dynamically elif type(arg) is types.InstanceType and ".bdump" in str(arg.__class__): self.bondflag = 2 self.objextra = arg # ldump object, grab lines dynamically elif type(arg) is types.InstanceType and ".ldump" in str(arg.__class__): self.lineflag = 2 self.objextra = arg # tdump object, grab tris dynamically elif type(arg) is types.InstanceType and ".tdump" in str(arg.__class__): self.triflag = 2 self.objextra = arg else: raise ValueError("unrecognized argument to dump.extra()")
def findtime(self, n)
-
Expand source code
def findtime(self, n): for i in range(self.nsnaps): if self.snaps[i].time == n: return i raise ValueError("no step %d exists" % n)
def frame(self, iframe)
-
simplified class to access properties of a snapshot (INRAE\Olivier Vitrac)
Expand source code
def frame(self,iframe): """ simplified class to access properties of a snapshot (INRAE\Olivier Vitrac) """ nframes= len(self.time()); if iframe>=nframes: raise ValueError("the frame index should be ranged between 0 and %d" % nframes) elif iframe<0: iframe = iframe % nframes times = self.time() fields = self.names snap = self.snaps[iframe] frame = Frame() frame.dumpfile = self.flist[0] frame.time = times[iframe] frame.description = {"dumpfile": "dumpobject.flist[0]", "time": "dumpobject.times()[]"} for k in sorted(fields,key=fields.get,reverse=False): kvalid = k # valid key name for rep in ["[","]","#","~","-","_","(",")",",",".",";"]: kvalid = kvalid.replace(rep,"") frame.description[kvalid] = k frame.__dict__[kvalid] = snap.atoms[:,fields[k]] return frame
def iterator(self, flag)
-
Expand source code
def iterator(self, flag): start = 0 if flag: start = self.iterate + 1 for i in range(start, self.nsnaps): if self.snaps[i].tselect: self.iterate = i return i, self.snaps[i].time, 1 return 0, 0, -1
def kind(self, listtypes=None)
-
guessed kind of dump file based on column names (possibility to supply a personnalized list) (INRAE\Olivier Vitrac)
Expand source code
def kind(self,listtypes=None): """ guessed kind of dump file based on column names (possibility to supply a personnalized list) (INRAE\Olivier Vitrac) """ if listtypes==None: listtypes = { 'vxyz': ["id","type","x","y","z","vx","vy","vz"], 'xyz': ["id","type","x","y","z"] } internaltypes = True else: listtypes = {"usertype":listtypes} internaltypes = False for t in listtypes: if len(listtypes[t])==0: ismatching = False else: ismatching = True for field in listtypes[t]: ismatching = ismatching and field in self.names if ismatching: break if ismatching: if internaltypes: return t else: return True else: if internaltypes: return None else: return False
def map(self, *pairs)
-
Expand source code
def map(self, *pairs): if len(pairs) % 2 != 0: raise ValueError("dump map() requires pairs of mappings") for i in range(0, len(pairs), 2): j = i + 1 self.names[pairs[j]] = pairs[i] - 1
def maxbox(self)
-
Expand source code
def maxbox(self): xlo = ylo = zlo = None xhi = yhi = zhi = None for snap in self.snaps: if not snap.tselect: continue if xlo == None or snap.xlo < xlo: xlo = snap.xlo if xhi == None or snap.xhi > xhi: xhi = snap.xhi if ylo == None or snap.ylo < ylo: ylo = snap.ylo if yhi == None or snap.yhi > yhi: yhi = snap.yhi if zlo == None or snap.zlo < zlo: zlo = snap.zlo if zhi == None or snap.zhi > zhi: zhi = snap.zhi return [xlo, ylo, zlo, xhi, yhi, zhi]
def maxtype(self)
-
Expand source code
def maxtype(self): icol = self.names["type"] max = 0 for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue if atoms[i][icol] > max: max = atoms[i][icol] return int(max)
def minmax(self, colname)
-
Expand source code
def minmax(self, colname): icol = self.names[colname] min = 1.0e20 max = -min for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue if atoms[i][icol] < min: min = atoms[i][icol] if atoms[i][icol] > max: max = atoms[i][icol] return (min, max)
def names2str(self)
-
Expand source code
def names2str(self): # <-- Python 2.x --> # pairs = self.names.items() # values = self.names.values() # ncol = len(pairs) # str = "" # for i in range(ncol): # if i in values: str += pairs[values.index(i)][0] + ' ' # <-- Python 3.x --> str = "" for k in sorted(self.names, key=self.names.get, reverse=False): str += k + " " return str
def newcolumn(self, str)
-
Expand source code
def newcolumn(self, str): ncol = len(self.snaps[0].atoms[0]) self.map(ncol + 1, str) for snap in self.snaps: # commented because not used # atoms = snap.atoms if oldnumeric: newatoms = np.zeros((snap.natoms, ncol + 1), np.Float) else: newatoms = np.zeros((snap.natoms, ncol + 1), np.float) newatoms[:, 0:ncol] = snap.atoms snap.atoms = newatoms
def next(self)
-
Expand source code
def next(self): if not self.increment: raise ValueError("cannot read incrementally") # read next snapshot in current file using eof as pointer # if fail, try next file # if new snapshot time stamp already exists, read next snapshot while 1: f = open(self.flist[self.nextfile], "rb") f.seek(self.eof) snap = self.read_snapshot(f) if not snap: self.nextfile += 1 if self.nextfile == len(self.flist): return -1 f.close() self.eof = 0 continue self.eof = f.tell() f.close() try: self.findtime(snap.time) continue except: break # select the new snapshot with all its atoms self.snaps.append(snap) snap = self.snaps[self.nsnaps] snap.tselect = 1 snap.nselect = snap.natoms for i in range(snap.natoms): snap.aselect[i] = True self.nsnaps += 1 self.nselect += 1 return snap.time
def owrap(self, other)
-
Expand source code
def owrap(self, other): print("Wrapping to other ...") id = self.names["id"] x = self.names["x"] y = self.names["y"] z = self.names["z"] ix = self.names["ix"] iy = self.names["iy"] iz = self.names["iz"] iother = self.names[other] for snap in self.snaps: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms ids = {} for i in range(snap.natoms): ids[atoms[i][id]] = i for i in range(snap.natoms): j = ids[atoms[i][iother]] atoms[i][x] += (atoms[i][ix] - atoms[j][ix]) * xprd atoms[i][y] += (atoms[i][iy] - atoms[j][iy]) * yprd atoms[i][z] += (atoms[i][iz] - atoms[j][iz]) * zprd # should bonds also be owrapped ? if self.lineflag == 2 or self.triflag == 2: self.objextra.owrap( snap.time, xprd, yprd, zprd, ids, atoms, iother, ix, iy, iz )
def read_all(self)
-
Expand source code
def read_all(self): # read all snapshots from each file # test for gzipped files for file in self.flist: if file[-3:] == ".gz": f = popen("%s -c %s" % (PIZZA_GUNZIP, file), "r") else: f = open(file) snap = self.read_snapshot(f) while snap: self.snaps.append(snap) print(snap.time, file=sys.stdout, flush=True, end=" ") snap = self.read_snapshot(f) f.close() print # sort entries by timestep, cull duplicates self.snaps.sort() # self.snaps.sort(self.compare_time) #%% to be fixed in the future (OV) self.cull() self.nsnaps = len(self.snaps) print("read %d snapshots" % (self.nsnaps)) # select all timesteps and atoms self.tselect.all() # print column assignments if len(self.names): print("assigned columns:", ",".join(list(self.names.keys()))) else: print("no column assignments made") # if snapshots are scaled, unscale them if ( (not "x" in self.names) or (not "y" in self.names) or (not "z" in self.names) ): print("dump scaling status is unknown") elif self.nsnaps > 0: if self.scale_original == 1: self.unscale() elif self.scale_original == 0: print("dump is already unscaled") else: print("dump scaling status is unknown")
def read_snapshot(self, f)
-
low-level method to read a snapshot from a file identifier
Expand source code
def read_snapshot(self, f): """ low-level method to read a snapshot from a file identifier """ # expand the list of keywords if needed (INRAE\Olivier Vitrac) # "keyname": ["name in snap","type"] itemkeywords = {"TIME": ["realtime","float"], "TIMESTEP": ["time","int"], "NUMBER OF ATOMS": ["natoms","int"]} try: snap = Snap() # read and guess the first keywords based on itemkeywords found = True while found: item = f.readline() varitem = item.split("ITEM:")[1].strip() found = varitem in itemkeywords if found: tmp = f.readline().split()[0] # just grab 1st field if itemkeywords[varitem][1]=="int": valitem = int(tmp) else: valitem = float(tmp) setattr(snap,itemkeywords[varitem][0],valitem) # prefetch snap.aselect = np.zeros(snap.natoms,dtype=bool) # we assume that the next item is BOX BOUNDS (pp ff pp) words = item.split("BOUNDS ") if len(words) == 1: snap.boxstr = "" else: snap.boxstr = words[1].strip() if "xy" in snap.boxstr: snap.triclinic = 1 else: snap.triclinic = 0 words = f.readline().split() if len(words) == 2: snap.xlo, snap.xhi, snap.xy = float(words[0]), float(words[1]), 0.0 else: snap.xlo, snap.xhi, snap.xy = ( float(words[0]), float(words[1]), float(words[2]), ) words = f.readline().split() if len(words) == 2: snap.ylo, snap.yhi, snap.xz = float(words[0]), float(words[1]), 0.0 else: snap.ylo, snap.yhi, snap.xz = ( float(words[0]), float(words[1]), float(words[2]), ) words = f.readline().split() if len(words) == 2: snap.zlo, snap.zhi, snap.yz = float(words[0]), float(words[1]), 0.0 else: snap.zlo, snap.zhi, snap.yz = ( float(words[0]), float(words[1]), float(words[2]), ) item = f.readline() if len(self.names) == 0: self.scale_original = -1 xflag = yflag = zflag = -1 words = item.split()[2:] if len(words): for i in range(len(words)): if words[i] == "x" or words[i] == "xu": xflag = 0 self.names["x"] = i elif words[i] == "xs" or words[i] == "xsu": xflag = 1 self.names["x"] = i elif words[i] == "y" or words[i] == "yu": yflag = 0 self.names["y"] = i elif words[i] == "ys" or words[i] == "ysu": yflag = 1 self.names["y"] = i elif words[i] == "z" or words[i] == "zu": zflag = 0 self.names["z"] = i elif words[i] == "zs" or words[i] == "zsu": zflag = 1 self.names["z"] = i else: self.names[words[i]] = i if xflag == 0 and yflag == 0 and zflag == 0: self.scale_original = 0 if xflag == 1 and yflag == 1 and zflag == 1: self.scale_original = 1 if snap.natoms: words = f.readline().split() ncol = len(words) for i in range(1, snap.natoms): words += f.readline().split() floats = list(map(float, words)) if oldnumeric: atoms = np.zeros((snap.natoms, ncol), np.float64) else: atoms = np.zeros((snap.natoms, ncol), np.float64) start = 0 stop = ncol for i in range(snap.natoms): atoms[i] = floats[start:stop] start = stop stop += ncol else: atoms = None snap.atoms = atoms return snap except: return 0
def realtime(self)
-
time as simulated: realtime()
Expand source code
def realtime(self): """ time as simulated: realtime() """ vec = self.nselect * [0.0] i = 0 for snap in self.snaps: if not snap.tselect or not hasattr(snap,"realtime"): continue vec[i] = snap.realtime i += 1 return vec
def scale(self, *list)
-
Expand source code
def scale(self, *list): if len(list) == 0: print("Scaling dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] for snap in self.snaps: self.scale_one(snap, x, y, z) else: i = self.findtime(list[0]) x = self.names["x"] y = self.names["y"] z = self.names["z"] self.scale_one(self.snaps[i], x, y, z)
def scale_one(self, snap, x, y, z)
-
Expand source code
def scale_one(self, snap, x, y, z): if snap.xy == 0.0 and snap.xz == 0.0 and snap.yz == 0.0: xprdinv = 1.0 / (snap.xhi - snap.xlo) yprdinv = 1.0 / (snap.yhi - snap.ylo) zprdinv = 1.0 / (snap.zhi - snap.zlo) atoms = snap.atoms if atoms != None: atoms[:, x] = (atoms[:, x] - snap.xlo) * xprdinv atoms[:, y] = (atoms[:, y] - snap.ylo) * yprdinv atoms[:, z] = (atoms[:, z] - snap.zlo) * zprdinv else: xlo_bound = snap.xlo xhi_bound = snap.xhi ylo_bound = snap.ylo yhi_bound = snap.yhi zlo_bound = snap.zlo zhi_bound = snap.zhi xy = snap.xy xz = snap.xz yz = snap.yz xlo = xlo_bound - min((0.0, xy, xz, xy + xz)) xhi = xhi_bound - max((0.0, xy, xz, xy + xz)) ylo = ylo_bound - min((0.0, yz)) yhi = yhi_bound - max((0.0, yz)) zlo = zlo_bound zhi = zhi_bound h0 = xhi - xlo h1 = yhi - ylo h2 = zhi - zlo h3 = yz h4 = xz h5 = xy h0inv = 1.0 / h0 h1inv = 1.0 / h1 h2inv = 1.0 / h2 h3inv = yz / (h1 * h2) h4inv = (h3 * h5 - h1 * h4) / (h0 * h1 * h2) h5inv = xy / (h0 * h1) atoms = snap.atoms if atoms != None: atoms[:, x] = ( (atoms[:, x] - snap.xlo) * h0inv + (atoms[:, y] - snap.ylo) * h5inv + (atoms[:, z] - snap.zlo) * h4inv ) atoms[:, y] = (atoms[:, y] - snap.ylo) * h1inv + ( atoms[:, z] - snap.zlo ) * h3inv atoms[:, z] = (atoms[:, z] - snap.zlo) * h2inv
def scatter(self, root)
-
Expand source code
def scatter(self, root): if len(self.snaps): namestr = self.names2str() for snap in self.snaps: if not snap.tselect: continue print(snap.time, file=sys.stdout, flush=True) file = root + "." + str(snap.time) f = open(file, "w") print("ITEM: TIMESTEP", file=f) print(snap.time, file=f) print("ITEM: NUMBER OF ATOMS", file=f) print(snap.nselect, file=f) if snap.boxstr: print("ITEM: BOX BOUNDS", snap.boxstr, file=f) else: print("ITEM: BOX BOUNDS", file=f) if snap.triclinic: print(snap.xlo, snap.xhi, snap.xy, file=f) print(snap.ylo, snap.yhi, snap.xz, file=f) print(snap.zlo, snap.zhi, snap.yz, file=f) else: print(snap.xlo, snap.xhi, file=f) print(snap.ylo, snap.yhi, file=f) print(snap.zlo, snap.zhi, file=f) print("ITEM: ATOMS", namestr, file=f) atoms = snap.atoms nvalues = len(atoms[0]) for i in range(snap.natoms): if not snap.aselect[i]: continue line = "" for j in range(nvalues): if j < 2: line += str(int(atoms[i][j])) + " " else: line += str(atoms[i][j]) + " " print(line, file=f) f.close() print("\n%d snapshots" % self.nselect)
def set(self, eq)
-
Expand source code
def set(self, eq): print("Setting ...") pattern = "\$\w*" list = re.findall(pattern, eq) lhs = list[0][1:] if not self.names.has_key(lhs): self.newcolumn(lhs) for item in list: name = item[1:] column = self.names[name] insert = "snap.atoms[i][%d]" % (column) eq = eq.replace(item, insert) ceq = compile(eq, "", "single") for snap in self.snaps: if not snap.tselect: continue for i in range(snap.natoms): if snap.aselect[i]: exec(ceq)
def setv(self, colname, vec)
-
Expand source code
def setv(self, colname, vec): print("Setting ...") if not self.names.has_key(colname): self.newcolumn(colname) icol = self.names[colname] for snap in self.snaps: if not snap.tselect: continue if snap.nselect != len(vec): raise ValueError("vec length does not match # of selected atoms") atoms = snap.atoms m = 0 for i in range(snap.natoms): if snap.aselect[i]: atoms[i][icol] = vec[m] m += 1
def sort(self, *listarg)
-
Expand source code
def sort(self, *listarg): if len(listarg) == 0: print("Sorting selected snapshots ...") id = self.names["id"] for snap in self.snaps: if snap.tselect: self.sort_one(snap, id) elif type(listarg[0]) is types.StringType: print("Sorting selected snapshots by %s ..." % listarg[0]) id = self.names[listarg[0]] for snap in self.snaps: if snap.tselect: self.sort_one(snap, id) else: i = self.findtime(listarg[0]) id = self.names["id"] self.sort_one(self.snaps[i], id)
def sort_one(self, snap, id)
-
Expand source code
def sort_one(self, snap, id): atoms = snap.atoms ids = atoms[:, id] ordering = np.argsort(ids) for i in range(len(atoms[0])): atoms[:, i] = np.take(atoms[:, i], ordering)
def spread(self, old, n, new)
-
Expand source code
def spread(self, old, n, new): iold = self.names[old] if not self.names.has_key(new): self.newcolumn(new) inew = self.names[new] min, max = self.minmax(old) print("min/max = ", min, max) gap = max - min invdelta = n / gap for snap in self.snaps: if not snap.tselect: continue atoms = snap.atoms for i in range(snap.natoms): if not snap.aselect[i]: continue ivalue = int((atoms[i][iold] - min) * invdelta) + 1 if ivalue > n: ivalue = n if ivalue < 1: ivalue = 1 atoms[i][inew] = ivalue
def time(self)
-
timestep as stored: time()
Expand source code
def time(self): """ timestep as stored: time()""" vec = self.nselect * [0] i = 0 for snap in self.snaps: if not snap.tselect: continue vec[i] = snap.time i += 1 return vec
def unscale(self, *list)
-
Expand source code
def unscale(self, *list): if len(list) == 0: print("Unscaling dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] for snap in self.snaps: self.unscale_one(snap, x, y, z) else: i = self.findtime(list[0]) x = self.names["x"] y = self.names["y"] z = self.names["z"] self.unscale_one(self.snaps[i], x, y, z)
def unscale_one(self, snap, x, y, z)
-
Expand source code
def unscale_one(self, snap, x, y, z): if snap.xy == 0.0 and snap.xz == 0.0 and snap.yz == 0.0: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms if atoms != None: atoms[:, x] = snap.xlo + atoms[:, x] * xprd atoms[:, y] = snap.ylo + atoms[:, y] * yprd atoms[:, z] = snap.zlo + atoms[:, z] * zprd else: xlo_bound = snap.xlo xhi_bound = snap.xhi ylo_bound = snap.ylo yhi_bound = snap.yhi zlo_bound = snap.zlo zhi_bound = snap.zhi xy = snap.xy xz = snap.xz yz = snap.yz xlo = xlo_bound - min((0.0, xy, xz, xy + xz)) xhi = xhi_bound - max((0.0, xy, xz, xy + xz)) ylo = ylo_bound - min((0.0, yz)) yhi = yhi_bound - max((0.0, yz)) zlo = zlo_bound zhi = zhi_bound h0 = xhi - xlo h1 = yhi - ylo h2 = zhi - zlo h3 = yz h4 = xz h5 = xy atoms = snap.atoms if atoms != None: atoms[:, x] = ( snap.xlo + atoms[:, x] * h0 + atoms[:, y] * h5 + atoms[:, z] * h4 ) atoms[:, y] = snap.ylo + atoms[:, y] * h1 + atoms[:, z] * h3 atoms[:, z] = snap.zlo + atoms[:, z] * h2
def unwrap(self)
-
Expand source code
def unwrap(self): print("Unwrapping dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] ix = self.names["ix"] iy = self.names["iy"] iz = self.names["iz"] for snap in self.snaps: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms atoms[:, x] += atoms[:, ix] * xprd atoms[:, y] += atoms[:, iy] * yprd atoms[:, z] += atoms[:, iz] * zprd
def vecs(self, n, *colname)
-
vecs(timeste,columname1,columname2,…)
Examples
tab = vecs(timestep,"id","x","y") tab = vecs(timestep,["id","x","y"],"z") X.vecs(X.time()[50],"vx","vy")
Expand source code
def vecs(self, n, *colname): """ vecs(timeste,columname1,columname2,...) Examples: tab = vecs(timestep,"id","x","y") tab = vecs(timestep,["id","x","y"],"z") X.vecs(X.time()[50],"vx","vy") """ snap = self.snaps[self.findtime(n)] if len(colname) == 0: raise ValueError("no columns specified") if isinstance(colname[0],tuple): colname = list(colname[0]) + list(colname[1:]) if isinstance(colname[0],list): colname = colname[0] + list(colname[1:]) columns = [] values = [] for name in colname: columns.append(self.names[name]) values.append(snap.nselect * [0]) ncol = len(columns) m = 0 for i in range(snap.natoms): if not snap.aselect[i]: continue for j in range(ncol): values[j][m] = snap.atoms[i][columns[j]] m += 1 if len(colname) == 1: return values[0] else: return values
def viz(self, index, flag=0)
-
Expand source code
def viz(self, index, flag=0): if not flag: isnap = index else: times = self.time() n = len(times) i = 0 while i < n: if times[i] > index: break i += 1 isnap = i - 1 snap = self.snaps[isnap] time = snap.time box = [snap.xlo, snap.ylo, snap.zlo, snap.xhi, snap.yhi, snap.zhi] id = self.names["id"] type = self.names[self.atype] x = self.names["x"] y = self.names["y"] z = self.names["z"] # create atom list needed by viz from id,type,x,y,z # need Numeric/Numpy mode here atoms = [] for i in range(snap.natoms): if not snap.aselect[i]: continue atom = snap.atoms[i] atoms.append([atom[id], atom[type], atom[x], atom[y], atom[z]]) # create list of bonds from static or dynamic bond list # then generate bond coords from bondlist # alist = dictionary of atom IDs for atoms list # lookup bond atom IDs in alist and grab their coords # try is used since some atoms may be unselected # any bond with unselected atom is not added to bonds # need Numeric/Numpy mode here bonds = [] if self.bondflag: if self.bondflag == 1: bondlist = self.bondlist elif self.bondflag == 2: tmp1, tmp2, tmp3, bondlist, tmp4, tmp5 = self.objextra.viz(time, 1) alist = {} for i in range(len(atoms)): alist[int(atoms[i][0])] = i for bond in bondlist: try: i = alist[bond[2]] j = alist[bond[3]] atom1 = atoms[i] atom2 = atoms[j] bonds.append( [ bond[0], bond[1], atom1[2], atom1[3], atom1[4], atom2[2], atom2[3], atom2[4], atom1[1], atom2[1], ] ) except: continue # create list of tris from static or dynamic tri list # if dynamic, could eliminate tris for unselected atoms tris = [] if self.triflag: if self.triflag == 1: tris = self.trilist elif self.triflag == 2: tmp1, tmp2, tmp3, tmp4, tris, tmp5 = self.objextra.viz(time, 1) # create list of lines from static or dynamic tri list # if dynamic, could eliminate lines for unselected atoms lines = [] if self.lineflag: if self.lineflag == 1: lines = self.linelist elif self.lineflag == 2: tmp1, tmp2, tmp3, tmp4, tmp5, lines = self.objextra.viz(time, 1) return time, box, atoms, bonds, tris, lines
def wrap(self)
-
Expand source code
def wrap(self): print("Wrapping dump ...") x = self.names["x"] y = self.names["y"] z = self.names["z"] ix = self.names["ix"] iy = self.names["iy"] iz = self.names["iz"] for snap in self.snaps: xprd = snap.xhi - snap.xlo yprd = snap.yhi - snap.ylo zprd = snap.zhi - snap.zlo atoms = snap.atoms atoms[:, x] -= atoms[:, ix] * xprd atoms[:, y] -= atoms[:, iy] * yprd atoms[:, z] -= atoms[:, iz] * zprd
def write(self, file, header=1, append=0)
-
Expand source code
def write(self, file, header=1, append=0): if len(self.snaps): namestr = self.names2str() if not append: f = open(file, "w") else: f = open(file, "a") if "id" in self.names: id = self.names["id"] else: id = -1 if "type" in self.names: type = self.names["type"] else: type = -1 for snap in self.snaps: if not snap.tselect: continue print(snap.time, file=sys.stdout, flush=True) if header: print("ITEM: TIMESTEP", file=f) print(snap.time, file=f) print("ITEM: NUMBER OF ATOMS", file=f) print(snap.nselect, file=f) if snap.boxstr: print("ITEM: BOX BOUNDS", snap.boxstr, file=f) else: print("ITEM: BOX BOUNDS", file=f) if snap.triclinic: print(snap.xlo, snap.xhi, snap.xy, file=f) print(snap.ylo, snap.yhi, snap.xz, file=f) print(snap.zlo, snap.zhi, snap.yz, file=f) else: print(snap.xlo, snap.xhi, file=f) print(snap.ylo, snap.yhi, file=f) print(snap.zlo, snap.zhi, file=f) print("ITEM: ATOMS", namestr, file=f) atoms = snap.atoms nvalues = len(atoms[0]) for i in range(snap.natoms): if not snap.aselect[i]: continue line = "" for j in range(nvalues): if j == id or j == type: line += str(int(atoms[i][j])) + " " else: line += str(atoms[i][j]) + " " print(line, file=f) f.close() print("\n%d snapshots" % self.nselect)