Module eu_fcm_annex1
"""
Module: eu_fcm_annex1
This module implements a robust database manager for Annex I of EU Regulation 10/2011 EC, which lists authorized food contact substances.
The module reads a CSV file (downloaded from ECHA) and converts each row into a JSON record stored in a cache directory (as files named recXXXXX.annex1.json).
It then builds a global index (stored as annex1_index.json in the cache folder) mapping key fields—such as chemical name, EC number, CAS number, FCM number, reference,
and PubChem CID—to their corresponding record numbers.
Key features include:
- Conversion of CSV rows into JSON records with ordered keys.
- Use of a primary key (the record number, which is 1-based and stored in the "record" field) instead of a zero-based Python index.
- Lookup methods by CAS, record number, PubChem CID, as well as by other fields (byname, byEC, byFCM, byRef).
- Additional search functions that filter records by migration limit (SML and SMLT) ranges.
- Caching of substances with missing PubChem results in a separate file ("missing.pubchem.annex1.json") so that lookup requests for substances with a nonempty CAS that fail to resolve are not repeated.
- Custom __repr__ and __str__ methods for the record objects, with text wrapping that allows the first line to wrap at a set width and subsequent lines to wrap at a separate width.
- Enhanced error handling in __getitem__ that returns informative messages when the provided key (either record number or CAS) is out of range.
Global Rate Limiting:
The module assumes that the PubChem query throttling is managed by a module-level global variable (e.g. PubChem_lastQueryTime) within the imported
'migrant' function from the patankar.loadpubchem module. This state is maintained across multiple instances of EuFCMannex1.
Usage Example:
>>> db = EuFCMannex1()
>>> rec = db[1060] # Retrieves the record with record number 1060.
>>> rec_by_cas = db.byCAS("102-39-6")
>>> for r in db:
... print(r["name"])
>>> if 10 in db:
... print("Record 10 exists.")
>>> if 113194 in db:
... print("PubChem cid 113194 exists.")
@version: 1.40
@project: SFPPy - SafeFoodPackaging Portal in Python initiative
@author: INRAE\\olivier.vitrac@agroparistech.fr
@licence: MIT
@Date: 2024-01-10
@rev: 2025-03-27
"""
import os, csv, json, datetime, time, re, textwrap
__all__ = ['EuFCMannex1', 'annex1record', 'annex1record_ext', 'custom_wrap', 'printWARN']
__project__ = "SFPPy"
__author__ = "Olivier Vitrac"
__copyright__ = "Copyright 2022"
__credits__ = ["Olivier Vitrac"]
__license__ = "MIT"
__maintainer__ = "Olivier Vitrac"
__email__ = "olivier.vitrac@agroparistech.fr"
__version__ = "1.41"
# Default value for SML when field is empty.
SMLdefault = 60.0
# Module-level variables to track last warning message and its timestamp
_LAST_WARN_ = None
_T_LAST_WARN_ = 0.0
# %% private functions
def custom_wrap(text, width=40, indent=" " * 22):
# Wrap the full text to obtain the first line (at most `width` characters).
first_line = textwrap.wrap(text, width=width)
if not first_line:
return ""
# Keep the first line, then compute the remaining text.
first = first_line[0]
remaining = text[len(first):].lstrip()
# Wrap the remaining text to `width` characters as well.
subsequent_lines = textwrap.wrap(remaining, width=width)
# Prefix each continuation line with the indent.
wrapped = [first] + [indent + line for line in subsequent_lines]
return "\n".join(wrapped)
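Because `custom_wrap` is self-contained, its two-stage behavior (first line capped at `width`, continuation lines re-wrapped and indented) can be checked in isolation. The sketch below uses a standalone copy (`wrap_demo` is our name, not part of the module):

```python
import textwrap

def wrap_demo(text, width=40, indent=" " * 22):
    # Standalone copy of custom_wrap: first line capped at `width`,
    # remaining text re-wrapped at `width` and prefixed with `indent`.
    lines = textwrap.wrap(text, width=width)
    if not lines:
        return ""
    first = lines[0]
    rest = textwrap.wrap(text[len(first):].lstrip(), width=width)
    return "\n".join([first] + [indent + line for line in rest])

out = wrap_demo("alpha beta gamma delta epsilon", width=12, indent="  ")
```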
def printWARN(message: str, tsilent: float = 10.0):
"""
Print a warning message only if:
- it's different from the last one, or
- more than `tsilent` seconds have passed since the last identical warning.
Parameters:
----------
message : str
The warning message to display.
tsilent : float, optional
Minimum time (in seconds) between repeated identical warnings.
"""
global _LAST_WARN_, _T_LAST_WARN_
tnow = time.time()
if message != _LAST_WARN_ or (tnow - _T_LAST_WARN_ > tsilent):
print(message)
_LAST_WARN_ = message
_T_LAST_WARN_ = tnow
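The suppression rule in `printWARN` (emit only when the message changed or `tsilent` seconds elapsed since the identical warning) is easiest to verify with a test double that records messages instead of printing and accepts an injected clock. `ThrottledWarner` below is a hypothetical helper, not part of the module:

```python
import time

class ThrottledWarner:
    """Test double for the printWARN rule: emit only when the message
    differs from the last one or `tsilent` seconds have elapsed."""
    def __init__(self, tsilent=10.0):
        self.tsilent = tsilent
        self.last = None
        self.t_last = 0.0
        self.emitted = []

    def warn(self, message, now=None):
        # `now` lets tests inject a clock; real use falls back to time.time()
        now = time.time() if now is None else now
        if message != self.last or now - self.t_last > self.tsilent:
            self.emitted.append(message)
            self.last = message
            self.t_last = now

w = ThrottledWarner(tsilent=10.0)
w.warn("A", now=0.0)   # emitted (first occurrence)
w.warn("A", now=1.0)   # suppressed: same message within 10 s
w.warn("B", now=2.0)   # emitted: message changed
w.warn("A", now=3.0)   # emitted: differs from the last message ("B")
```

Note that only the *last* message is remembered, so alternating messages are never suppressed.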
# %% main classes
# ----------------------------------------------------------------------
# annex1record: a subclass of dict to represent one substance record.
# ----------------------------------------------------------------------
class annex1record(dict):
"""
Represents a single substance record from Annex I of EU Regulation 10/2011 EC.
This class is a subclass of dict that stores the data for one authorized substance. It contains keys such as:
- "record": the primary record number (1-based, as assigned from the CSV file),
- "cid": the PubChem compound identifier (which may be None),
- "name", "CAS", "EC", "FCM", "Ref", and additional fields such as migration limits (SML, SMLT), restrictions, and notes.
The class provides custom __str__ and __repr__ methods for formatted display. In __repr__, only nonempty fields are shown,
the header line begins with "record: X of Y", and text wrapping is applied so that the first line wraps at a specified width and subsequent lines wrap at a set width (with proper indenting).
"""
def __init__(self, d, order=None, total=None):
"""
Initialize an annex1record from dictionary d.
order: the record number (from the CSV; if known)
total: total number of records (if known)
"""
if not isinstance(d, dict):
raise TypeError(f"d must be a dict, not a {type(d).__name__}")
super().__init__(d)
# Use the value stored in the record (key "record") if available.
self._order = d.get("record", order)
self._total = total
def __str__(self):
cid = self.get("cid", None)
order_str = f"{self._order}" if self._order is not None else "?"
total_str = f"{self._total}" if self._total is not None else "?"
return f"<{self.__class__.__name__} with cid:{cid} - record {order_str} of {total_str} (Annex 1 of 10/2011/EC)>"
def __repr__(self):
lines = []
order_str = f"{self._order}" if self._order is not None else "?"
total_str = f"{self._total}" if self._total is not None else "?"
header = f" ---- [ 🇪🇺 10/2011/EC record: {order_str} of {total_str} ] ----"
lines.append(header)
# Define display order; note that we do not show SMLunit, SMLTunit, csvFile, or date.
fields_order = [
"record", "cid", "name", "CAS", "EC", "FCM", "Ref",
"Additive_or_PPA", "Use_as_monomer_macromolecule", "FRFapplicable",
"SML", "SMLTrestrictions", "SMLTGroupFCMsubstances", "SMLT",
"Restrictions", "NotesCompliance", "Notes", "Physicalform", "ExpressedAs"
]
# For each field, if value is not empty then display it.
for key in fields_order:
if key not in self:
continue
val = self[key]
if val is None or (isinstance(val, str) and not val.strip()):
continue
# For SML and SMLT, append the unit inline.
if key == "SML":
display_val = f"{val} [mg/kg]"
elif key == "SMLT":
display_val = f"{val} [mg/kg]"
else:
display_val = str(val)
# Wrap the value to 60 characters; subsequent lines are indented by 25 spaces.
wrapped_val = custom_wrap(display_val, width=60, indent=" " * 22)
lines.append(f"{key:>20}: {wrapped_val}")
# extended-class attributes (annex1record_ext only)
if isinstance(self, annex1record_ext):
lines.append(f"\n{'--- extended':>20}: properties ---\n")
attr_order = ["cid","M","SML","SMLT","n","gFCM","gM","gMmin","gCASmin","gnamemin"]
for attr in attr_order:
if hasattr(self, attr):
val = getattr(self, attr)
lines.append(f"{attr:>20}: {val}")
print("\n".join(lines))
return str(self)
@property
def ispubchemok(self):
"""Returns True if the record may match in PubChem (i.e., compatible with patankar.loadpubchem)"""
return self.get("cid") is not None and self.get("CAS") not in ("", None)
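The eligibility rule behind `ispubchemok` reduces to two checks on the record's fields. The hypothetical helper below applies the same rule to plain dicts, which makes the edge cases explicit:

```python
def pubchem_eligible(rec: dict) -> bool:
    # Same rule as annex1record.ispubchemok: a resolved PubChem cid
    # and a nonempty CAS number are both required.
    return rec.get("cid") is not None and rec.get("CAS") not in ("", None)
```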
# ----------------------------------------------------------------------
# annex1record_ext: an extension of annex1 records with additional info
# The EU 10/2011 Annex 1 is not a database with unique substances.
# It is not fully compatible with migrant which requires a unique id.
# The extended/promoted class try to bridge them and add some info.
# We need to keep annex1=False when calling migrant to avoid circular references
# ----------------------------------------------------------------------
class annex1record_ext(annex1record):
"""extended annex1record class with additional attributes"""
def __init__(self,rec,db=None):
"""Instantiate from a record with a consolidated database: db"""
if not isinstance(rec, annex1record):
raise TypeError(f"rec must be an annex1record, not a {type(rec).__name__}")
super().__init__(rec,order=rec._order,total=rec._total)
from patankar.loadpubchem import migrant
if rec.ispubchemok:
try:
m = migrant(rec.get("CAS"),annex1=False)
M = m.M
except Exception:
print(f"{rec.get('name')} could not be extended from its CAS {rec.get('CAS')}: not found")
M = None
else:
M = None
self.cid = self.get("cid")
self.SML = self.get("SML")
self.SMLT = self.get("SMLT")
if self.SMLT and self.SMLT<self.SML:
self.SML = self.SMLT
self.gFCM = self.get("SMLTGroupFCMsubstances")
self.n = len(self.gFCM) if self.gFCM else None
self.M = M # molecular mass added if cid was available
self.gM = None # molecular masses of sustances in the group
self.gMmin = None # minimal molecular mass of substances in the group
self.gCASmin = None # CAS of the lightest/smallest substances
self.gnamemin = None # its name
# we will add group info gM, gMin, gCASmin, gnamemin
# do not forget, no bijection between rid (record id) and fcm
# here fcm2rid[self.gFCM[0]], fcm2rid[self.gFCM[1]],...
# are all equal, they give rids (record indices) for the group matching this FCM
if db is not None and self.gFCM is not None:
fcm2rid = db._fcm2rid
# gFCM keys should be string but they can be cached as int
if isinstance(self.gFCM[0],str):
rids = fcm2rid[self.gFCM[0]]
else:
rids = fcm2rid[str(self.gFCM[0])]
recs = [db._load_record(rid, db=False) for rid in rids]
Mlist = [r.M for r in recs]
CASlist = [r.get("CAS") for r in recs]
namelist = [r.get("name") for r in recs]
# Find index of the smallest non-None molecular mass
valid_indices = [i for i, m in enumerate(Mlist) if m is not None] # Indices of valid values
if valid_indices:
min_index = min(valid_indices, key=lambda i: Mlist[i]) # Index of minimum valid M value
self.gM = [m for m in Mlist if m is not None]
self.gMmin = Mlist[min_index]
self.gCASmin = CASlist[min_index]
self.gnamemin = namelist[min_index]
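The group statistics above rely on a pick-the-minimum-over-non-None pattern: filter out the indices with missing masses, then take the minimum over the valid indices. With hypothetical masses (None marking unresolved substances) it behaves as follows:

```python
# Hypothetical molecular masses for a SMLT group; None marks substances
# whose mass could not be resolved from PubChem.
Mlist = [104.15, None, 86.09, None]
CASlist = ["100-42-5", None, "96-33-3", None]

valid_indices = [i for i, m in enumerate(Mlist) if m is not None]
min_index = min(valid_indices, key=lambda i: Mlist[i])  # index of smallest valid M

gM = [m for m in Mlist if m is not None]  # masses actually available
gMmin = Mlist[min_index]                  # lightest substance in the group
gCASmin = CASlist[min_index]              # its CAS number
```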
# ----------------------------------------------------------------------
# EuFCMannex1: the main class to manage the Annex I CSV file and cache.
# ----------------------------------------------------------------------
class EuFCMannex1:
"""
Manages the Annex I CSV file and caches its data for efficient lookup and analysis.
This class reads the CSV file (representing the authorized substances as provided by ECHA) and processes each row into a JSON record.
Each record is saved in a cache directory as recXXXXX.annex1.json (where XXXXX is the 1-based record number). The class then builds a global index
(stored as annex1_index.json in the cache folder) that maps various foreign keys—such as "name", "EC", "CAS", "FCM", "Ref", and PubChem cid—to the
corresponding record numbers.
Key functionality includes:
- Retrieving records by a record number (which is now interpreted directly from the record’s "record" field, not as a Python 0-based index).
- Lookup of records by CAS (string keys) and by PubChem cid via the bycid method.
- Support for slicing, list-based access, callable access (e.g., dbannex1(113194) retrieves by PubChem cid or record number),
iteration over records, and membership testing.
- Additional search methods: byname, byEC, byCAS, byFCM, byRef, bySML, and bySMLT.
- Handling of substances that lack a CAS number (and therefore cannot have a PubChem cid) by skipping PubChem lookup, while for substances with a nonempty CAS that fail the lookup,
the CAS is recorded in a separate missing file ("missing.pubchem.annex1.json") to avoid future redundant queries.
- Enhanced error messages in __getitem__ that include the invalid key and the valid range or a sample of valid keys.
Global Rate Limiting:
This class relies on a module-level global variable (e.g., PubChem_lastQueryTime in the patankar.loadpubchem module) to manage rate-limiting
when performing PubChem lookups.
Usage Example:
>>> db = EuFCMannex1()
>>> rec = db[1060] # Retrieves record number 1060.
>>> rec_by_cas = db.byCAS("102-39-6")
>>> rec_by_cid = db.bycid(113194)
>>> for record in db:
... print(record["name"])
>>> if 10 in db:
... print("Record 10 exists.")
>>> if 113194 in db:
... print("PubChem cid 113194 exists.")
"""
isAnnex1Initialized = False # class attribute
def __init__(self, cache_dir="cache.EuFCMannex1", index_file="annex1_index.json", pubchem=True):
"""
Initialize the database.
(1) The working directory is set to the directory of __file__.
(2) The CSV file is "fcm-and-articles-regulation--annex-i---authorised-substances-export.csv".
(3) If the CSV file is missing, a FileNotFoundError is raised.
(4) The class uses a cache in cache_dir and stores the global index inside that folder.
"""
self.base_dir = os.path.dirname(__file__)
self.csv_file = os.path.join(self.base_dir, "fcm-and-articles-regulation--annex-i---authorised-substances-export.csv")
if not os.path.exists(self.csv_file):
raise FileNotFoundError(f"CSV file {self.csv_file} not found.")
self.cache_dir = os.path.join(self.base_dir, cache_dir)
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
# Store the index file in the cache directory.
self.index_file = os.path.join(self.cache_dir, index_file)
if os.path.exists(self.index_file):
with open(self.index_file, "r", encoding="utf-8") as f:
self.index = json.load(f)
else:
self.refresh_index()
self.order = self.index.get("order", [])
# Internal cache for loaded records.
self._records_cache = {}
EuFCMannex1.isAnnex1Initialized = True
# pubchem extensions
# we cache the pairing fcm->rid (record id=primary key)
self._pubchem = False # mandatory to avoid circular imports
self._fcm2rid = self.SMLT_Groupsubstances if pubchem else None
self._pubchem = pubchem # we enforce pubchem, the database is initialized indeed
@classmethod
def isindexinitialized(cls, cache_dir="cache.EuFCMannex1", index_file="annex1_index.json"):
"""Return True if the database is available"""
return os.path.exists(os.path.join(os.path.dirname(__file__),cache_dir, index_file))
def refresh_index(self):
"""
Rebuild the global index by reading the CSV file and regenerating each record
as recXXXXX.annex1.json (with record number as primary key). The index includes foreign key
mappings for "name", "EC", "CAS", "FCM", "Ref" and a bycid index for PubChem cid.
For substances with an empty CAS number, no PubChem lookup is performed.
For substances with a nonempty CAS, if the lookup fails (raising ValueError) the CAS is recorded
in a missing file ("missing.pubchem.annex1.json") so that future refreshes do not re-query PubChem.
(Rate limiting is assumed to be managed via the module’s global variable PubChem_lastQueryTime.)
"""
from patankar.loadpubchem import migrant # local import of migrant
new_index = {}
new_index["index_date"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
new_index["csv_file"] = os.path.basename(self.csv_file)
new_index["order"] = []
for key in ["name", "EC", "CAS", "FCM", "Ref"]:
new_index[key] = {}
new_index["bycid"] = {}
# Load missing CAS numbers (only for substances with a CAS)
missing_file = os.path.join(self.cache_dir, "missing.pubchem.annex1.json")
if os.path.exists(missing_file):
with open(missing_file, "r", encoding="utf-8") as mf:
missing_pubchem = json.load(mf)
else:
missing_pubchem = {}
records = []
rec_num = 0
with open(self.csv_file, "r", encoding="utf-8") as f:
reader = csv.reader(f, delimiter="\t", quotechar='"')
header_found = False
for row in reader:
if not header_found:
if row and row[0] == "Name":
header_found = True
continue
if not row or len(row) < 19:
continue
rec_num += 1
rec = {}
def yesno(val):
val = val.strip().lower()
if val == "yes":
return True
elif val == "no":
return False
return None
# Use record number as primary key stored as "record"
rec["record"] = rec_num
rec["cid"] = None # default value
# Traceability (stored but not shown in __repr__)
rec["csvFile"] = os.path.basename(self.csv_file)
rec["date"] = new_index["index_date"]
rec["name"] = row[0].strip()
rec["EC"] = row[1].strip()
rec["CAS"] = row[2].strip()
rec["name2"] = row[3].strip()
rec["CAS2"] = row[4].strip()
try:
rec["FCM"] = int(row[5].strip()) if row[5].strip() not in ["", "-"] else None
except ValueError:
rec["FCM"] = None
rec["Ref"] = row[6].strip()
rec["Additive_or_PPA"] = yesno(row[7])
rec["Use_as_monomer_macromolecule"] = yesno(row[8])
rec["FRFapplicable"] = yesno(row[9])
def parse_sml(val):
val = val.strip()
if not val:
return SMLdefault
m = re.search(r"([\d\.]+)", val)
return float(m.group(1)) if m else SMLdefault
rec["SML"] = parse_sml(row[10])
rec["SMLunit"] = "mg/kg"
rec["SMLTrestrictions"] = row[11].strip()
val = row[12].strip()
if val:
try:
rec["SMLTGroupFCMsubstances"] = [int(x.strip()) for x in val.split(",") if x.strip()]
except ValueError:
rec["SMLTGroupFCMsubstances"] = None
else:
rec["SMLTGroupFCMsubstances"] = None
rec["SMLT"] = parse_sml(row[13])
if row[13].strip() == "":
rec["SMLT"] = rec["SML"]
rec["SMLTunit"] = "mg/kg"
rec["Restrictions"] = row[14].strip()
rec["NotesCompliance"] = row[15].strip()
rec["Notes"] = row[16].strip()
rec["Physicalform"] = row[17].strip()
rec["ExpressedAs"] = row[18].strip()
# Only attempt PubChem lookup if CAS is nonempty.
cas_val = rec["CAS"]
if cas_val and cas_val.strip():
if cas_val in missing_pubchem:
cid_val = missing_pubchem[cas_val]
else:
try:
cid_val = migrant(cas_val,annex1=False).cid
except ValueError:
printWARN(f"🇪🇺 Warning: substance {rec['name']} (CAS {cas_val}) not found in PubChem.")
cid_val = None
missing_pubchem[cas_val] = None
else:
# No CAS provided, so we know there will be no cid.
cid_val = None
rec["cid"] = cid_val
# Order the record keys: first "record", then "cid", then others.
ordered_rec = {
"record": rec["record"],
"cid": rec["cid"],
"name": rec["name"],
"CAS": rec["CAS"],
"EC": rec["EC"],
"FCM": rec["FCM"],
"Ref": rec["Ref"],
"Additive_or_PPA": rec["Additive_or_PPA"],
"Use_as_monomer_macromolecule": rec["Use_as_monomer_macromolecule"],
"FRFapplicable": rec["FRFapplicable"],
"SML": rec["SML"],
"SMLunit": rec["SMLunit"],
"SMLTrestrictions": rec["SMLTrestrictions"],
"SMLTGroupFCMsubstances": rec["SMLTGroupFCMsubstances"],
"SMLT": rec["SMLT"],
"SMLTunit": rec["SMLTunit"],
"Restrictions": rec["Restrictions"],
"NotesCompliance": rec["NotesCompliance"],
"Notes": rec["Notes"],
"Physicalform": rec["Physicalform"],
"ExpressedAs": rec["ExpressedAs"],
"engine": f"SFPPy: {os.path.basename(__file__)}",
"csvFile": rec["csvFile"],
"date": rec["date"]
}
rec_filename = f"rec{rec_num:05d}.annex1.json"
json_filename = os.path.join(self.cache_dir, rec_filename)
with open(json_filename, "w", encoding="utf-8") as jf:
json.dump(ordered_rec, jf, ensure_ascii=False, indent=2)
new_index["order"].append(rec_num)
for key in ["name", "EC", "CAS", "FCM", "Ref"]:
value = rec[key]
if value not in new_index[key]:
new_index[key][value] = []
new_index[key][value].append(rec_num)
if cid_val is not None:
new_index["bycid"][str(cid_val)] = rec_num  # string key, consistent with the JSON round-trip and bycid lookups
records.append(ordered_rec)
with open(self.index_file, "w", encoding="utf-8") as f:
json.dump(new_index, f, ensure_ascii=False, indent=2)
with open(missing_file, "w", encoding="utf-8") as mf:
json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
self.index = new_index
self.order = new_index.get("order", [])
self._records_cache = {}
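The two inner helpers of `refresh_index` (`yesno` and `parse_sml`) encode the CSV cell conventions: yes/no flags map to booleans (anything else to None), and an SML cell yields its first number or the module default of 60.0 mg/kg when empty or non-numeric. The standalone copies below are for illustration only:

```python
import re

SMLdefault = 60.0  # module default when the SML field is empty

def parse_sml(val):
    # Standalone copy of the inner helper: first number in the cell,
    # otherwise the default limit.
    val = val.strip()
    if not val:
        return SMLdefault
    m = re.search(r"([\d\.]+)", val)
    return float(m.group(1)) if m else SMLdefault

def yesno(val):
    # Standalone copy: "yes"/"no" (any case) -> True/False, else None.
    val = val.strip().lower()
    return {"yes": True, "no": False}.get(val)
```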
def _load_record(self, rec_id, order=None, db=False):
"""
Load a record (as an annex1record) from its cached JSON file.
If the file does not exist, return None with a warning.
Extended records are managed via the global flag self._pubchem
The local flag db sets whether the record will be informed or not from the full database
"""
if rec_id in self._records_cache:
if self._pubchem:
if db:
return annex1record_ext(self._records_cache[rec_id],self)
else:
return annex1record_ext(self._records_cache[rec_id])
else:
return self._records_cache[rec_id]
json_filename = os.path.join(self.cache_dir, f"rec{rec_id:05d}.annex1.json")
if not os.path.exists(json_filename):
printWARN(f"🇪🇺 Warning: Record file for record {rec_id} not found.")
return None
with open(json_filename, "r", encoding="utf-8") as jf:
rec = json.load(jf)
# Pass the record's own "record" field as order.
record_obj = annex1record(rec, order=rec.get("record"), total=len(self.order))
self._records_cache[rec_id] = record_obj
if self._pubchem:
self._pubchem = False # mandatory to avoid circular imports
self._fcm2rid = self.SMLT_Groupsubstances # we refresh the cache with the new substance
self._pubchem = True
if db:
return annex1record_ext(record_obj,self)
else:
return annex1record_ext(record_obj)
else:
return record_obj
def __getitem__(self, key):
"""
__getitem__ supports:
- integer keys: interpreted as record numbers.
If the record number is not found, an error is raised showing the entered value and the valid range.
- slices: returns a list of records whose record numbers fall within the slice.
- list/tuple: returns a list of corresponding records.
- string keys: interpreted as CAS numbers.
If the CAS number is not found, an error is raised including the entered value and a sample of valid keys.
"""
if isinstance(key, slice):
# Interpret the slice start and stop as record numbers.
start = key.start if key.start is not None else min(self.order)
stop = key.stop if key.stop is not None else max(self.order) + 1
rec_ids = [rid for rid in self.order if start <= rid < stop]
if not rec_ids:
raise KeyError(f"No records found in range {start} to {stop - 1}. Valid record numbers range from {min(self.order)} to {max(self.order)}.")
return [self._load_record(rid, order=rid) for rid in rec_ids]
elif isinstance(key, int):
# Interpret the integer key directly as a record number.
if key in self.order:
return self._load_record(key, order=key)
else:
raise KeyError(f"Record number {key} not found. Valid record numbers range from {min(self.order)} to {max(self.order)}.")
elif isinstance(key, (list, tuple)):
return [self.__getitem__(k) for k in key]
elif isinstance(key, str):
# Interpret a string key as a CAS number.
if key in self.index.get("CAS", {}):
rec_ids = self.index["CAS"][key]
if len(rec_ids) == 1:
return self._load_record(rec_ids[0], order=rec_ids[0])
else:
return [self._load_record(rid, order=rid) for rid in rec_ids]
else:
available = list(self.index.get("CAS", {}).keys())
sample = ", ".join(available[:10]) + (" ..." if len(available) > 10 else "")
raise KeyError(f"CAS key '{key}' not found in index. Valid CAS keys include: {sample}")
else:
raise KeyError(f"Unsupported key type: {type(key)}")
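A subtlety of `__getitem__` is that slices filter on record numbers, not on list positions, so any gap in `order` is simply skipped rather than raising. With a hypothetical `order` list:

```python
order = [1, 2, 3, 50, 51, 52]  # hypothetical record numbers with a gap

# Equivalent of db[2:51]: keep record numbers in the half-open range.
start, stop = 2, 51
rec_ids = [rid for rid in order if start <= rid < stop]
```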
def __call__(self, *args):
"""
Callable access. For example:
- dbannex1(cid) returns the record for a given PubChem cid (via bycid),
- dbannex1(rec) returns the record for a given record number,
- dbannex1(cid1, cid2, ...) or dbannex1([cid1, cid2, ...]) returns a list.
Strings are interpreted as CAS numbers.
"""
if len(args) == 1 and isinstance(args[0], (list, tuple)):
args = args[0]
results = []
for arg in args:
if isinstance(arg, int):
argkey = str(arg)
if arg in self.order:
results.append(self._load_record(arg))
elif "bycid" in self.index and argkey in self.index["bycid"]:
rec_id = self.index["bycid"][argkey]
results.append(self._load_record(rec_id))
else:
printWARN(f"🇪🇺 Warning: Record for identifier {arg} not found.")
results.append(None)
elif isinstance(arg, str):
result_item = self.__getitem__(arg)
if isinstance(result_item, list):
results.extend(result_item)
else:
results.append(result_item)
else:
raise KeyError(f"Unsupported key type in call: {type(arg)}")
if len(results) == 1:
return results[0]
return results
# --------------------------
# Additional search methods
# --------------------------
def byname(self, name):
name = name[0] if isinstance(name,list) else name
rec_ids = self.index.get("name", {}).get(name, [])
return [self._load_record(rid, order=rid) for rid in rec_ids]
def byEC(self, ec):
ec = ec[0] if isinstance(ec,list) else ec
rec_ids = self.index.get("EC", {}).get(ec, [])
return [self._load_record(rid, order=rid) for rid in rec_ids]
def byCAS(self, cas):
cas = cas[0] if isinstance(cas,list) else cas
rec_ids = self.index.get("CAS", {}).get(cas, [])
if len(rec_ids) == 1:
return self._load_record(rec_ids[0], order=rec_ids[0],db=True)
else:
return [self._load_record(rid, order=rid,db=True) for rid in rec_ids]
def byFCM(self, fcm):
fcm = fcm[0] if isinstance(fcm,list) else fcm
fcm = str(fcm) if isinstance(fcm,int) else fcm
rec_ids = self.index.get("FCM", {}).get(fcm, [])
return [self._load_record(rid, order=rid) for rid in rec_ids]
def byRef(self, ref):
ref = ref[0] if isinstance(ref,list) else ref
rec_ids = self.index.get("Ref", {}).get(ref, [])
return [self._load_record(rid, order=rid) for rid in rec_ids]
def bycid(self, cid, verbose=True):
"""
Search for a record by PubChem cid.
"""
cid = cid[0] if isinstance(cid,list) else cid
cidkey = str(cid)
if "bycid" in self.index and cidkey in self.index["bycid"]:
rec_id = self.index["bycid"][str(cid)]
return self._load_record(rec_id, order=rec_id,db=True)
else:
if verbose:
printWARN(f"⚠️ Warning: No 🇪🇺 10/2011/EC record found for PubChem cid {cid}.")
return None
def bySML(self, min_val, max_val):
results = []
for rid in self.order:
rec = self._load_record(rid)
if rec is not None:
sml = rec.get("SML", SMLdefault)
if min_val <= sml <= max_val:
results.append(rec)
return results
def bySMLT(self, min_val, max_val):
results = []
for rid in self.order:
rec = self._load_record(rid)
if rec is not None:
smlt = rec.get("SMLT", SMLdefault)
if min_val <= smlt <= max_val:
results.append(rec)
return results
def __iter__(self):
for rid in self.order:
yield self._load_record(rid, order=rid)
def __len__(self):
return len(self.order)
def __contains__(self, item):
if isinstance(item,list):
item = item[0]
if isinstance(item, int):
argkey = str(item)
return item in self.order or ("bycid" in self.index and argkey in self.index["bycid"])
if isinstance(item, str):
return item in self.index.get("CAS", {})
return False
@property
def SMLT_Groupsubstances(self):
"""
Returns a dictionary G so that G[fcm] = [rid1, rid2] (cached in memory and on disk)
- **Hidden Field Cache:**
The property first checks if `self._SMLT_Groupsubstances` exists and returns it immediately if so.
- **File Cache:**
The cache file path is built by taking `self.index_file`, splitting it into a base and extension, then appending “.group” before the extension.
If this cache file exists, the code attempts to load it. Any issues during file reading (e.g., corruption) will fall back to recomputing the groups.
- **Recomputation and Write-back:**
If the cache file does not exist or fails to load, the code computes the groups dictionary, caches it in memory, and writes the result to the cache file.
This double caching strategy ensures that once the groups are computed, subsequent accesses are fast (both from memory and from disk), while still allowing a persistent cache that survives between sessions.
"""
# If already cached in the hidden field, return it immediately.
if hasattr(self, '_SMLT_Groupsubstances'):
return self._SMLT_Groupsubstances
# Build the cache file name using self.index_file:
base, ext = os.path.splitext(self.index_file)
cache_file = base + ".group" + ext # e.g., annex1_index.group.json
# Try to load the cache from the file if it exists.
if os.path.exists(cache_file):
try:
with open(cache_file, "r", encoding="utf-8") as f:
groups = json.load(f)
self._SMLT_Groupsubstances = groups
return groups
except Exception as e:
printWARN(f"🇪🇺 Warning: Failed to load group cache from {cache_file}: {e}")
# Compute the groups if not cached.
groups = {}
for rid in self.order:
rec = self._load_record(rid)
lst = rec.get("SMLTGroupFCMsubstances")
if lst:
for fcm in lst:
groups.setdefault(fcm, []).append(rec.get("record"))
# Cache the result in the hidden field.
self._SMLT_Groupsubstances = groups
# Write the computed groups to the cache file.
try:
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(groups, f, indent=2, ensure_ascii=False)
except Exception as e:
printWARN(f"🇪🇺 Warning: Failed to write group cache to {cache_file}: {e}")
return groups
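Stripped of caching, the group index above is a plain setdefault/append accumulation: every record contributes its own record number to each FCM group it lists. A sketch with hypothetical records:

```python
# Hypothetical records: each may belong to several FCM groups (or none).
records = [
    {"record": 1, "SMLTGroupFCMsubstances": [7, 8]},
    {"record": 2, "SMLTGroupFCMsubstances": [7]},
    {"record": 3, "SMLTGroupFCMsubstances": None},
]

groups = {}
for rec in records:
    # `or []` turns a None group list into a no-op
    for fcm in rec["SMLTGroupFCMsubstances"] or []:
        groups.setdefault(fcm, []).append(rec["record"])
```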
def __repr__(self):
csv_filename = os.path.basename(self.csv_file)
index_date = self.index.get("index_date", "unknown")
print(f"Annex 1 of 🇪🇺EU regulation 10/2011 EC ({len(self.order)} records)")
print(f"Imported from CSV {csv_filename} and indexed on {index_date}")
return str(self)
def __str__(self):
return f"<{self.__class__.__name__}: {len(self.order)} records (Annex 1 of 10/2011/EC)>"
# -------------------------------------------------------------------
# Example usage (for debugging / standalone tests)
# -------------------------------------------------------------------
if __name__ == "__main__":
dbannex1 = EuFCMannex1(pubchem=True) # we promote the whole database
rec = dbannex1.bycid(6581)
repr(rec)
print(repr(dbannex1))
print(str(dbannex1))
first_record = dbannex1[1]
print(first_record)
# Search methods examples:
rec_by_cas = dbannex1.byCAS("102-39-6")
rec_by_name = dbannex1.byname("(1,3-phenylenedioxy)diacetic acid")
# Search by PubChem cid:
rec_by_cid = dbannex1.bycid(113194) # if valid
rec_by_cid_missing = dbannex1.bycid(1023456) # will warn and return None
# Search by SML range:
sml_records = dbannex1.bySML(0.01, 1.0)
# Callable access:
rec = dbannex1(113194) # by cid (if exists) or by record number
rec_list = dbannex1(1023456, "102-39-6")
# Iterate over records:
for rec in dbannex1:
print(rec.get("cid"))
# Test membership:
if 10 in dbannex1:
print("Record 10 exists.")
if 113194 in dbannex1:
print("PubChem cid 113194 exists.")
Functions
def custom_wrap(text, width=40, indent=' ')
-
Expand source code
def custom_wrap(text, width=40, indent=" " * 22): # Wrap first line to first_width characters. first_line = textwrap.wrap(text, width=width) if not first_line: return "" # Get the first line and then compute the remaining text. first = first_line[0] remaining = text[len(first):].lstrip() # Wrap the remaining text to subsequent_width characters. subsequent_lines = textwrap.wrap(remaining, width=width) # Prepend the indent to subsequent lines. wrapped = [first] + [indent + line for line in subsequent_lines] return "\n".join(wrapped)
def printWARN(message: str, tsilent: float = 10.0)
-
Print a warning message only if:
- it's different from the last one, or
- more than `tsilent` seconds have passed since the last identical warning.

Parameters:
message : str
    The warning message to display.
tsilent : float, optional
    Minimum time (in seconds) between repeated identical warnings.
Expand source code
def printWARN(message: str, tsilent: float = 10.0):
    """
    Print a warning message only if:
      - it's different from the last one, or
      - more than `tsilent` seconds have passed since the last identical warning.

    Parameters:
    ----------
    message : str
        The warning message to display.
    tsilent : float, optional
        Minimum time (in seconds) between repeated identical warnings.
    """
    global _LAST_WARN_, _T_LAST_WARN_
    tnow = time.time()
    if message != _LAST_WARN_ or (tnow - _T_LAST_WARN_ > tsilent):
        print(message)
        _LAST_WARN_ = message
        _T_LAST_WARN_ = tnow
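The throttling pattern is easy to demonstrate in isolation. The sketch below re-creates the same module-level state and function (the globals and messages are illustrative, not the module's real warning strings): an identical message repeated within the quiet window is swallowed, while a different message always prints.

```python
import time

# Module-level state, as in the original module.
_LAST_WARN_ = None
_T_LAST_WARN_ = 0.0

def printWARN(message, tsilent=10.0):
    # Suppress repeats of the same message arriving within `tsilent` seconds.
    global _LAST_WARN_, _T_LAST_WARN_
    tnow = time.time()
    if message != _LAST_WARN_ or (tnow - _T_LAST_WARN_ > tsilent):
        print(message)
        _LAST_WARN_ = message
        _T_LAST_WARN_ = tnow

printWARN("record not found")  # printed
printWARN("record not found")  # suppressed: same message, within 10 s
printWARN("another problem")   # printed: different message resets the filter
```

Because the deduplication key is the whole message string, per-record warnings (which embed the record number) are never suppressed across different records, only exact repeats.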
Classes
class EuFCMannex1 (cache_dir='cache.EuFCMannex1', index_file='annex1_index.json', pubchem=True)
-
Manages the Annex I CSV file and caches its data for efficient lookup and analysis.
This class reads the CSV file (representing the authorized substances as provided by ECHA) and processes each row into a JSON record. Each record is saved in a cache directory as recXXXXX.annex1.json (where XXXXX is the 1-based record number). The class then builds a global index (stored as annex1_index.json in the cache folder) that maps various foreign keys—such as "name", "EC", "CAS", "FCM", "Ref", and PubChem cid—to the corresponding record numbers.
Key functionality includes:
- Retrieving records by a record number (interpreted directly from the record's "record" field, not as a Python 0-based index).
- Lookup of records by CAS (string keys) and by PubChem cid via the bycid method.
- Support for slicing, list-based access, callable access (e.g., dbannex1(113194) retrieves by PubChem cid or record number), iteration over records, and membership testing.
- Additional search methods: byname, byEC, byCAS, byFCM, byRef, bySML, and bySMLT.
- Handling of substances that lack a CAS number (and therefore cannot have a PubChem cid) by skipping the PubChem lookup; for substances with a nonempty CAS that fail the lookup, the CAS is recorded in a separate missing file ("missing.pubchem.annex1.json") to avoid future redundant queries.
- Enhanced error messages in __getitem__ that include the invalid key and the valid range or a sample of valid keys.
Global Rate Limiting: This class relies on a module-level global variable (e.g., PubChem_lastQueryTime in the patankar.loadpubchem module) to manage rate-limiting when performing PubChem lookups.
Usage Example:
>>> db = EuFCMannex1()
>>> rec = db[1060]  # Retrieves record number 1060.
>>> rec_by_cas = db.byCAS("102-39-6")
>>> rec_by_cid = db.bycid(113194)
>>> for record in db:
...     print(record["name"])
>>> if 10 in db:
...     print("Record 10 exists.")
>>> if 113194 in db:
...     print("PubChem cid 113194 exists.")
Initialize the database.
(1) The working directory is set to the directory of __file__. (2) The CSV file is "fcm-and-articles-regulation--annex-i---authorised-substances-export.csv". (3) If the CSV file is missing, a FileNotFoundError is raised. (4) The class uses a cache in cache_dir and stores the global index inside that folder.
Expand source code
class EuFCMannex1:
    """
    Manages the Annex I CSV file and caches its data for efficient lookup and analysis.

    This class reads the CSV file (representing the authorized substances as provided
    by ECHA) and processes each row into a JSON record. Each record is saved in a cache
    directory as recXXXXX.annex1.json (where XXXXX is the 1-based record number). The
    class then builds a global index (stored as annex1_index.json in the cache folder)
    that maps various foreign keys—such as "name", "EC", "CAS", "FCM", "Ref", and
    PubChem cid—to the corresponding record numbers.

    Key functionality includes:
      - Retrieving records by a record number (interpreted directly from the record's
        "record" field, not as a Python 0-based index).
      - Lookup of records by CAS (string keys) and by PubChem cid via the bycid method.
      - Support for slicing, list-based access, callable access (e.g., dbannex1(113194)
        retrieves by PubChem cid or record number), iteration over records, and
        membership testing.
      - Additional search methods: byname, byEC, byCAS, byFCM, byRef, bySML, and bySMLT.
      - Handling of substances that lack a CAS number (and therefore cannot have a
        PubChem cid) by skipping the PubChem lookup; for substances with a nonempty
        CAS that fail the lookup, the CAS is recorded in a separate missing file
        ("missing.pubchem.annex1.json") to avoid future redundant queries.
      - Enhanced error messages in __getitem__ that include the invalid key and the
        valid range or a sample of valid keys.

    Global Rate Limiting:
        This class relies on a module-level global variable (e.g., PubChem_lastQueryTime
        in the patankar.loadpubchem module) to manage rate-limiting when performing
        PubChem lookups.

    Usage Example:
        >>> db = EuFCMannex1()
        >>> rec = db[1060]  # Retrieves record number 1060.
        >>> rec_by_cas = db.byCAS("102-39-6")
        >>> rec_by_cid = db.bycid(113194)
        >>> for record in db:
        ...     print(record["name"])
        >>> if 10 in db:
        ...     print("Record 10 exists.")
        >>> if 113194 in db:
        ...     print("PubChem cid 113194 exists.")
    """

    isAnnex1Initialized = False  # class attribute

    def __init__(self, cache_dir="cache.EuFCMannex1", index_file="annex1_index.json", pubchem=True):
        """
        Initialize the database.
        (1) The working directory is set to the directory of __file__.
        (2) The CSV file is
            "fcm-and-articles-regulation--annex-i---authorised-substances-export.csv".
        (3) If the CSV file is missing, a FileNotFoundError is raised.
        (4) The class uses a cache in cache_dir and stores the global index inside that folder.
        """
        self.base_dir = os.path.dirname(__file__)
        self.csv_file = os.path.join(
            self.base_dir,
            "fcm-and-articles-regulation--annex-i---authorised-substances-export.csv")
        if not os.path.exists(self.csv_file):
            raise FileNotFoundError(f"CSV file {self.csv_file} not found.")
        self.cache_dir = os.path.join(self.base_dir, cache_dir)
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)
        # Store the index file in the cache directory.
        self.index_file = os.path.join(self.cache_dir, index_file)
        if os.path.exists(self.index_file):
            with open(self.index_file, "r", encoding="utf-8") as f:
                self.index = json.load(f)
        else:
            self.refresh_index()
        self.order = self.index.get("order", [])
        # Internal cache for loaded records.
        self._records_cache = {}
        EuFCMannex1.isAnnex1Initialized = True
        # pubchem extensions: we cache the pairing fcm -> rid (record id = primary key)
        self._pubchem = False  # mandatory to avoid circular imports
        self._fcm2rid = self.SMLT_Groupsubstances if pubchem else None
        self._pubchem = pubchem  # we enforce pubchem, the database is initialized indeed

    @classmethod
    def isindexinitialized(cls, cache_dir="cache.EuFCMannex1", index_file="annex1_index.json"):
        """Return True if the database is available"""
        return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))

    def refresh_index(self):
        """
        Rebuild the global index by reading the CSV file and regenerating each record
        as recXXXXX.annex1.json (with record number as primary key). The index includes
        foreign key mappings for "name", "EC", "CAS", "FCM", "Ref" and a bycid index
        for PubChem cid.

        For substances with an empty CAS number, no PubChem lookup is performed. For
        substances with a nonempty CAS, if the lookup fails (raising ValueError) the
        CAS is recorded in a missing file ("missing.pubchem.annex1.json") so that
        future refreshes do not re-query PubChem. (Rate limiting is assumed to be
        managed via the module's global variable PubChem_lastQueryTime.)
        """
        from patankar.loadpubchem import migrant  # local import of migrant
        new_index = {}
        new_index["index_date"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        new_index["csv_file"] = os.path.basename(self.csv_file)
        new_index["order"] = []
        for key in ["name", "EC", "CAS", "FCM", "Ref"]:
            new_index[key] = {}
        new_index["bycid"] = {}
        # Load missing CAS numbers (only for substances with a CAS)
        missing_file = os.path.join(self.cache_dir, "missing.pubchem.annex1.json")
        if os.path.exists(missing_file):
            with open(missing_file, "r", encoding="utf-8") as mf:
                missing_pubchem = json.load(mf)
        else:
            missing_pubchem = {}
        records = []
        rec_num = 0
        with open(self.csv_file, "r", encoding="utf-8") as f:
            reader = csv.reader(f, delimiter="\t", quotechar='"')
            header_found = False
            for row in reader:
                if not header_found:
                    if row and row[0] == "Name":
                        header_found = True
                    continue
                if not row or len(row) < 19:
                    continue
                rec_num += 1
                rec = {}

                def yesno(val):
                    val = val.strip().lower()
                    if val == "yes":
                        return True
                    elif val == "no":
                        return False
                    return None

                # Use record number as primary key stored as "record"
                rec["record"] = rec_num
                rec["cid"] = None  # default value
                # Traceability (stored but not shown in __repr__)
                rec["csvFile"] = os.path.basename(self.csv_file)
                rec["date"] = new_index["index_date"]
                rec["name"] = row[0].strip()
                rec["EC"] = row[1].strip()
                rec["CAS"] = row[2].strip()
                rec["name2"] = row[3].strip()
                rec["CAS2"] = row[4].strip()
                try:
                    rec["FCM"] = int(row[5].strip()) if row[5].strip() not in ["", "-"] else None
                except:
                    rec["FCM"] = None
                rec["Ref"] = row[6].strip()
                rec["Additive_or_PPA"] = yesno(row[7])
                rec["Use_as_monomer_macromolecule"] = yesno(row[8])
                rec["FRFapplicable"] = yesno(row[9])

                def parse_sml(val):
                    val = val.strip()
                    if not val:
                        return SMLdefault
                    m = re.search(r"([\d\.]+)", val)
                    return float(m.group(1)) if m else SMLdefault

                rec["SML"] = parse_sml(row[10])
                rec["SMLunit"] = "mg/kg"
                rec["SMLTrestrictions"] = row[11].strip()
                val = row[12].strip()
                if val:
                    try:
                        rec["SMLTGroupFCMsubstances"] = [int(x.strip()) for x in val.split(",") if x.strip()]
                    except:
                        rec["SMLTGroupFCMsubstances"] = None
                else:
                    rec["SMLTGroupFCMsubstances"] = None
                rec["SMLT"] = parse_sml(row[13])
                if row[13].strip() == "":
                    rec["SMLT"] = rec["SML"]
                rec["SMLTunit"] = "mg/kg"
                rec["Restrictions"] = row[14].strip()
                rec["NotesCompliance"] = row[15].strip()
                rec["Notes"] = row[16].strip()
                rec["Physicalform"] = row[17].strip()
                rec["ExpressedAs"] = row[18].strip()
                # Only attempt PubChem lookup if CAS is nonempty.
                cas_val = rec["CAS"]
                if cas_val and cas_val.strip():
                    if cas_val in missing_pubchem:
                        cid_val = missing_pubchem[cas_val]
                    else:
                        try:
                            cid_val = migrant(cas_val, annex1=False).cid
                        except ValueError:
                            printWARN(f"🇪🇺 Warning: substance {rec['name']} (CAS {cas_val}) not found in PubChem.")
                            cid_val = None
                            missing_pubchem[cas_val] = None
                else:
                    # No CAS provided, so we know there will be no cid.
                    cid_val = None
                rec["cid"] = cid_val
                # Order the record keys: first "record", then "cid", then others.
                ordered_rec = {
                    "record": rec["record"],
                    "cid": rec["cid"],
                    "name": rec["name"],
                    "CAS": rec["CAS"],
                    "EC": rec["EC"],
                    "FCM": rec["FCM"],
                    "Ref": rec["Ref"],
                    "Additive_or_PPA": rec["Additive_or_PPA"],
                    "Use_as_monomer_macromolecule": rec["Use_as_monomer_macromolecule"],
                    "FRFapplicable": rec["FRFapplicable"],
                    "SML": rec["SML"],
                    "SMLunit": rec["SMLunit"],
                    "SMLTrestrictions": rec["SMLTrestrictions"],
                    "SMLTGroupFCMsubstances": rec["SMLTGroupFCMsubstances"],
                    "SMLT": rec["SMLT"],
                    "SMLTunit": rec["SMLTunit"],
                    "Restrictions": rec["Restrictions"],
                    "NotesCompliance": rec["NotesCompliance"],
                    "Notes": rec["Notes"],
                    "Physicalform": rec["Physicalform"],
                    "ExpressedAs": rec["ExpressedAs"],
                    "engine": f"SFPPy: {os.path.basename(__file__)}",
                    "csvFile": rec["csvFile"],
                    "date": rec["date"]
                }
                rec_filename = f"rec{rec_num:05d}.annex1.json"
                json_filename = os.path.join(self.cache_dir, rec_filename)
                with open(json_filename, "w", encoding="utf-8") as jf:
                    json.dump(ordered_rec, jf, ensure_ascii=False, indent=2)
                new_index["order"].append(rec_num)
                for key in ["name", "EC", "CAS", "FCM", "Ref"]:
                    value = rec[key]
                    if value not in new_index[key]:
                        new_index[key][value] = []
                    new_index[key][value].append(rec_num)
                if cid_val is not None:
                    new_index["bycid"][cid_val] = rec_num
                records.append(ordered_rec)
        with open(self.index_file, "w", encoding="utf-8") as f:
            json.dump(new_index, f, ensure_ascii=False, indent=2)
        with open(missing_file, "w", encoding="utf-8") as mf:
            json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
        self.index = new_index
        self.order = new_index.get("order", [])
        self._records_cache = {}

    def _load_record(self, rec_id, order=None, db=False):
        """
        Load a record (as an annex1record) from its cached JSON file.
        If the file does not exist, return None with a warning.

        Extended records are managed via the global flag self._pubchem.
        The local flag db sets whether the record will be informed or not
        from the full database.
        """
        if rec_id in self._records_cache:
            if self._pubchem:
                if db:
                    return annex1record_ext(self._records_cache[rec_id], self)
                else:
                    return annex1record_ext(self._records_cache[rec_id])
            else:
                return self._records_cache[rec_id]
        json_filename = os.path.join(self.cache_dir, f"rec{rec_id:05d}.annex1.json")
        if not os.path.exists(json_filename):
            printWARN(f"🇪🇺 Warning: Record file for record {rec_id} not found.")
            return None
        with open(json_filename, "r", encoding="utf-8") as jf:
            rec = json.load(jf)
        # Pass the record's own "record" field as order.
        record_obj = annex1record(rec, order=rec.get("record"), total=len(self.order))
        self._records_cache[rec_id] = record_obj
        if self._pubchem:
            self._pubchem = False  # mandatory to avoid circular imports
            self._fcm2rid = self.SMLT_Groupsubstances  # we refresh the cache with the new substance
            self._pubchem = True
            if db:
                return annex1record_ext(record_obj, self)
            else:
                return annex1record_ext(record_obj)
        else:
            return record_obj

    def __getitem__(self, key):
        """
        __getitem__ supports:
          - integer keys: interpreted as record numbers. If the record number is not
            found, an error is raised showing the entered value and the valid range.
          - slices: returns a list of records whose record numbers fall within the slice.
          - list/tuple: returns a list of corresponding records.
          - string keys: interpreted as CAS numbers. If the CAS number is not found,
            an error is raised including the entered value and a sample of valid keys.
        """
        if isinstance(key, slice):
            # Interpret the slice start and stop as record numbers.
            start = key.start if key.start is not None else min(self.order)
            stop = key.stop if key.stop is not None else max(self.order) + 1
            rec_ids = [rid for rid in self.order if start <= rid < stop]
            if not rec_ids:
                raise KeyError(f"No records found in range {start} to {stop - 1}. "
                               f"Valid record numbers range from {min(self.order)} to {max(self.order)}.")
            return [self._load_record(rid, order=rid) for rid in rec_ids]
        elif isinstance(key, int):
            # Interpret the integer key directly as a record number.
            if key in self.order:
                return self._load_record(key, order=key)
            else:
                raise KeyError(f"Record number {key} not found. "
                               f"Valid record numbers range from {min(self.order)} to {max(self.order)}.")
        elif isinstance(key, (list, tuple)):
            return [self.__getitem__(k) for k in key]
        elif isinstance(key, str):
            # Interpret a string key as a CAS number.
            if key in self.index.get("CAS", {}):
                rec_ids = self.index["CAS"][key]
                if len(rec_ids) == 1:
                    return self._load_record(rec_ids[0], order=rec_ids[0])
                else:
                    return [self._load_record(rid, order=rid) for rid in rec_ids]
            else:
                available = list(self.index.get("CAS", {}).keys())
                sample = ", ".join(available[:10]) + (" ..." if len(available) > 10 else "")
                raise KeyError(f"CAS key '{key}' not found in index. Valid CAS keys include: {sample}")
        else:
            raise KeyError(f"Unsupported key type: {type(key)}")

    def __call__(self, *args):
        """
        Callable access. For example:
          - dbannex1(cid) returns the record for a given PubChem cid (via bycid),
          - dbannex1(rec) returns the record for a given record number,
          - dbannex1(cid1, cid2, ...) or dbannex1([cid1, cid2, ...]) returns a list.
        Strings are interpreted as CAS numbers.
        """
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = args[0]
        results = []
        for arg in args:
            if isinstance(arg, int):
                argkey = str(arg)
                if arg in self.order:
                    results.append(self._load_record(arg))
                elif "bycid" in self.index and argkey in self.index["bycid"]:
                    rec_id = self.index["bycid"][argkey]
                    results.append(self._load_record(rec_id))
                else:
                    printWARN(f"🇪🇺 Warning: Record for identifier {arg} not found.")
                    results.append(None)
            elif isinstance(arg, str):
                result_item = self.__getitem__(arg)
                if isinstance(result_item, list):
                    results.extend(result_item)
                else:
                    results.append(result_item)
            else:
                raise KeyError(f"Unsupported key type in call: {type(arg)}")
        if len(results) == 1:
            return results[0]
        return results

    # --------------------------
    # Additional search methods
    # --------------------------
    def byname(self, name):
        name = name[0] if isinstance(name, list) else name
        rec_ids = self.index.get("name", {}).get(name, [])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    def byEC(self, ec):
        ec = ec[0] if isinstance(ec, list) else ec
        rec_ids = self.index.get("EC", {}).get(ec, [])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    def byCAS(self, cas):
        cas = cas[0] if isinstance(cas, list) else cas
        rec_ids = self.index.get("CAS", {}).get(cas, [])
        if len(rec_ids) == 1:
            return self._load_record(rec_ids[0], order=rec_ids[0], db=True)
        else:
            return [self._load_record(rid, order=rid, db=True) for rid in rec_ids]

    def byFCM(self, fcm):
        fcm = fcm[0] if isinstance(fcm, list) else fcm
        fcm = str(fcm) if isinstance(fcm, int) else fcm
        rec_ids = self.index.get("FCM", {}).get(fcm, [])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    def byRef(self, ref):
        ref = ref[0] if isinstance(ref, list) else ref
        rec_ids = self.index.get("Ref", {}).get(ref, [])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    def bycid(self, cid, verbose=True):
        """Search for a record by PubChem cid."""
        cid = cid[0] if isinstance(cid, list) else cid
        cidkey = str(cid)
        if "bycid" in self.index and cidkey in self.index["bycid"]:
            rec_id = self.index["bycid"][cidkey]
            return self._load_record(rec_id, order=rec_id, db=True)
        else:
            if verbose:
                printWARN(f"⚠️ Warning: No 🇪🇺 10/2011/EC record found for PubChem cid {cid}.")
            return None

    def bySML(self, min_val, max_val):
        results = []
        for rid in self.order:
            rec = self._load_record(rid)
            if rec is not None:
                sml = rec.get("SML", SMLdefault)
                if min_val <= sml <= max_val:
                    results.append(rec)
        return results

    def bySMLT(self, min_val, max_val):
        results = []
        for rid in self.order:
            rec = self._load_record(rid)
            if rec is not None:
                smlt = rec.get("SMLT", SMLdefault)
                if min_val <= smlt <= max_val:
                    results.append(rec)
        return results

    def __iter__(self):
        for rid in self.order:
            yield self._load_record(rid, order=rid)

    def __len__(self):
        return len(self.order)

    def __contains__(self, item):
        if isinstance(item, list):
            item = item[0]
        if isinstance(item, int):
            argkey = str(item)
            return item in self.order or ("bycid" in self.index and argkey in self.index["bycid"])
        if isinstance(item, str):
            return item in self.index.get("CAS", {})
        return False

    @property
    def SMLT_Groupsubstances(self):
        """
        Returns a dictionary G so that G[fcm] = [rid1, rid2] (cached in memory and on disk)

        - **Hidden Field Cache:** The property first checks if
          `self._SMLT_Groupsubstances` exists and returns it immediately if so.
        - **File Cache:** The cache file path is built by taking `self.index_file`,
          splitting it into a base and extension, then appending ".group" before the
          extension. If this cache file exists, the code attempts to load it. Any
          issues during file reading (e.g., corruption) will fall back to recomputing
          the groups.
        - **Recomputation and Write-back:** If the cache file does not exist or fails
          to load, the code computes the groups dictionary, caches it in memory, and
          writes the result to the cache file.

        This double caching strategy ensures that once the groups are computed,
        subsequent accesses are fast (both from memory and from disk), while still
        allowing a persistent cache that survives between sessions.
        """
        # If already cached in the hidden field, return it immediately.
        if hasattr(self, '_SMLT_Groupsubstances'):
            return self._SMLT_Groupsubstances
        # Build the cache file name using self.index_file:
        base, ext = os.path.splitext(self.index_file)
        cache_file = base + ".group" + ext  # e.g., annex1_index.group.json
        # Try to load the cache from the file if it exists.
        if os.path.exists(cache_file):
            try:
                with open(cache_file, "r", encoding="utf-8") as f:
                    groups = json.load(f)
                    self._SMLT_Groupsubstances = groups
                    return groups
            except Exception as e:
                printWARN(f"🇪🇺 Warning: Failed to load group cache from {cache_file}: {e}")
        # Compute the groups if not cached.
        groups = {}
        for rid in self.order:
            rec = self._load_record(rid)
            lst = rec.get("SMLTGroupFCMsubstances")
            if lst:
                for fcm in lst:
                    groups.setdefault(fcm, []).append(rec.get("record"))
        # Cache the result in the hidden field.
        self._SMLT_Groupsubstances = groups
        # Write the computed groups to the cache file.
        try:
            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump(groups, f, indent=2, ensure_ascii=False)
        except Exception as e:
            printWARN(f"🇪🇺 Warning: Failed to write group cache to {cache_file}: {e}")
        return groups

    def __repr__(self):
        csv_filename = os.path.basename(self.csv_file)
        index_date = self.index.get("index_date", "unknown")
        print(f"Annex 1 of 🇪🇺EU regulation 10/2011 EC ({len(self.order)} records)")
        print(f"Imported from CSV {csv_filename} and indexed on {index_date}")
        return str(self)

    def __str__(self):
        return f"<{self.__class__.__name__}: {len(self.order)} records (Annex 1 of 10/2011/EC)>"
Class variables
var isAnnex1Initialized
Static methods
def isindexinitialized(cache_dir='cache.EuFCMannex1', index_file='annex1_index.json')
-
Return True if the database is available
Expand source code
@classmethod
def isindexinitialized(cls, cache_dir="cache.EuFCMannex1", index_file="annex1_index.json"):
    """Return True if the database is available"""
    return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))
Instance variables
var SMLT_Groupsubstances
-
Returns a dictionary G so that G[fcm] = [rid1, rid2] (cached in memory and on disk)

- Hidden Field Cache: The property first checks if `self._SMLT_Groupsubstances` exists and returns it immediately if so.
- File Cache: The cache file path is built by taking `self.index_file`, splitting it into a base and extension, then appending ".group" before the extension. If this cache file exists, the code attempts to load it. Any issues during file reading (e.g., corruption) will fall back to recomputing the groups.
- Recomputation and Write-back: If the cache file does not exist or fails to load, the code computes the groups dictionary, caches it in memory, and writes the result to the cache file.

This double caching strategy ensures that once the groups are computed, subsequent accesses are fast (both from memory and from disk), while still allowing a persistent cache that survives between sessions.
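The memory-plus-disk strategy described above can be sketched independently of the Annex I data. In this standalone illustration, `GroupCache`, its `compute()` placeholder, and the sample group `{"183": [42, 43]}` are all hypothetical stand-ins for the real record scan:

```python
import json
import os
import tempfile

class GroupCache:
    def __init__(self, cache_file):
        self.cache_file = cache_file

    @property
    def groups(self):
        # 1) In-memory hidden-field cache: fastest path.
        if hasattr(self, "_groups"):
            return self._groups
        # 2) Persistent file cache; any read error falls through to recomputation.
        if os.path.exists(self.cache_file):
            try:
                with open(self.cache_file, "r", encoding="utf-8") as f:
                    self._groups = json.load(f)
                    return self._groups
            except Exception:
                pass
        # 3) Recompute, then write back to both caches.
        groups = self.compute()
        self._groups = groups
        with open(self.cache_file, "w", encoding="utf-8") as f:
            json.dump(groups, f)
        return groups

    def compute(self):
        # Placeholder for the real scan that maps FCM group -> record ids.
        return {"183": [42, 43]}

path = os.path.join(tempfile.mkdtemp(), "index.group.json")
first = GroupCache(path).groups   # computed, then persisted to disk
second = GroupCache(path).groups  # a fresh instance loads from the file cache
```

One subtlety worth noting: JSON serializes all dictionary keys as strings, so integer group keys written to the file cache come back as strings after a reload; callers comparing keys across the two paths should normalize them.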
Expand source code
@property
def SMLT_Groupsubstances(self):
    """
    Returns a dictionary G so that G[fcm] = [rid1, rid2] (cached in memory and on disk)

    - **Hidden Field Cache:** The property first checks if
      `self._SMLT_Groupsubstances` exists and returns it immediately if so.
    - **File Cache:** The cache file path is built by taking `self.index_file`,
      splitting it into a base and extension, then appending ".group" before the
      extension. If this cache file exists, the code attempts to load it. Any issues
      during file reading (e.g., corruption) will fall back to recomputing the groups.
    - **Recomputation and Write-back:** If the cache file does not exist or fails to
      load, the code computes the groups dictionary, caches it in memory, and writes
      the result to the cache file.

    This double caching strategy ensures that once the groups are computed,
    subsequent accesses are fast (both from memory and from disk), while still
    allowing a persistent cache that survives between sessions.
    """
    # If already cached in the hidden field, return it immediately.
    if hasattr(self, '_SMLT_Groupsubstances'):
        return self._SMLT_Groupsubstances
    # Build the cache file name using self.index_file:
    base, ext = os.path.splitext(self.index_file)
    cache_file = base + ".group" + ext  # e.g., annex1_index.group.json
    # Try to load the cache from the file if it exists.
    if os.path.exists(cache_file):
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                groups = json.load(f)
                self._SMLT_Groupsubstances = groups
                return groups
        except Exception as e:
            printWARN(f"🇪🇺 Warning: Failed to load group cache from {cache_file}: {e}")
    # Compute the groups if not cached.
    groups = {}
    for rid in self.order:
        rec = self._load_record(rid)
        lst = rec.get("SMLTGroupFCMsubstances")
        if lst:
            for fcm in lst:
                groups.setdefault(fcm, []).append(rec.get("record"))
    # Cache the result in the hidden field.
    self._SMLT_Groupsubstances = groups
    # Write the computed groups to the cache file.
    try:
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(groups, f, indent=2, ensure_ascii=False)
    except Exception as e:
        printWARN(f"🇪🇺 Warning: Failed to write group cache to {cache_file}: {e}")
    return groups
-
Methods
def byCAS(self, cas)
-
Expand source code
def byCAS(self, cas):
    cas = cas[0] if isinstance(cas, list) else cas
    rec_ids = self.index.get("CAS", {}).get(cas, [])
    if len(rec_ids) == 1:
        return self._load_record(rec_ids[0], order=rec_ids[0], db=True)
    else:
        return [self._load_record(rid, order=rid, db=True) for rid in rec_ids]
def byEC(self, ec)
-
Expand source code
def byEC(self, ec):
    ec = ec[0] if isinstance(ec, list) else ec
    rec_ids = self.index.get("EC", {}).get(ec, [])
    return [self._load_record(rid, order=rid) for rid in rec_ids]
def byFCM(self, fcm)
-
Expand source code
def byFCM(self, fcm):
    fcm = fcm[0] if isinstance(fcm, list) else fcm
    fcm = str(fcm) if isinstance(fcm, int) else fcm
    rec_ids = self.index.get("FCM", {}).get(fcm, [])
    return [self._load_record(rid, order=rid) for rid in rec_ids]
def byRef(self, ref)
-
Expand source code
def byRef(self, ref):
    ref = ref[0] if isinstance(ref, list) else ref
    rec_ids = self.index.get("Ref", {}).get(ref, [])
    return [self._load_record(rid, order=rid) for rid in rec_ids]
def bySML(self, min_val, max_val)
-
Expand source code
def bySML(self, min_val, max_val):
    results = []
    for rid in self.order:
        rec = self._load_record(rid)
        if rec is not None:
            sml = rec.get("SML", SMLdefault)
            if min_val <= sml <= max_val:
                results.append(rec)
    return results
def bySMLT(self, min_val, max_val)
-
Expand source code
def bySMLT(self, min_val, max_val):
    results = []
    for rid in self.order:
        rec = self._load_record(rid)
        if rec is not None:
            smlt = rec.get("SMLT", SMLdefault)
            if min_val <= smlt <= max_val:
                results.append(rec)
    return results
def bycid(self, cid, verbose=True)
-
Search for a record by PubChem cid.
Expand source code
def bycid(self, cid, verbose=True):
    """Search for a record by PubChem cid."""
    cid = cid[0] if isinstance(cid, list) else cid
    cidkey = str(cid)
    if "bycid" in self.index and cidkey in self.index["bycid"]:
        rec_id = self.index["bycid"][cidkey]
        return self._load_record(rec_id, order=rec_id, db=True)
    else:
        if verbose:
            printWARN(f"⚠️ Warning: No 🇪🇺 10/2011/EC record found for PubChem cid {cid}.")
        return None
def byname(self, name)
-
Expand source code
def byname(self, name):
    name = name[0] if isinstance(name, list) else name
    rec_ids = self.index.get("name", {}).get(name, [])
    return [self._load_record(rid, order=rid) for rid in rec_ids]
def refresh_index(self)
-
Rebuild the global index by reading the CSV file and regenerating each record as recXXXXX.annex1.json (with record number as primary key). The index includes foreign key mappings for "name", "EC", "CAS", "FCM", "Ref" and a bycid index for PubChem cid.
For substances with an empty CAS number, no PubChem lookup is performed. For substances with a nonempty CAS, if the lookup fails (raising ValueError) the CAS is recorded in a missing file ("missing.pubchem.annex1.json") so that future refreshes do not re-query PubChem.
(Rate limiting is assumed to be managed via the module’s global variable PubChem_lastQueryTime.)
Expand source code
def refresh_index(self):
    """
    Rebuild the global index by reading the CSV file and regenerating each record
    as recXXXXX.annex1.json (with record number as primary key). The index includes
    foreign key mappings for "name", "EC", "CAS", "FCM", "Ref" and a bycid index
    for PubChem cid.

    For substances with an empty CAS number, no PubChem lookup is performed. For
    substances with a nonempty CAS, if the lookup fails (raising ValueError) the
    CAS is recorded in a missing file ("missing.pubchem.annex1.json") so that
    future refreshes do not re-query PubChem. (Rate limiting is assumed to be
    managed via the module's global variable PubChem_lastQueryTime.)
    """
    from patankar.loadpubchem import migrant  # local import of migrant
    new_index = {}
    new_index["index_date"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    new_index["csv_file"] = os.path.basename(self.csv_file)
    new_index["order"] = []
    for key in ["name", "EC", "CAS", "FCM", "Ref"]:
        new_index[key] = {}
    new_index["bycid"] = {}
    # Load missing CAS numbers (only for substances with a CAS)
    missing_file = os.path.join(self.cache_dir, "missing.pubchem.annex1.json")
    if os.path.exists(missing_file):
        with open(missing_file, "r", encoding="utf-8") as mf:
            missing_pubchem = json.load(mf)
    else:
        missing_pubchem = {}
    records = []
    rec_num = 0
    with open(self.csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f, delimiter="\t", quotechar='"')
        header_found = False
        for row in reader:
            if not header_found:
                if row and row[0] == "Name":
                    header_found = True
                continue
            if not row or len(row) < 19:
                continue
            rec_num += 1
            rec = {}

            def yesno(val):
                val = val.strip().lower()
                if val == "yes":
                    return True
                elif val == "no":
                    return False
                return None

            # Use record number as primary key stored as "record"
            rec["record"] = rec_num
            rec["cid"] = None  # default value
            # Traceability (stored but not shown in __repr__)
            rec["csvFile"] = os.path.basename(self.csv_file)
            rec["date"] = new_index["index_date"]
            rec["name"] = row[0].strip()
            rec["EC"] = row[1].strip()
            rec["CAS"] = row[2].strip()
            rec["name2"] = row[3].strip()
            rec["CAS2"] = row[4].strip()
            try:
                rec["FCM"] = int(row[5].strip()) if row[5].strip() not in ["", "-"] else None
            except:
                rec["FCM"] = None
            rec["Ref"] = row[6].strip()
            rec["Additive_or_PPA"] = yesno(row[7])
            rec["Use_as_monomer_macromolecule"] = yesno(row[8])
            rec["FRFapplicable"] = yesno(row[9])

            def parse_sml(val):
                val = val.strip()
                if not val:
                    return SMLdefault
                m = re.search(r"([\d\.]+)", val)
                return float(m.group(1)) if m else SMLdefault

            rec["SML"] = parse_sml(row[10])
            rec["SMLunit"] = "mg/kg"
            rec["SMLTrestrictions"] = row[11].strip()
            val = row[12].strip()
            if val:
                try:
                    rec["SMLTGroupFCMsubstances"] = [int(x.strip()) for x in val.split(",") if x.strip()]
                except:
                    rec["SMLTGroupFCMsubstances"] = None
            else:
                rec["SMLTGroupFCMsubstances"] = None
            rec["SMLT"] = parse_sml(row[13])
            if row[13].strip() == "":
                rec["SMLT"] = rec["SML"]
            rec["SMLTunit"] = "mg/kg"
            rec["Restrictions"] = row[14].strip()
            rec["NotesCompliance"] = row[15].strip()
            rec["Notes"] = row[16].strip()
            rec["Physicalform"] = row[17].strip()
            rec["ExpressedAs"] = row[18].strip()
            # Only attempt PubChem lookup if CAS is nonempty.
            cas_val = rec["CAS"]
            if cas_val and cas_val.strip():
                if cas_val in missing_pubchem:
                    cid_val = missing_pubchem[cas_val]
                else:
                    try:
                        cid_val = migrant(cas_val, annex1=False).cid
                    except ValueError:
                        printWARN(f"🇪🇺 Warning: substance {rec['name']} (CAS {cas_val}) not found in PubChem.")
                        cid_val = None
                        missing_pubchem[cas_val] = None
            else:
                # No CAS provided, so we know there will be no cid.
                cid_val = None
            rec["cid"] = cid_val
            # Order the record keys: first "record", then "cid", then others.
            ordered_rec = {
                "record": rec["record"],
                "cid": rec["cid"],
                "name": rec["name"],
                "CAS": rec["CAS"],
                "EC": rec["EC"],
                "FCM": rec["FCM"],
                "Ref": rec["Ref"],
                "Additive_or_PPA": rec["Additive_or_PPA"],
                "Use_as_monomer_macromolecule": rec["Use_as_monomer_macromolecule"],
                "FRFapplicable": rec["FRFapplicable"],
                "SML": rec["SML"],
                "SMLunit": rec["SMLunit"],
                "SMLTrestrictions": rec["SMLTrestrictions"],
                "SMLTGroupFCMsubstances": rec["SMLTGroupFCMsubstances"],
                "SMLT": rec["SMLT"],
                "SMLTunit": rec["SMLTunit"],
                "Restrictions": rec["Restrictions"],
                "NotesCompliance": rec["NotesCompliance"],
                "Notes": rec["Notes"],
                "Physicalform": rec["Physicalform"],
                "ExpressedAs": rec["ExpressedAs"],
                "engine": f"SFPPy: {os.path.basename(__file__)}",
                "csvFile": rec["csvFile"],
                "date": rec["date"]
            }
            rec_filename = f"rec{rec_num:05d}.annex1.json"
            json_filename = os.path.join(self.cache_dir, rec_filename)
            with open(json_filename, "w", encoding="utf-8") as jf:
                json.dump(ordered_rec, jf, ensure_ascii=False, indent=2)
            new_index["order"].append(rec_num)
            for key in ["name", "EC", "CAS", "FCM", "Ref"]:
                value = rec[key]
                if value not in new_index[key]:
                    new_index[key][value] = []
                new_index[key][value].append(rec_num)
            if cid_val is not None:
                new_index["bycid"][cid_val] = rec_num
            records.append(ordered_rec)
    with open(self.index_file, "w", encoding="utf-8") as f:
        json.dump(new_index, f, ensure_ascii=False, indent=2)
    with open(missing_file, "w", encoding="utf-8") as mf:
        json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
    self.index = new_index
    self.order = new_index.get("order", [])
    self._records_cache = {}
class annex1record (d, order=None, total=None)
-
Represents a single substance record from Annex I of EU Regulation 10/2011 EC.
This class is a subclass of dict that stores the data for one authorized substance. It contains keys such as:
- "record": the primary record number (1-based, as assigned from the CSV file),
- "cid": the PubChem compound identifier (which may be None),
- "name", "CAS", "EC", "FCM", "Ref", and additional fields such as migration limits (SML, SMLT), restrictions, and notes.
The class provides custom str and repr methods for formatted display. In repr, only nonempty fields are shown, the header line begins with "record: X of Y", and text wrapping is applied so that the first line wraps at one width and continuation lines wrap at another, with proper indentation.
Initialize an annex1record from dictionary d.
order: the record number (from the CSV; if known)
total: total number of records (if known)
Expand source code
class annex1record(dict):
    """
    Represents a single substance record from Annex I of EU Regulation 10/2011 EC.

    This class is a subclass of dict that stores the data for one authorized substance.
    It contains keys such as:
        - "record": the primary record number (1-based, as assigned from the CSV file),
        - "cid": the PubChem compound identifier (which may be None),
        - "name", "CAS", "EC", "FCM", "Ref", and additional fields such as
          migration limits (SML, SMLT), restrictions, and notes.

    The class provides custom __str__ and __repr__ methods for formatted display.
    In __repr__, only nonempty fields are shown, the header line begins with
    "record: X of Y", and text wrapping is applied so that the first line wraps
    at one width and continuation lines wrap at another (with proper indenting).
    """

    def __init__(self, d, order=None, total=None):
        """
        Initialize an annex1record from dictionary d.
        order: the record number (from the CSV; if known)
        total: total number of records (if known)
        """
        if not isinstance(d, dict):
            raise TypeError(f"d must be a dict not a {type(d).__name__}")
        super().__init__(d)
        # Use the value stored in the record (key "record") if available.
        self._order = d.get("record", order)
        self._total = total

    def __str__(self):
        cid = self.get("cid", None)
        order_str = f"{self._order}" if self._order is not None else "?"
        total_str = f"{self._total}" if self._total is not None else "?"
        return (f"<{self.__class__.__name__} with cid:{cid} - record {order_str} "
                f"of {total_str} (Annex 1 of 10/2011/EC)>")

    def __repr__(self):
        lines = []
        order_str = f"{self._order}" if self._order is not None else "?"
        total_str = f"{self._total}" if self._total is not None else "?"
        header = f" ---- [ 🇪🇺 10/2011/EC record: {order_str} of {total_str} ] ----"
        lines.append(header)
        # Define display order; note that we do not show SMLunit, SMLTunit, csvFile, or date.
        fields_order = [
            "record", "cid", "name", "CAS", "EC", "FCM", "Ref",
            "Additive_or_PPA", "Use_as_monomer_macromolecule", "FRFapplicable",
            "SML", "SMLTrestrictions", "SMLTGroupFCMsubstances", "SMLT",
            "Restrictions", "NotesCompliance", "Notes", "Physicalform", "ExpressedAs"
        ]
        # For each field, if value is not empty then display it.
        for key in fields_order:
            if key not in self:
                continue
            val = self[key]
            if val is None or (isinstance(val, str) and not val.strip()):
                continue
            # For SML and SMLT, append the unit inline.
            if key in ("SML", "SMLT"):
                display_val = f"{val} [mg/kg]"
            else:
                display_val = str(val)
            # Wrap the value to 60 characters; subsequent lines are indented by 22 spaces.
            wrapped_val = custom_wrap(display_val, width=60, indent=" " * 22)
            lines.append(f"{key:>20}: {wrapped_val}")
        # Extra attributes exposed by the extended class (annex1record_ext, defined below).
        if isinstance(self, annex1record_ext):
            lines.append(f"\n{'--- extended':>20}: properties ---\n")
            attr_order = ["cid", "M", "SML", "SMLT", "n", "gFCM", "gM", "gMmin", "gCASmin", "gnamemin"]
            for attr in attr_order:
                if hasattr(self, attr):
                    val = getattr(self, attr)
                    lines.append(f"{attr:>20}: {val}")
        print("\n".join(lines))
        return str(self)

    @property
    def ispubchemok(self):
        """Returns True if the record may match in PubChem (i.e., compatible with patankar.loadpubchem)"""
        return self.get("cid") is not None and self.get("CAS") not in ("", None)
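The `custom_wrap` helper used by `__repr__` is not shown in this excerpt. A minimal sketch, assuming the signature `custom_wrap(text, width, indent)` seen in the call above (first line at full width, continuation lines prefixed with `indent`); this is an illustration, not the module's actual implementation:

```python
import textwrap

def custom_wrap(text, width=60, indent=" " * 22):
    """Wrap text so the first line keeps the full width and
    continuation lines are prefixed with `indent`."""
    lines = textwrap.wrap(text, width=width)
    if not lines:
        return ""
    # Join wrapped lines, indenting every line after the first.
    return ("\n" + indent).join(lines)

out = custom_wrap("a " * 40, width=10, indent="  ")
print(out)  # five-word lines; continuation lines indented by two spaces
```

The module's real helper may handle edge cases differently (e.g. very long unbreakable tokens), so treat this only as a model of the wrapping behavior described above.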
Ancestors
- builtins.dict
Subclasses
- annex1record_ext
Instance variables
var ispubchemok
-
Returns True if the record may match in PubChem (i.e., compatible with patankar.loadpubchem)
Expand source code
@property
def ispubchemok(self):
    """Returns True if the record may match in PubChem (i.e., compatible with patankar.loadpubchem)"""
    return self.get("cid") is not None and self.get("CAS") not in ("", None)
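The predicate combines two conditions: a resolved PubChem cid and a nonempty CAS. A minimal illustration of the same logic on plain dicts (the cid and CAS values below are hypothetical examples, not taken from the database):

```python
def ispubchemok(rec):
    # Mirrors the property: both a resolved cid and a nonempty CAS are required.
    return rec.get("cid") is not None and rec.get("CAS") not in ("", None)

print(ispubchemok({"cid": 1030, "CAS": "102-39-6"}))  # True
print(ispubchemok({"cid": None, "CAS": "102-39-6"}))  # False: no PubChem match
print(ispubchemok({"cid": 1030, "CAS": ""}))          # False: empty CAS
```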
class annex1record_ext (rec, db=None)
-
extended annex1record class with additional attributes
Instantiate from a record with a consolidated database: db
Expand source code
class annex1record_ext(annex1record):
    """extended annex1record class with additional attributes"""

    def __init__(self, rec, db=None):
        """instantiate from a record with a consolidated database: db"""
        if not isinstance(rec, annex1record):
            raise TypeError(f"rec must be an annex1record not a {type(rec).__name__}")
        super().__init__(rec, order=rec._order, total=rec._total)
        from patankar.loadpubchem import migrant
        if rec.ispubchemok:
            try:
                m = migrant(rec.get("CAS"), annex1=False)
                M = m.M
            except Exception:
                print(f"{rec.get('name')} could not be extended from its CAS {rec.get('CAS')}: not found")
                M = None
        else:
            M = None
        self.cid = self.get("cid")
        self.SML = self.get("SML")
        self.SMLT = self.get("SMLT")
        if self.SMLT and self.SMLT < self.SML:
            self.SML = self.SMLT
        self.gFCM = self.get("SMLTGroupFCMsubstances")
        self.n = len(self.gFCM) if self.gFCM else None
        self.M = M            # molecular mass added if cid was available
        self.gM = None        # molecular masses of substances in the group
        self.gMmin = None     # minimal molecular mass of substances in the group
        self.gCASmin = None   # CAS of the lightest/smallest substance
        self.gnamemin = None  # its name
        # We will add group info gM, gMmin, gCASmin, gnamemin.
        # Do not forget: there is no bijection between rid (record id) and FCM;
        # here fcm2rid[self.gFCM[0]], fcm2rid[self.gFCM[1]], ... are all equal,
        # they give the rids (record indices) for the group matching this FCM.
        if db is not None and self.gFCM is not None:
            fcm2rid = db._fcm2rid
            # gFCM keys should be strings but they can be cached as int
            if isinstance(self.gFCM[0], str):
                rids = fcm2rid[self.gFCM[0]]
            else:
                rids = fcm2rid[str(self.gFCM[0])]
            Mlist = [db._load_record(rid, db=False).M for rid in rids]
            CASlist = [db._load_record(rid, db=False).get("CAS") for rid in rids]
            namelist = [db._load_record(rid, db=False).get("name") for rid in rids]
            # Find the index of the smallest non-None molecular mass
            valid_indices = [i for i, m in enumerate(Mlist) if m is not None]  # indices of valid values
            if valid_indices:
                min_index = min(valid_indices, key=lambda i: Mlist[i])  # index of minimum valid M value
                self.gM = [m for m in Mlist if m is not None]
                self.gMmin = Mlist[min_index]
                self.gCASmin = CASlist[min_index]
                self.gnamemin = namelist[min_index]
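The group-minimum search at the end of `__init__` (smallest non-None molecular mass across the SMLT group) can be tested in isolation. A sketch of that selection step, using the glycol series as a worked example (real Annex I groups are resolved through `db._fcm2rid`, which is not needed here):

```python
def group_minimum(Mlist, CASlist, namelist):
    """Return (gM, gMmin, gCASmin, gnamemin) as computed in annex1record_ext:
    drop None masses, then pick the entry with the smallest mass."""
    valid = [i for i, m in enumerate(Mlist) if m is not None]
    if not valid:
        return None, None, None, None
    i_min = min(valid, key=lambda i: Mlist[i])
    return ([m for m in Mlist if m is not None],
            Mlist[i_min], CASlist[i_min], namelist[i_min])

# A None mass (e.g. an unresolved cid) is simply skipped, not treated as smallest.
gM, gMmin, gCASmin, gnamemin = group_minimum(
    [62.07, None, 150.17],
    ["107-21-1", "111-46-6", "112-27-6"],
    ["ethylene glycol", "diethylene glycol", "triethylene glycol"])
print(gMmin, gCASmin)  # 62.07 107-21-1
```

Picking the lightest group member is conservative for SMLT compliance, since the smallest molecule typically migrates fastest.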
Ancestors
- annex1record
- builtins.dict
Inherited members