Module USFDAfcn
Expand source code
# Names exported by ``from USFDAfcn import *`` (the module's public API).
__all__ = ['USFDAfcn', 'clean_html', 'custom_wrap', 'fcnrecord', 'fcnrecord_ext', 'parse_food_contact_substance', 'printWARN']
"""
Module: USFDAfcn
This module implements a robust database manager for the US Inventory of Effective Food Contact
Substance (FCS) notifications as provided by the US FDA. The CSV file (exported from the US FDA website)
contains the official Food Contact Substance Notification list but with information not well organized.
Each row in the CSV file is converted into a JSON record (named as recXXXXX.fcn.json in the cache directory)
with the following ordered fields:
- "record": the sequential record number (1-based)
- "cid": the PubChem compound identifier (or list of cids if a mixture)
- "name": the chemical name (a string for single substances or a list for mixtures)
- "CAS": the CAS number (a string or list if a mixture)
- "FCNNo": the FCN number extracted from the first column (e.g. =T("2355") becomes "2355")
- "FoodContactSubstance": the full original field text from the CSV
- "mixture": boolean (True if the substance is a mixture, i.e. compound and CAS are lists)
- "FCSreplacedby": field as provided (if the notification has been replaced)
- "FCSreplacedby_record": the record number corresponding to the replacement (if available)
- "notifier": the notifier field from the CSV
- "manufacturer": from the CSV field "Manufacture/Supplier"
- "NotificationDate": the notification date from the CSV
At the end, traceability fields "engine", "csfile", and "date" are added.
The module also builds a global index (stored in fcn_index.json) mapping key fields:
- "name", "CAS", "FCNNo", "FCSreplacedby" and PubChem cid ("bycid")
to the corresponding record numbers.
Key features include:
- Robust parsing of the FoodContactSubstance field using regular expressions.
- Handling of mixtures (multiple chemicals in one notification) where the chemical names and CAS numbers
are stored as lists.
- PubChem lookup for each CAS number (implemented via patankar.loadpubchem.migrant) to retrieve the corresponding cid.
- A secondary pass to resolve FCSreplacedby: if a record indicates that it has been replaced by another notification,
a new field "FCSreplacedby_record" is added corresponding to the record number of the replacement notification.
- Lookup methods by record number, by CAS, by FCNNo, and by PubChem cid.
- Caching of missing PubChem results in "missing.pubchem.fcn.json" to avoid repeated failed queries.
@version: 1.41
@project: SFPPy - SafeFoodPackaging Portal in Python initiative
@author: INRAE\\olivier.vitrac@agroparistech.fr
@licence: MIT
@Date: 2024-01-10
@rev: 2025-03-31
"""
import os, csv, json, datetime, time, re, textwrap
# Package metadata (module-level dunder attributes).
__project__ = "SFPPy"
__author__ = "Olivier Vitrac"
__copyright__ = "Copyright 2022"
__credits__ = ["Olivier Vitrac"]
__license__ = "MIT"
__maintainer__ = "Olivier Vitrac"
__email__ = "olivier.vitrac@agroparistech.fr"
__version__ = "1.41"
# Value stored as the PubChem cid when a lookup fails (None means "not found").
# (This can be adapted if needed)
DEFAULT_PUBCHEM = None
# State used by printWARN to suppress repeated warnings: the last message
# printed and the time.time() timestamp at which it was printed.
_LAST_WARN_ = None
_T_LAST_WARN_ = 0.0
# ----------------------------------------------------------------------
# Utility function: custom_wrap (for pretty printing)
# ----------------------------------------------------------------------
def custom_wrap(text, width=60, indent=" " * 22):
    """
    Wrap `text` to `width` columns, indenting every line after the first.

    Parameters:
    ----------
    text : str
        The text to wrap.
    width : int, optional
        Maximum width of each wrapped line (before the indent is added).
    indent : str, optional
        Prefix prepended to the second and following lines.

    Returns:
    -------
    str
        The wrapped lines joined with newlines, or "" when `text` is empty
        or contains only whitespace.
    """
    # Wrap the whole text in a single pass. Slicing the source text by the
    # length of the first wrapped line is unreliable because textwrap expands
    # tabs, so the wrapped line may be longer than the text it consumed.
    lines = textwrap.wrap(text, width=width)
    if not lines:
        return ""
    wrapped = [lines[0]] + [indent + line for line in lines[1:]]
    return "\n".join(wrapped)
# ----------------------------------------------------------------------
# Show warnings without repeating them
# ----------------------------------------------------------------------
def printWARN(message: str, tsilent: float = 10.0):
    """
    Print a warning unless the same warning was printed very recently.

    The message is printed when it differs from the previously printed one,
    or when more than `tsilent` seconds have elapsed since that print.
    The module-level state (_LAST_WARN_, _T_LAST_WARN_) is updated only
    when a message is actually printed.

    Parameters:
    ----------
    message : str
        The warning message to display.
    tsilent : float, optional
        Minimum time (in seconds) between repeated identical warnings.
    """
    global _LAST_WARN_, _T_LAST_WARN_
    now = time.time()
    same_as_last = message == _LAST_WARN_
    quiet_period_over = now - _T_LAST_WARN_ > tsilent
    if same_as_last and not quiet_period_over:
        # Identical warning inside the silent window: stay quiet.
        return
    print(message)
    _LAST_WARN_ = message
    _T_LAST_WARN_ = now
# ----------------------------------------------------------------------
# *New helper function to remove HTML tags*
# ----------------------------------------------------------------------
def clean_html(text):
    """
    Remove HTML tags from `text` and trim surrounding whitespace.

    Every "<...>" sequence is deleted; the remaining text is returned
    stripped of leading and trailing whitespace.
    """
    without_tags = re.sub(r'<[^>]*>', '', text)
    return without_tags.strip()
# ----------------------------------------------------------------------
# Helper function to parse FoodContactSubstance field
# ----------------------------------------------------------------------
def parse_food_contact_substance(text):
    """
    Parses the full FoodContactSubstance field to extract the chemical name(s) and CAS number(s).

    For a single chemical, the name is the portion of the text from the start until the first
    occurrence of "(CAS Reg. No." (after removing a trailing phrase such as " (produced by ...").
    For mixtures (several "(CAS Reg. No. ...)" groups), every "name (CAS Reg. No. xxx)" pair is
    captured with a regular expression; a leading "and " conjunction is removed from each name.

    Parameters:
        text: the raw FoodContactSubstance field (str).

    Returns:
        name: a string if one match is found, or a list of names for mixtures.
        CAS: a string if one match is found ("" when absent), or a list of CAS numbers for mixtures.
        mixture: Boolean, True if more than one match is found.
    """
    marker = "(CAS Reg. No."
    cas_regex = r'\(CAS Reg\. No\. ([\d-]+)\)'
    text = text.strip()
    # If there is no CAS pattern, return the full text.
    if marker not in text:
        return text, "", False

    def _as_single():
        # Name = text before the first CAS group, without a trailing
        # " (produced by ..." phrase and without trailing separators.
        idx = text.find(marker)
        candidate = text[:idx].strip()
        p_idx = candidate.lower().rfind(" (produced by")
        if p_idx != -1:
            candidate = candidate[:p_idx].strip()
        candidate = candidate.rstrip(",;").strip()
        m = re.search(cas_regex, text[idx:])
        cas = m.group(1) if m else ""
        return candidate, cas.strip(), False

    if text.count(marker) == 1:
        return _as_single()

    # Mixture: capture each chemical name and its CAS number.
    # The optional parenthetical inside a name (e.g. "poly(ethylene)") must not
    # be a "(CAS Reg. No. ...)" group itself, otherwise two components joined by
    # " and " would be merged into a single name; hence the negative lookahead.
    pattern = (
        r'([^,(;]+(?:\((?!CAS Reg\. No\.)[^)]*\))?[^,(;]*)'
        r'\s*\(CAS Reg\. No\. ([\d-]+)\)'
    )
    matches = re.findall(pattern, text)
    if not matches:
        # Fallback: treat as single if no matches are found.
        return _as_single()

    names = []
    cas_list = []
    for raw_name, cas in matches:
        name = raw_name.strip().lstrip(",;").strip()
        # Drop the conjunction that links the last component ("..., and X").
        name = re.sub(r'^and\s+', '', name, flags=re.IGNORECASE)
        names.append(name)
        cas_list.append(cas.strip())
    if len(names) == 1:
        return names[0], cas_list[0], False
    return names, cas_list, True
# ----------------------------------------------------------------------
# Class: fcnrecord
# ----------------------------------------------------------------------
class fcnrecord(dict):
    """
    One Food Contact Substance Notification record of the US FDA database, stored as a dict.

    Keys include:
    - "record": the sequential record number (1-based)
    - "cid": PubChem compound identifier (or list of cids for mixtures; may be None)
    - "name": interpreted chemical name (string or list for mixtures)
    - "CAS": CAS number (string or list for mixtures)
    - "FCNNo": the FCN number extracted from the CSV row (e.g. from =T("2355"))
    - "FoodContactSubstance": the full field text from the CSV
    - "mixture": boolean flag indicating if the record represents a mixture
    - "FCSreplacedby": original field (as provided) indicating a replacement notification
    - "FCSreplacedby_record": record number (if found) corresponding to the replacement notification
    - "notifier", "manufacturer", "NotificationDate": additional fields from the CSV
    - Traceability fields: "engine", "csfile", "date"
    """

    def __init__(self, d, order=None, total=None):
        """Build the record from dict `d`; `order` is used only when `d` has no "record" key."""
        if not isinstance(d, dict):
            raise TypeError("Input must be a dict, not a {}".format(type(d).__name__))
        super().__init__(d)
        self._total = total
        self._order = d.get("record", order)

    def _position_labels(self):
        """Return (order, total) as strings, with "?" standing for an unknown value."""
        order_label = "?" if self._order is None else f"{self._order}"
        total_label = "?" if self._total is None else f"{self._total}"
        return order_label, total_label

    def __str__(self):
        """One-line summary: class name, cid and position in the database."""
        order_label, total_label = self._position_labels()
        cid = self.get("cid", None)
        return f"<{self.__class__.__name__} with cid:{cid} - record {order_label} of {total_label} (US FDA FCS)>"

    def __repr__(self):
        """Multi-line listing of the record fields, wrapped with custom_wrap."""
        pad = " " * 22
        order_label, total_label = self._position_labels()
        out = [f" ---- [ US FDA FCS record: {order_label} of {total_label} ] ----"]
        data_fields = (
            "record", "cid", "name", "CAS", "FCNNo",
            "FoodContactSubstance", "mixture", "FCSreplacedby", "FCSreplacedby_record",
            "notifier", "manufacturer", "NotificationDate",
        )
        for field in data_fields:
            if field in self:
                value = self[field]
                # Data fields that are None or blank strings are not shown.
                is_blank = value is None or (isinstance(value, str) and not value.strip())
                if not is_blank:
                    shown = custom_wrap(str(value), width=60, indent=pad)
                    out.append(f"{field:>20}: {shown}")
        # Traceability fields are always shown when present.
        for field in ("engine", "csfile", "date"):
            if field in self:
                shown = custom_wrap(str(self[field]), width=60, indent=pad)
                out.append(f"{field:>20}: {shown}")
        return "\n".join(out)

    @property
    def ispubchemok(self):
        """True when the record carries at least one usable CAS number."""
        cas = self.get("CAS")
        if not self.get("mixture", False):
            return cas not in ("", None)
        # Mixture: at least one non-blank CAS entry is required.
        return bool(cas and any(entry.strip() for entry in cas))
# ----------------------------------------------------------------------
# Class: fcnrecord_ext
# ----------------------------------------------------------------------
class fcnrecord_ext(fcnrecord):
    """
    Extended fcnrecord that automatically retrieves additional chemical information from PubChem.

    For each CAS number (or each CAS in a mixture) the PubChem lookup is performed.
    The result is exposed as the attribute `cid` (a single PubChem CID, or a list of
    CIDs for mixtures; failed lookups give DEFAULT_PUBCHEM). The "cid" field of the
    record is also refreshed with every successful lookup; values already stored in
    the record are kept where the lookup failed.
    """

    def __init__(self, rec, db=None, verbosity=False):
        """
        Instantiate from a base fcnrecord.

        Parameters:
            rec: the fcnrecord to extend (TypeError is raised for any other type).
            db: accepted for compatibility with callers; not used here.
            verbosity: when True, a warning is printed for each failed lookup.

        If a valid CAS is available, perform PubChem lookup via the 'migrant' function.
        """
        if not isinstance(rec, fcnrecord):
            raise TypeError("Input must be an fcnrecord, not a {}".format(type(rec).__name__))
        super().__init__(rec, order=rec._order, total=rec._total)
        if not self.ispubchemok:
            self.cid = None
            return
        # Imported only when a lookup is really needed, so that records without
        # a CAS number can be extended even when patankar is unavailable.
        from patankar.loadpubchem import migrant

        def lookup(cas):
            # Best-effort lookup: any failure yields DEFAULT_PUBCHEM.
            try:
                return migrant(cas, annex1=False).cid
            except Exception:
                if verbosity:
                    printWARN(f"🇺🇸 Warning: PubChem lookup failed for CAS {cas} in record {self.get('record')}")
                return DEFAULT_PUBCHEM

        stored = self.get("cid")
        if self.get("mixture", False):
            cids = [lookup(cas) for cas in self.get("CAS", [])]
            self.cid = cids
            if any(c is not DEFAULT_PUBCHEM for c in cids):
                if isinstance(stored, list) and len(stored) == len(cids):
                    # Keep the stored cid wherever the fresh lookup failed.
                    merged = [new if new is not DEFAULT_PUBCHEM else old
                              for old, new in zip(stored, cids)]
                else:
                    merged = list(cids)
                self["cid"] = merged
        else:
            self.cid = lookup(self.get("CAS"))
            if self.cid is not DEFAULT_PUBCHEM:
                self["cid"] = self.cid
# ----------------------------------------------------------------------
# Class: USFDAfcn
# ----------------------------------------------------------------------
class USFDAfcn:
    """
    Manages the US FDA Food Contact Substance Notification CSV file and caches its data for efficient lookup.

    This class reads the official CSV file and processes each row into a JSON record (recXXXXX.fcn.json)
    stored in a cache directory (default "cache.USFDAfcn"). It then builds a global index (stored as fcn_index.json)
    mapping key fields—such as "name", "CAS", "FCNNo", "FCSreplacedby" and PubChem cid—to their corresponding record numbers.

    Lookup methods include:
    - __getitem__: lookup by sequential record number, slice, list/tuple, CAS string or FCN number string.
    - __call__: callable access by record number, PubChem cid, or CAS.
    - byname, byCAS, byFCNNo, byFCSreplacedby, bycid: dedicated search methods.

    Global Rate Limiting:
    PubChem lookups are assumed to be managed by a module-level variable in patankar.loadpubchem.
    """

    isInitialized = False

    def __init__(self, cache_dir="cache.USFDAfcn", index_file="fcn_index.json", pubchem=True):
        """
        Open the database located next to this module.

        Parameters:
            cache_dir: name of the cache directory (created if missing).
            index_file: name of the index file inside the cache directory.
            pubchem: when True, records are returned as fcnrecord_ext.

        Raises:
            FileNotFoundError: if FCN.csv is not found next to this module.
        """
        self.base_dir = os.path.dirname(__file__)
        self.csv_file = os.path.join(self.base_dir, "FCN.csv")
        if not os.path.exists(self.csv_file):
            raise FileNotFoundError(f"CSV file {self.csv_file} not found.")
        self.cache_dir = os.path.join(self.base_dir, cache_dir)
        os.makedirs(self.cache_dir, exist_ok=True)
        self.index_file = os.path.join(self.cache_dir, index_file)
        self._records_cache = {}
        self._pubchem = pubchem
        if os.path.exists(self.index_file):
            with open(self.index_file, "r", encoding="utf-8") as f:
                self.index = json.load(f)
        else:
            self.refresh_index()
        self.order = self.index.get("order", [])
        USFDAfcn.isInitialized = True

    @classmethod
    def isindexinitialized(cls, cache_dir="cache.USFDAfcn", index_file="fcn_index.json"):
        """Return True if the index file already exists in the cache directory."""
        return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _cached_cid(cas, missing_pubchem, migrant, warning):
        """Return the cid for `cas`, recording failed lookups in `missing_pubchem`."""
        if cas in missing_pubchem:
            return missing_pubchem[cas]
        try:
            return migrant(cas, annex1=False).cid
        except Exception:
            # Best effort: a failed lookup is remembered to avoid repeated queries.
            printWARN(warning)
            missing_pubchem[cas] = None
            return None

    @staticmethod
    def _first(value):
        """Return the first item of a list argument (None if empty), else the value itself."""
        if isinstance(value, list):
            return value[0] if value else None
        return value

    def _range_hint(self):
        """Sentence describing the valid record range, safe for an empty database."""
        if not self.order:
            return "The database contains no records."
        return f"Valid records range from {min(self.order)} to {max(self.order)}."

    def _load_ids(self, rec_ids):
        """Load one record (returned alone) or several records (returned as a list)."""
        if len(rec_ids) == 1:
            return self._load_record(rec_ids[0], order=rec_ids[0])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    # ------------------------------------------------------------------
    # Index construction
    # ------------------------------------------------------------------
    def refresh_index(self):
        """
        Rebuild the record files and the global index from the CSV file.

        Each data row becomes a recXXXXX.fcn.json file; the index and the list of
        CAS numbers missing from PubChem are written to the cache directory.
        """
        from patankar.loadpubchem import migrant

        index_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        csv_basename = os.path.basename(self.csv_file)
        new_index = {
            "index_date": index_date,
            "csv_file": csv_basename,
            "order": [],
            "name": {},
            "CAS": {},
            "FCNNo": {},
            "FCSreplacedby": {},
            "bycid": {},
        }
        missing_file = os.path.join(self.cache_dir, "missing.pubchem.fcn.json")
        if os.path.exists(missing_file):
            with open(missing_file, "r", encoding="utf-8") as mf:
                missing_pubchem = json.load(mf)
        else:
            missing_pubchem = {}
        records_list = []
        fcnno2recid = {}
        rec_num = 0
        # newline="" lets the csv module handle line endings itself.
        with open(self.csv_file, "r", encoding="latin1", newline="") as f:
            # Skip the preamble: rewind to the start of the header line ("FCN No ...").
            while True:
                pos = f.tell()
                line = f.readline()
                if not line:
                    break
                if line.lstrip().startswith("FCN No"):
                    f.seek(pos)
                    break
            reader = csv.reader(f, delimiter=",")
            next(reader, None)  # discard the header row
            for row in reader:
                if not row or len(row) < 7:
                    continue
                rec_num += 1
                fcn_no_match = re.search(r'=T\("(\d+)"\)', row[0])
                fcn_no = fcn_no_match.group(1) if fcn_no_match else row[0].strip()
                # Food Contact Substance from column 6
                fcs_text = row[6].strip()
                name_val, cas_val, is_mixture = parse_food_contact_substance(fcs_text)
                # PubChem cid(s): a single value, or one per component for mixtures.
                if not cas_val:
                    cid_val = None
                elif is_mixture:
                    cid_val = [
                        self._cached_cid(
                            cas, missing_pubchem, migrant,
                            f"🇺🇸 Warning: PubChem lookup failed for component with CAS {cas} in record {rec_num}.",
                        )
                        for cas in cas_val
                    ]
                else:
                    cid_val = self._cached_cid(
                        cas_val, missing_pubchem, migrant,
                        f"🇺🇸 Warning: PubChem lookup failed for {name_val} (CAS {cas_val}).",
                    )
                ordered_rec = {
                    "record": rec_num,
                    "cid": cid_val,
                    "name": name_val,
                    "CAS": cas_val,
                    "FCNNo": fcn_no,
                    "FoodContactSubstance": fcs_text,
                    "mixture": is_mixture,
                    # FCS REPLACED BY from column 7
                    "FCSreplacedby": row[7].strip() if len(row) > 7 else "",
                    "FCSreplacedby_record": None,
                    # Notifier from column 8
                    "notifier": row[8].strip() if len(row) > 8 else "",
                    # Manufacturer from column 9, with HTML tags removed
                    "manufacturer": clean_html(row[9].strip()) if len(row) > 9 else "",
                    # Notification Date from column 12 (Effective Date)
                    "NotificationDate": row[12].strip() if len(row) > 12 else "",
                    "engine": f"SFPPy: {os.path.basename(__file__)}",
                    "csfile": csv_basename,
                    "date": index_date,
                }
                json_filename = os.path.join(self.cache_dir, f"rec{rec_num:05d}.fcn.json")
                with open(json_filename, "w", encoding="utf-8") as jf:
                    json.dump(ordered_rec, jf, ensure_ascii=False, indent=2)
                new_index["order"].append(rec_num)
                names = name_val if is_mixture else [name_val]
                cas_numbers = cas_val if is_mixture else [cas_val]
                for nm in names:
                    new_index["name"].setdefault(nm, []).append(rec_num)
                for cas in cas_numbers:
                    # Records without a CAS number must not be indexed under "".
                    if cas:
                        new_index["CAS"].setdefault(cas, []).append(rec_num)
                new_index["FCNNo"].setdefault(fcn_no, []).append(rec_num)
                if ordered_rec["FCSreplacedby"]:
                    new_index["FCSreplacedby"].setdefault(ordered_rec["FCSreplacedby"], []).append(rec_num)
                if cid_val is not None:
                    for cid in (cid_val if is_mixture else [cid_val]):
                        if cid is not None:
                            new_index["bycid"][str(cid)] = rec_num
                fcnno2recid[fcn_no] = rec_num
                records_list.append((rec_num, ordered_rec))
        # Second pass: resolve FCSreplacedby_record using FCNNo mapping
        for rec_id, rec in records_list:
            fcsrep_field = rec.get("FCSreplacedby", "").strip()
            if not fcsrep_field:
                continue
            # Extract the FCN number from the FCSreplacedby field if present
            fcsrep_match = re.search(r'FCN\s*(\d+)', fcsrep_field, re.IGNORECASE)
            fcsrep = fcsrep_match.group(1) if fcsrep_match else fcsrep_field
            rec["FCSreplacedby_record"] = fcnno2recid.get(fcsrep)
            json_filename = os.path.join(self.cache_dir, f"rec{rec_id:05d}.fcn.json")
            try:
                with open(json_filename, "w", encoding="utf-8") as jf:
                    json.dump(rec, jf, ensure_ascii=False, indent=2)
            except Exception as e:
                printWARN(f"🇺🇸 Warning: Could not update FCSreplacedby_record in {json_filename}: {e}")
        with open(self.index_file, "w", encoding="utf-8") as f_index:
            json.dump(new_index, f_index, ensure_ascii=False, indent=2)
        with open(missing_file, "w", encoding="utf-8") as mf:
            json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
        self.index = new_index
        self.order = new_index.get("order", [])
        self._records_cache = {}

    # ------------------------------------------------------------------
    # Record access
    # ------------------------------------------------------------------
    def _load_record(self, rec_id, order=None, db=False):
        """
        Return record `rec_id`, reading its JSON file on first access.

        Returns None (after a warning) when the record file does not exist.
        `order` and `db` are accepted for compatibility with callers and are not used.
        """
        record_obj = self._records_cache.get(rec_id)
        if record_obj is None:
            json_filename = os.path.join(self.cache_dir, f"rec{rec_id:05d}.fcn.json")
            if not os.path.exists(json_filename):
                printWARN(f"🇺🇸 Warning: Record file for record {rec_id} not found.")
                return None
            with open(json_filename, "r", encoding="utf-8") as jf:
                rec = json.load(jf)
            record_obj = fcnrecord(rec, order=rec.get("record"), total=len(self.order))
            self._records_cache[rec_id] = record_obj
        if self._pubchem:
            return fcnrecord_ext(record_obj, self)
        return record_obj

    def __getitem__(self, key):
        """
        Look up records by record number (int), slice of record numbers,
        list/tuple of keys, CAS string, or FCN number string.

        Raises:
            KeyError: when nothing matches or the key type is unsupported.
        """
        if isinstance(key, slice):
            if not self.order:
                raise KeyError(f"No records found. {self._range_hint()}")
            start = key.start if key.start is not None else min(self.order)
            stop = key.stop if key.stop is not None else max(self.order) + 1
            rec_ids = [rid for rid in self.order if start <= rid < stop]
            if not rec_ids:
                raise KeyError(f"No records found in range {start} to {stop - 1}. {self._range_hint()}")
            return [self._load_record(rid, order=rid) for rid in rec_ids]
        if isinstance(key, int):
            if key in self.order:
                return self._load_record(key, order=key)
            raise KeyError(f"Record number {key} not found. {self._range_hint()}")
        if isinstance(key, (list, tuple)):
            return [self.__getitem__(k) for k in key]
        if isinstance(key, str):
            cas_index = self.index.get("CAS", {})
            fcn_index = self.index.get("FCNNo", {})
            # First, try CAS lookup.
            if key in cas_index:
                return self._load_ids(cas_index[key])
            # Then, if key is all digits, try FCNNo lookup.
            if key.isdigit() and key in fcn_index:
                return self._load_ids(fcn_index[key])
            available = list(cas_index.keys()) + list(fcn_index.keys())
            sample = ", ".join(available[:10]) + (" ..." if len(available) > 10 else "")
            raise KeyError(f"Key '{key}' not found in index. Valid keys include: {sample}")
        raise KeyError(f"Unsupported key type: {type(key)}")

    def __call__(self, *args):
        """
        Look up one or several identifiers (ints: record number, then PubChem cid;
        strings: as in __getitem__). A single result is returned alone, several as a list.
        """
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = args[0]
        results = []
        for arg in args:
            if isinstance(arg, int):
                argkey = str(arg)
                if arg in self.order:
                    results.append(self._load_record(arg))
                elif "bycid" in self.index and argkey in self.index["bycid"]:
                    rec_id = self.index["bycid"][argkey]
                    results.append(self._load_record(rec_id))
                else:
                    printWARN(f"🇺🇸 Warning: Record for identifier {arg} not found.")
                    results.append(None)
            elif isinstance(arg, str):
                result_item = self.__getitem__(arg)
                if isinstance(result_item, list):
                    results.extend(result_item)
                else:
                    results.append(result_item)
            else:
                raise KeyError(f"Unsupported key type in call: {type(arg)}")
        return results[0] if len(results) == 1 else results

    def byname(self, name):
        """Return the list of records whose indexed name equals `name`."""
        rec_ids = self.index.get("name", {}).get(self._first(name), [])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    def byCAS(self, cas):
        """Return the record (or list of records) indexed under CAS number `cas`."""
        return self._load_ids(self.index.get("CAS", {}).get(self._first(cas), []))

    def byFCNNo(self, fcn_no):
        """Return the record (or list of records) with FCN number `fcn_no` (str or int)."""
        fcn_no = self._first(fcn_no)
        if isinstance(fcn_no, int) and not isinstance(fcn_no, bool):
            # Index keys are strings; accept plain integers as a convenience.
            fcn_no = str(fcn_no)
        return self._load_ids(self.index.get("FCNNo", {}).get(fcn_no, []))

    def byFCSreplacedby(self, fcsrep):
        """Return the record (or list of records) whose FCSreplacedby field equals `fcsrep`."""
        return self._load_ids(self.index.get("FCSreplacedby", {}).get(self._first(fcsrep), []))

    def bycid(self, cid, verbose=True):
        """Return the record indexed under PubChem `cid`, or None (with an optional warning)."""
        cid = self._first(cid)
        cidkey = str(cid)
        if "bycid" in self.index and cidkey in self.index["bycid"]:
            rec_id = self.index["bycid"][cidkey]
            return self._load_record(rec_id, order=rec_id)
        if verbose:
            printWARN(f"⚠️ Warning: No 🇺🇸 US FDA FCS record found for PubChem cid {cid}.")
        return None

    def __iter__(self):
        """Iterate over all records in index order."""
        for rid in self.order:
            yield self._load_record(rid, order=rid)

    def __len__(self):
        """Number of records listed in the index."""
        return len(self.order)

    def __contains__(self, item):
        """True for a known record number or cid (int), or a known CAS / FCN number (str)."""
        if isinstance(item, list):
            if not item:
                return False
            item = item[0]
        if isinstance(item, int):
            return item in self.order or (("bycid" in self.index) and (str(item) in self.index["bycid"]))
        if isinstance(item, str):
            return item in self.index.get("CAS", {}) or item in self.index.get("FCNNo", {})
        return False

    def __repr__(self):
        """Print a short description of the database and return its str() form."""
        csv_filename = os.path.basename(self.csv_file)
        index_date = self.index.get("index_date", "unknown")
        print(f"🇺🇸US FDA FCS database ({len(self.order)} records)")
        print(f"Imported from CSV {csv_filename} and indexed on {index_date}")
        return str(self)

    def __str__(self):
        """One-line summary with the number of records."""
        return f"<{self.__class__.__name__}: {len(self.order)} records (US FDA FCS)>"
# ----------------------------------------------------------------------
# Standalone test / debugging section
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Standalone smoke test: open the database (the index is built on first
    # use) and show its summary line.
    db = USFDAfcn()
    print(db)
    # Example lookup of the first record; any failure is reported, not raised.
    try:
        first_record = db[1]
        print(first_record)
    except Exception as err:
        print(f"Error retrieving record 1: {err}")
Functions
def clean_html(text)
-
Expand source code
def clean_html(text):
    """
    Remove HTML tags from `text` and trim surrounding whitespace.

    Every "<...>" sequence is deleted; the remaining text is returned
    stripped of leading and trailing whitespace.
    """
    without_tags = re.sub(r'<[^>]*>', '', text)
    return without_tags.strip()
def custom_wrap(text, width=60, indent=' ')
-
Expand source code
def custom_wrap(text, width=60, indent=" " * 22):
    """
    Wrap `text` to `width` columns, indenting every line after the first.

    Parameters:
    ----------
    text : str
        The text to wrap.
    width : int, optional
        Maximum width of each wrapped line (before the indent is added).
    indent : str, optional
        Prefix prepended to the second and following lines.

    Returns:
    -------
    str
        The wrapped lines joined with newlines, or "" when `text` is empty
        or contains only whitespace.
    """
    # Wrap the whole text in a single pass. Slicing the source text by the
    # length of the first wrapped line is unreliable because textwrap expands
    # tabs, so the wrapped line may be longer than the text it consumed.
    lines = textwrap.wrap(text, width=width)
    if not lines:
        return ""
    wrapped = [lines[0]] + [indent + line for line in lines[1:]]
    return "\n".join(wrapped)
def parse_food_contact_substance(text)
-
Parses the full FoodContactSubstance field to extract the chemical name(s) and CAS number(s).
For a single chemical, the name is defined as the portion of the text from the start until the first occurrence of "(CAS Reg. No." (after removing a trailing phrase such as " (produced by …").
For mixtures (multiple occurrences), a fallback using re.findall is used.
Returns
name
- a string if one match is found, or a list of names for mixtures.
CAS
- a string if one match is found, or a list of CAS numbers for mixtures.
mixture
- Boolean, True if more than one match is found.
Expand source code
def parse_food_contact_substance(text):
    """
    Parses the full FoodContactSubstance field to extract the chemical name(s) and CAS number(s).

    For a single chemical, the name is the portion of the text from the start until the first
    occurrence of "(CAS Reg. No." (after removing a trailing phrase such as " (produced by ...").
    For mixtures (several "(CAS Reg. No. ...)" groups), every "name (CAS Reg. No. xxx)" pair is
    captured with a regular expression; a leading "and " conjunction is removed from each name.

    Parameters:
        text: the raw FoodContactSubstance field (str).

    Returns:
        name: a string if one match is found, or a list of names for mixtures.
        CAS: a string if one match is found ("" when absent), or a list of CAS numbers for mixtures.
        mixture: Boolean, True if more than one match is found.
    """
    marker = "(CAS Reg. No."
    cas_regex = r'\(CAS Reg\. No\. ([\d-]+)\)'
    text = text.strip()
    # If there is no CAS pattern, return the full text.
    if marker not in text:
        return text, "", False

    def _as_single():
        # Name = text before the first CAS group, without a trailing
        # " (produced by ..." phrase and without trailing separators.
        idx = text.find(marker)
        candidate = text[:idx].strip()
        p_idx = candidate.lower().rfind(" (produced by")
        if p_idx != -1:
            candidate = candidate[:p_idx].strip()
        candidate = candidate.rstrip(",;").strip()
        m = re.search(cas_regex, text[idx:])
        cas = m.group(1) if m else ""
        return candidate, cas.strip(), False

    if text.count(marker) == 1:
        return _as_single()

    # Mixture: capture each chemical name and its CAS number.
    # The optional parenthetical inside a name (e.g. "poly(ethylene)") must not
    # be a "(CAS Reg. No. ...)" group itself, otherwise two components joined by
    # " and " would be merged into a single name; hence the negative lookahead.
    pattern = (
        r'([^,(;]+(?:\((?!CAS Reg\. No\.)[^)]*\))?[^,(;]*)'
        r'\s*\(CAS Reg\. No\. ([\d-]+)\)'
    )
    matches = re.findall(pattern, text)
    if not matches:
        # Fallback: treat as single if no matches are found.
        return _as_single()

    names = []
    cas_list = []
    for raw_name, cas in matches:
        name = raw_name.strip().lstrip(",;").strip()
        # Drop the conjunction that links the last component ("..., and X").
        name = re.sub(r'^and\s+', '', name, flags=re.IGNORECASE)
        names.append(name)
        cas_list.append(cas.strip())
    if len(names) == 1:
        return names[0], cas_list[0], False
    return names, cas_list, True
def printWARN(message: str, tsilent: float = 10.0)
-
Print a warning message only if:
- it is different from the last one, or
- more than tsilent seconds have passed since the last identical warning.
Parameters: message : str — the warning message to display. tsilent : float, optional — minimum time (in seconds) between repeated identical warnings.
Expand source code
def printWARN(message: str, tsilent: float = 10.0):
    """
    Print a warning unless the same warning was printed very recently.

    The message is printed when it differs from the previously printed one,
    or when more than `tsilent` seconds have elapsed since that print.
    The module-level state (_LAST_WARN_, _T_LAST_WARN_) is updated only
    when a message is actually printed.

    Parameters:
    ----------
    message : str
        The warning message to display.
    tsilent : float, optional
        Minimum time (in seconds) between repeated identical warnings.
    """
    global _LAST_WARN_, _T_LAST_WARN_
    now = time.time()
    same_as_last = message == _LAST_WARN_
    quiet_period_over = now - _T_LAST_WARN_ > tsilent
    if same_as_last and not quiet_period_over:
        # Identical warning inside the silent window: stay quiet.
        return
    print(message)
    _LAST_WARN_ = message
    _T_LAST_WARN_ = now
Classes
class USFDAfcn (cache_dir='cache.USFDAfcn', index_file='fcn_index.json', pubchem=True)
-
Manages the US FDA Food Contact Substance Notification CSV file and caches its data for efficient lookup.
This class reads the official CSV file and processes each row into a JSON record (recXXXXX.fcn.json) stored in a cache directory (default "cache.USFDAfcn"). It then builds a global index (stored as fcn_index.json) mapping key fields—such as "name", "CAS", "FCNNo", "FCSreplacedby" and PubChem cid—to their corresponding record numbers.
Lookup methods include: - __getitem__: lookup by sequential record number, by CAS string, or by FCN number string. - __call__: callable access by record number, PubChem cid, or CAS. - byname, byCAS, byFCNNo, byFCSreplacedby, bycid: dedicated search methods.
Global Rate Limiting: PubChem lookups are assumed to be managed by a module-level variable in patankar.loadpubchem.
Expand source code
class USFDAfcn:
    """
    Manage the US FDA Food Contact Substance Notification CSV file and its JSON cache.

    Each CSV row is converted into a JSON record (recXXXXX.fcn.json) stored in
    a cache directory (default "cache.USFDAfcn"). A global index (stored as
    fcn_index.json) maps the key fields "name", "CAS", "FCNNo",
    "FCSreplacedby" and PubChem cid ("bycid") to record numbers.

    Lookup methods include:
      - __getitem__: lookup by record number (int or slice), by CAS string,
        or by FCN number given as a digit string.
      - __call__: callable access by record number, PubChem cid, or CAS.
      - byname, byCAS, byFCNNo, byFCSreplacedby, bycid: dedicated search methods.

    Global Rate Limiting:
      PubChem lookups are assumed to be managed by a module-level variable in
      patankar.loadpubchem.
    """

    # Set to True on the class once any instance has been fully constructed.
    isInitialized = False

    def __init__(self, cache_dir="cache.USFDAfcn", index_file="fcn_index.json", pubchem=True):
        """
        Locate FCN.csv next to this module, then load or build the index.

        Parameters:
        ----------
        cache_dir : str
            Cache directory name, relative to this module's directory.
        index_file : str
            Index file name inside the cache directory.
        pubchem : bool
            When True, loaded records are wrapped in `fcnrecord_ext`.

        Raises:
        ------
        FileNotFoundError
            If FCN.csv is not found next to this module.
        """
        self.base_dir = os.path.dirname(__file__)
        self.csv_file = os.path.join(self.base_dir, "FCN.csv")
        if not os.path.exists(self.csv_file):
            raise FileNotFoundError(f"CSV file {self.csv_file} not found.")
        self.cache_dir = os.path.join(self.base_dir, cache_dir)
        os.makedirs(self.cache_dir, exist_ok=True)
        self.index_file = os.path.join(self.cache_dir, index_file)
        if os.path.exists(self.index_file):
            with open(self.index_file, "r", encoding="utf-8") as f:
                self.index = json.load(f)
        else:
            self.refresh_index()
        self.order = self.index.get("order", [])
        self._records_cache = {}
        self._pubchem = pubchem
        USFDAfcn.isInitialized = True

    @classmethod
    def isindexinitialized(cls, cache_dir="cache.USFDAfcn", index_file="fcn_index.json"):
        """Return True if the index file already exists in the cache directory."""
        return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))

    def refresh_index(self):
        """
        Rebuild every cached JSON record and the global index from the CSV file.

        Lines preceding the header line (the one starting with "FCN No") are
        skipped. Rows with fewer than 7 columns are ignored. CAS numbers whose
        PubChem lookup fails are remembered in "missing.pubchem.fcn.json" so
        that they are not queried again on the next refresh. A second pass
        resolves "FCSreplacedby_record" from the FCN number found in the
        "FCSreplacedby" field. On completion `self.index` and `self.order` are
        replaced and the in-memory record cache is cleared.
        """
        from patankar.loadpubchem import migrant

        index_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        csv_basename = os.path.basename(self.csv_file)
        new_index = {
            "index_date": index_date,
            "csv_file": csv_basename,
            "order": [],
            "name": {},
            "CAS": {},
            "FCNNo": {},
            "FCSreplacedby": {},
            "bycid": {},
        }
        missing_file = os.path.join(self.cache_dir, "missing.pubchem.fcn.json")
        if os.path.exists(missing_file):
            with open(missing_file, "r", encoding="utf-8") as mf:
                missing_pubchem = json.load(mf)
        else:
            missing_pubchem = {}

        def lookup_cid(cas, failure_message):
            # Resolve one CAS number to a PubChem cid; failures are memoized as None.
            if cas in missing_pubchem:
                return missing_pubchem[cas]
            try:
                return migrant(cas, annex1=False).cid
            except Exception:
                # Best effort: a failed lookup must not abort the whole refresh.
                printWARN(failure_message)
                missing_pubchem[cas] = None
                return None

        def column(row, idx):
            # Stripped content of an optional column ("" when the row is too short).
            return row[idx].strip() if len(row) > idx else ""

        records_list = []
        fcnno2recid = {}
        rec_num = 0
        # newline="" is what the csv module requires to parse quoted fields
        # containing line breaks correctly.
        with open(self.csv_file, "r", encoding="latin1", newline="") as f:
            # Skip any preamble: rewind to the start of the header line.
            while True:
                pos = f.tell()
                line = f.readline()
                if not line:
                    break
                if line.lstrip().startswith("FCN No"):
                    f.seek(pos)
                    break
            reader = csv.reader(f, delimiter=",")
            next(reader, None)  # discard the header row
            for row in reader:
                if not row or len(row) < 7:
                    continue
                rec_num += 1
                # The first column looks like =T("2355"); keep only the number.
                fcn_no_match = re.search(r'=T\("(\d+)"\)', row[0])
                fcn_no = fcn_no_match.group(1) if fcn_no_match else row[0].strip()
                # Food Contact Substance is column 6.
                fcs_text = row[6].strip()
                name_val, cas_val, is_mixture = parse_food_contact_substance(fcs_text)
                replaced_by = column(row, 7)
                notifier = column(row, 8)
                # The manufacturer field (column 9) may contain HTML tags.
                manufacturer = clean_html(row[9].strip()) if len(row) > 9 else ""
                # Notification date is taken from column 12 (Effective Date).
                notification_date = column(row, 12)

                cid = None
                if cas_val:
                    if is_mixture:
                        cid = [
                            lookup_cid(
                                cas,
                                f"🇺🇸 Warning: PubChem lookup failed for component with CAS {cas} in record {rec_num}.",
                            )
                            for cas in cas_val
                        ]
                    else:
                        cid = lookup_cid(
                            cas_val,
                            f"🇺🇸 Warning: PubChem lookup failed for {name_val} (CAS {cas_val}).",
                        )

                ordered_rec = {
                    "record": rec_num,
                    "cid": cid,
                    "name": name_val,
                    "CAS": cas_val,
                    "FCNNo": fcn_no,
                    "FoodContactSubstance": fcs_text,
                    "mixture": is_mixture,
                    "FCSreplacedby": replaced_by,
                    "FCSreplacedby_record": None,
                    "notifier": notifier,
                    "manufacturer": manufacturer,
                    "NotificationDate": notification_date,
                    "engine": f"SFPPy: {os.path.basename(__file__)}",
                    "csfile": csv_basename,
                    "date": index_date,
                }
                json_filename = os.path.join(self.cache_dir, f"rec{rec_num:05d}.fcn.json")
                with open(json_filename, "w", encoding="utf-8") as jf:
                    json.dump(ordered_rec, jf, ensure_ascii=False, indent=2)

                new_index["order"].append(rec_num)
                for nm in (name_val if is_mixture else [name_val]):
                    new_index["name"].setdefault(nm, []).append(rec_num)
                for cas in (cas_val if is_mixture else [cas_val]):
                    new_index["CAS"].setdefault(cas, []).append(rec_num)
                new_index["FCNNo"].setdefault(fcn_no, []).append(rec_num)
                if replaced_by:
                    new_index["FCSreplacedby"].setdefault(replaced_by, []).append(rec_num)
                if cid is not None:
                    # "bycid" keeps a single record per cid: the last one wins.
                    if is_mixture:
                        for component_cid in cid:
                            if component_cid is not None:
                                new_index["bycid"][str(component_cid)] = rec_num
                    else:
                        new_index["bycid"][str(cid)] = rec_num
                fcnno2recid[fcn_no] = rec_num
                records_list.append((rec_num, ordered_rec))

        # Second pass: resolve FCSreplacedby_record using the FCNNo mapping.
        for rec_id, rec in records_list:
            fcsrep_field = rec.get("FCSreplacedby", "").strip()
            if not fcsrep_field:
                continue
            # Extract the FCN number from the field when it is present.
            fcsrep_match = re.search(r'FCN\s*(\d+)', fcsrep_field, re.IGNORECASE)
            fcsrep = fcsrep_match.group(1) if fcsrep_match else fcsrep_field
            rec["FCSreplacedby_record"] = fcnno2recid.get(fcsrep)
            json_filename = os.path.join(self.cache_dir, f"rec{rec_id:05d}.fcn.json")
            try:
                with open(json_filename, "w", encoding="utf-8") as jf:
                    json.dump(rec, jf, ensure_ascii=False, indent=2)
            except Exception as e:
                printWARN(f"🇺🇸 Warning: Could not update FCSreplacedby_record in {json_filename}: {e}")

        with open(self.index_file, "w", encoding="utf-8") as f_index:
            json.dump(new_index, f_index, ensure_ascii=False, indent=2)
        with open(missing_file, "w", encoding="utf-8") as mf:
            json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
        self.index = new_index
        self.order = new_index.get("order", [])
        self._records_cache = {}

    def _load_record(self, rec_id, order=None, db=False):
        """
        Return the record `rec_id`, reading its JSON file on first access.

        The base `fcnrecord` is kept in the in-memory cache. When the instance
        was created with pubchem=True the returned object is an
        `fcnrecord_ext` wrapper; otherwise the cached `fcnrecord` itself.
        `order` is accepted for call-site symmetry but is not used. Returns
        None (after a warning) when the record file does not exist.
        """
        if rec_id in self._records_cache:
            cached = self._records_cache[rec_id]
            if not self._pubchem:
                return cached
            # On a cache hit the database handle is forwarded only on request.
            return fcnrecord_ext(cached, self) if db else fcnrecord_ext(cached)
        json_filename = os.path.join(self.cache_dir, f"rec{rec_id:05d}.fcn.json")
        if not os.path.exists(json_filename):
            printWARN(f"🇺🇸 Warning: Record file for record {rec_id} not found.")
            return None
        with open(json_filename, "r", encoding="utf-8") as jf:
            rec = json.load(jf)
        record_obj = fcnrecord(rec, order=rec.get("record"), total=len(self.order))
        self._records_cache[rec_id] = record_obj
        if self._pubchem:
            return fcnrecord_ext(record_obj, self)
        return record_obj

    def _load_one_or_many(self, rec_ids):
        """Return the single record when exactly one id is given, else a list of records."""
        if len(rec_ids) == 1:
            return self._load_record(rec_ids[0], order=rec_ids[0])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    def _valid_range_hint(self):
        """Return the sentence describing valid record numbers for error messages."""
        if not self.order:
            # min()/max() would raise ValueError on an empty database.
            return "The database is empty."
        return f"Valid records range from {min(self.order)} to {max(self.order)}."

    def __getitem__(self, key):
        """
        Look up records by record number, slice, list/tuple of keys, or string.

        A string key is tried as a CAS number first and, if it consists only
        of digits, as an FCN number. The slice step is ignored.

        Raises:
        ------
        KeyError
            If nothing matches or the key type is unsupported.
        """
        if isinstance(key, slice):
            if not self.order:
                raise KeyError("No records found: the database is empty.")
            start = key.start if key.start is not None else min(self.order)
            stop = key.stop if key.stop is not None else max(self.order) + 1
            rec_ids = [rid for rid in self.order if start <= rid < stop]
            if not rec_ids:
                raise KeyError(f"No records found in range {start} to {stop - 1}. {self._valid_range_hint()}")
            return [self._load_record(rid, order=rid) for rid in rec_ids]
        if isinstance(key, int):
            if key in self.order:
                return self._load_record(key, order=key)
            raise KeyError(f"Record number {key} not found. {self._valid_range_hint()}")
        if isinstance(key, (list, tuple)):
            return [self.__getitem__(k) for k in key]
        if isinstance(key, str):
            # First, try CAS lookup.
            if key in self.index.get("CAS", {}):
                return self._load_one_or_many(self.index["CAS"][key])
            # Then, if key is all digits, try FCNNo lookup.
            if key.isdigit() and key in self.index.get("FCNNo", {}):
                return self._load_one_or_many(self.index["FCNNo"][key])
            available = list(self.index.get("CAS", {}).keys()) + list(self.index.get("FCNNo", {}).keys())
            sample = ", ".join(available[:10]) + (" ..." if len(available) > 10 else "")
            raise KeyError(f"Key '{key}' not found in index. Valid keys include: {sample}")
        raise KeyError(f"Unsupported key type: {type(key)}")

    def __call__(self, *args):
        """
        Look up one or several identifiers.

        An int is interpreted as a record number first, then as a PubChem cid;
        an unknown int produces a warning and a None entry. A str is delegated
        to `__getitem__`. A single list/tuple argument is expanded. Returns
        the sole result when there is exactly one, otherwise the list.
        """
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = args[0]
        results = []
        for arg in args:
            if isinstance(arg, int):
                argkey = str(arg)
                if arg in self.order:
                    results.append(self._load_record(arg))
                elif "bycid" in self.index and argkey in self.index["bycid"]:
                    results.append(self._load_record(self.index["bycid"][argkey]))
                else:
                    printWARN(f"🇺🇸 Warning: Record for identifier {arg} not found.")
                    results.append(None)
            elif isinstance(arg, str):
                result_item = self.__getitem__(arg)
                if isinstance(result_item, list):
                    results.extend(result_item)
                else:
                    results.append(result_item)
            else:
                raise KeyError(f"Unsupported key type in call: {type(arg)}")
        return results[0] if len(results) == 1 else results

    def byname(self, name):
        """Return the list of records indexed under `name` (first element if a list is given)."""
        name = name[0] if isinstance(name, list) else name
        rec_ids = self.index.get("name", {}).get(name, [])
        return [self._load_record(rid, order=rid) for rid in rec_ids]

    def byCAS(self, cas):
        """Return the record (single match) or list of records for a CAS number."""
        cas = cas[0] if isinstance(cas, list) else cas
        return self._load_one_or_many(self.index.get("CAS", {}).get(cas, []))

    def byFCNNo(self, fcn_no):
        """
        Return the record (single match) or list of records for an FCN number.

        Index keys are strings, so an integer FCN number is converted with
        str() before the lookup.
        """
        fcn_no = fcn_no[0] if isinstance(fcn_no, list) else fcn_no
        if isinstance(fcn_no, int):
            fcn_no = str(fcn_no)
        return self._load_one_or_many(self.index.get("FCNNo", {}).get(fcn_no, []))

    def byFCSreplacedby(self, fcsrep):
        """Return the record (single match) or list of records whose FCSreplacedby field equals `fcsrep`."""
        fcsrep = fcsrep[0] if isinstance(fcsrep, list) else fcsrep
        return self._load_one_or_many(self.index.get("FCSreplacedby", {}).get(fcsrep, []))

    def bycid(self, cid, verbose=True):
        """Return the record indexed under PubChem `cid`, or None (with an optional warning)."""
        cid = cid[0] if isinstance(cid, list) else cid
        cidkey = str(cid)
        if "bycid" in self.index and cidkey in self.index["bycid"]:
            rec_id = self.index["bycid"][cidkey]
            return self._load_record(rec_id, order=rec_id)
        if verbose:
            printWARN(f"⚠️ Warning: No 🇺🇸 US FDA FCS record found for PubChem cid {cid}.")
        return None

    def __iter__(self):
        """Yield every record in index order."""
        for rid in self.order:
            yield self._load_record(rid, order=rid)

    def __len__(self):
        """Return the number of indexed records."""
        return len(self.order)

    def __contains__(self, item):
        """
        Membership test: int = record number or PubChem cid; str = CAS or FCN number.

        A list is reduced to its first element; an empty list is never contained.
        """
        if isinstance(item, list):
            if not item:
                return False
            item = item[0]
        if isinstance(item, int):
            return item in self.order or (("bycid" in self.index) and (str(item) in self.index["bycid"]))
        if isinstance(item, str):
            return item in self.index.get("CAS", {}) or item in self.index.get("FCNNo", {})
        return False

    def __repr__(self):
        """Print a short database summary and return the `str()` form."""
        csv_filename = os.path.basename(self.csv_file)
        index_date = self.index.get("index_date", "unknown")
        print(f"🇺🇸US FDA FCS database ({len(self.order)} records)")
        print(f"Imported from CSV {csv_filename} and indexed on {index_date}")
        return str(self)

    def __str__(self):
        """Return a one-line description with the record count."""
        return f"<{self.__class__.__name__}: {len(self.order)} records (US FDA FCS)>"
Class variables
var isInitialized
Static methods
def isindexinitialized(cache_dir='cache.USFDAfcn', index_file='fcn_index.json')
-
Expand source code
@classmethod def isindexinitialized(cls, cache_dir="cache.USFDAfcn", index_file="fcn_index.json"): return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))
Methods
def byCAS(self, cas)
-
Expand source code
def byCAS(self, cas): cas = cas[0] if isinstance(cas, list) else cas rec_ids = self.index.get("CAS", {}).get(cas, []) if len(rec_ids) == 1: return self._load_record(rec_ids[0], order=rec_ids[0]) else: return [self._load_record(rid, order=rid) for rid in rec_ids]
def byFCNNo(self, fcn_no)
-
Expand source code
def byFCNNo(self, fcn_no): fcn_no = fcn_no[0] if isinstance(fcn_no, list) else fcn_no rec_ids = self.index.get("FCNNo", {}).get(fcn_no, []) if len(rec_ids) == 1: return self._load_record(rec_ids[0], order=rec_ids[0]) else: return [self._load_record(rid, order=rid) for rid in rec_ids]
def byFCSreplacedby(self, fcsrep)
-
Expand source code
def byFCSreplacedby(self, fcsrep): fcsrep = fcsrep[0] if isinstance(fcsrep, list) else fcsrep rec_ids = self.index.get("FCSreplacedby", {}).get(fcsrep, []) if len(rec_ids) == 1: return self._load_record(rec_ids[0], order=rec_ids[0]) else: return [self._load_record(rid, order=rid) for rid in rec_ids]
def bycid(self, cid, verbose=True)
-
Expand source code
def bycid(self, cid, verbose=True): cid = cid[0] if isinstance(cid, list) else cid cidkey = str(cid) if "bycid" in self.index and cidkey in self.index["bycid"]: rec_id = self.index["bycid"][cidkey] return self._load_record(rec_id, order=rec_id) else: if verbose: printWARN(f"⚠️ Warning: No 🇺🇸 US FDA FCS record found for PubChem cid {cid}.") return None
def byname(self, name)
-
Expand source code
def byname(self, name): name = name[0] if isinstance(name, list) else name rec_ids = self.index.get("name", {}).get(name, []) return [self._load_record(rid, order=rid) for rid in rec_ids]
def refresh_index(self)
-
Expand source code
def refresh_index(self):
    """
    Rebuild every cached JSON record and the global index from the CSV file.

    Lines preceding the header line (the one starting with "FCN No") are
    skipped. Rows with fewer than 7 columns are ignored. CAS numbers whose
    PubChem lookup fails are remembered in "missing.pubchem.fcn.json" so that
    they are not queried again on the next refresh. A second pass resolves
    "FCSreplacedby_record" from the FCN number found in the "FCSreplacedby"
    field. On completion `self.index` and `self.order` are replaced and the
    in-memory record cache is cleared.
    """
    from patankar.loadpubchem import migrant

    index_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    csv_basename = os.path.basename(self.csv_file)
    new_index = {
        "index_date": index_date,
        "csv_file": csv_basename,
        "order": [],
        "name": {},
        "CAS": {},
        "FCNNo": {},
        "FCSreplacedby": {},
        "bycid": {},
    }
    missing_file = os.path.join(self.cache_dir, "missing.pubchem.fcn.json")
    if os.path.exists(missing_file):
        with open(missing_file, "r", encoding="utf-8") as mf:
            missing_pubchem = json.load(mf)
    else:
        missing_pubchem = {}

    def lookup_cid(cas, failure_message):
        # Resolve one CAS number to a PubChem cid; failures are memoized as None.
        if cas in missing_pubchem:
            return missing_pubchem[cas]
        try:
            return migrant(cas, annex1=False).cid
        except Exception:
            # Best effort: a failed lookup must not abort the whole refresh.
            printWARN(failure_message)
            missing_pubchem[cas] = None
            return None

    def column(row, idx):
        # Stripped content of an optional column ("" when the row is too short).
        return row[idx].strip() if len(row) > idx else ""

    records_list = []
    fcnno2recid = {}
    rec_num = 0
    # newline="" is what the csv module requires to parse quoted fields
    # containing line breaks correctly.
    with open(self.csv_file, "r", encoding="latin1", newline="") as f:
        # Skip any preamble: rewind to the start of the header line.
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            if line.lstrip().startswith("FCN No"):
                f.seek(pos)
                break
        reader = csv.reader(f, delimiter=",")
        next(reader, None)  # discard the header row
        for row in reader:
            if not row or len(row) < 7:
                continue
            rec_num += 1
            # The first column looks like =T("2355"); keep only the number.
            fcn_no_match = re.search(r'=T\("(\d+)"\)', row[0])
            fcn_no = fcn_no_match.group(1) if fcn_no_match else row[0].strip()
            # Food Contact Substance is column 6.
            fcs_text = row[6].strip()
            name_val, cas_val, is_mixture = parse_food_contact_substance(fcs_text)
            replaced_by = column(row, 7)
            notifier = column(row, 8)
            # The manufacturer field (column 9) may contain HTML tags.
            manufacturer = clean_html(row[9].strip()) if len(row) > 9 else ""
            # Notification date is taken from column 12 (Effective Date).
            notification_date = column(row, 12)

            cid = None
            if cas_val:
                if is_mixture:
                    cid = [
                        lookup_cid(
                            cas,
                            f"🇺🇸 Warning: PubChem lookup failed for component with CAS {cas} in record {rec_num}.",
                        )
                        for cas in cas_val
                    ]
                else:
                    cid = lookup_cid(
                        cas_val,
                        f"🇺🇸 Warning: PubChem lookup failed for {name_val} (CAS {cas_val}).",
                    )

            ordered_rec = {
                "record": rec_num,
                "cid": cid,
                "name": name_val,
                "CAS": cas_val,
                "FCNNo": fcn_no,
                "FoodContactSubstance": fcs_text,
                "mixture": is_mixture,
                "FCSreplacedby": replaced_by,
                "FCSreplacedby_record": None,
                "notifier": notifier,
                "manufacturer": manufacturer,
                "NotificationDate": notification_date,
                "engine": f"SFPPy: {os.path.basename(__file__)}",
                "csfile": csv_basename,
                "date": index_date,
            }
            json_filename = os.path.join(self.cache_dir, f"rec{rec_num:05d}.fcn.json")
            with open(json_filename, "w", encoding="utf-8") as jf:
                json.dump(ordered_rec, jf, ensure_ascii=False, indent=2)

            new_index["order"].append(rec_num)
            for nm in (name_val if is_mixture else [name_val]):
                new_index["name"].setdefault(nm, []).append(rec_num)
            for cas in (cas_val if is_mixture else [cas_val]):
                new_index["CAS"].setdefault(cas, []).append(rec_num)
            new_index["FCNNo"].setdefault(fcn_no, []).append(rec_num)
            if replaced_by:
                new_index["FCSreplacedby"].setdefault(replaced_by, []).append(rec_num)
            if cid is not None:
                # "bycid" keeps a single record per cid: the last one wins.
                if is_mixture:
                    for component_cid in cid:
                        if component_cid is not None:
                            new_index["bycid"][str(component_cid)] = rec_num
                else:
                    new_index["bycid"][str(cid)] = rec_num
            fcnno2recid[fcn_no] = rec_num
            records_list.append((rec_num, ordered_rec))

    # Second pass: resolve FCSreplacedby_record using the FCNNo mapping.
    for rec_id, rec in records_list:
        fcsrep_field = rec.get("FCSreplacedby", "").strip()
        if not fcsrep_field:
            continue
        # Extract the FCN number from the field when it is present.
        fcsrep_match = re.search(r'FCN\s*(\d+)', fcsrep_field, re.IGNORECASE)
        fcsrep = fcsrep_match.group(1) if fcsrep_match else fcsrep_field
        rec["FCSreplacedby_record"] = fcnno2recid.get(fcsrep)
        json_filename = os.path.join(self.cache_dir, f"rec{rec_id:05d}.fcn.json")
        try:
            with open(json_filename, "w", encoding="utf-8") as jf:
                json.dump(rec, jf, ensure_ascii=False, indent=2)
        except Exception as e:
            printWARN(f"🇺🇸 Warning: Could not update FCSreplacedby_record in {json_filename}: {e}")

    with open(self.index_file, "w", encoding="utf-8") as f_index:
        json.dump(new_index, f_index, ensure_ascii=False, indent=2)
    with open(missing_file, "w", encoding="utf-8") as mf:
        json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
    self.index = new_index
    self.order = new_index.get("order", [])
    self._records_cache = {}
class fcnrecord (d, order=None, total=None)
-
Represents a single Food Contact Substance Notification record from the US FDA database.
Keys include: - "record": the sequential record number (1-based) - "cid": PubChem compound identifier (or list of cids for mixtures; may be None) - "name": interpreted chemical name (string or list for mixtures) - "CAS": CAS number (string or list for mixtures) - "FCNNo": the FCN number extracted from the CSV row (e.g. from =T("2355")) - "FoodContactSubstance": the full field text from the CSV - "mixture": boolean flag indicating if the record represents a mixture - "FCSreplacedby": original field (as provided) indicating a replacement notification - "FCSreplacedby_record": record number (if found) corresponding to the replacement notification - "notifier", "manufacturer", "NotificationDate": additional fields from the CSV - Traceability fields: "engine", "csfile", "date"
Expand source code
class fcnrecord(dict):
    """
    A single Food Contact Substance Notification record (US FDA database).

    The record is a plain dict with these keys:
      - "record": the sequential record number (1-based)
      - "cid": PubChem compound identifier (or list of cids for mixtures; may be None)
      - "name": interpreted chemical name (string or list for mixtures)
      - "CAS": CAS number (string or list for mixtures)
      - "FCNNo": the FCN number extracted from the CSV row (e.g. from =T("2355"))
      - "FoodContactSubstance": the full field text from the CSV
      - "mixture": boolean flag indicating if the record represents a mixture
      - "FCSreplacedby": original field (as provided) indicating a replacement notification
      - "FCSreplacedby_record": record number (if found) of the replacement notification
      - "notifier", "manufacturer", "NotificationDate": additional fields from the CSV
      - Traceability fields: "engine", "csfile", "date"
    """

    def __init__(self, d, order=None, total=None):
        """
        Build the record from dict `d`.

        `order` is only a fallback used when `d` has no "record" key; `total`
        is the database size shown in the textual representations.
        Raises TypeError when `d` is not a dict.
        """
        if not isinstance(d, dict):
            raise TypeError(f"Input must be a dict, not a {type(d).__name__}")
        super().__init__(d)
        self._order = d.get("record", order)
        self._total = total

    def _position(self):
        """Return (order, total) as display strings, "?" standing for unknown."""
        shown_order = "?" if self._order is None else f"{self._order}"
        shown_total = "?" if self._total is None else f"{self._total}"
        return shown_order, shown_total

    def __str__(self):
        """Return a one-line summary with the cid and the record position."""
        shown_order, shown_total = self._position()
        cid = self.get("cid", None)
        return f"<{self.__class__.__name__} with cid:{cid} - record {shown_order} of {shown_total} (US FDA FCS)>"

    def __repr__(self):
        """Return a multi-line listing: a header, the non-empty fields, then traceability fields."""
        shown_order, shown_total = self._position()
        out = [f" ---- [ US FDA FCS record: {shown_order} of {shown_total} ] ----"]
        main_fields = (
            "record", "cid", "name", "CAS", "FCNNo", "FoodContactSubstance",
            "mixture", "FCSreplacedby", "FCSreplacedby_record",
            "notifier", "manufacturer", "NotificationDate",
        )
        for field in main_fields:
            if field in self:
                value = self[field]
                # None and blank strings are omitted from the listing.
                is_blank = isinstance(value, str) and not value.strip()
                if value is not None and not is_blank:
                    text = custom_wrap(str(value), width=60, indent=" " * 22)
                    out.append(f"{field:>20}: {text}")
        for field in ("engine", "csfile", "date"):
            if field not in self:
                continue
            text = custom_wrap(str(self[field]), width=60, indent=" " * 22)
            out.append(f"{field:>20}: {text}")
        return "\n".join(out)

    @property
    def ispubchemok(self):
        """True when the record holds a usable CAS (any non-blank component for a mixture)."""
        cas = self.get("CAS")
        if not self.get("mixture", False):
            return cas not in ("", None)
        if not cas:
            return False
        return any(component.strip() for component in cas)
Ancestors
- builtins.dict
Subclasses
Instance variables
var ispubchemok
-
Expand source code
@property def ispubchemok(self): cas = self.get("CAS") if self.get("mixture", False): return bool(cas and any(c.strip() for c in cas)) return cas not in ("", None)
class fcnrecord_ext (rec, db=None, verbosity=False)
-
Extended fcnrecord that automatically retrieves additional chemical information from PubChem.
For each CAS number (or each CAS in a mixture) the PubChem lookup is performed. The field "cid" is updated to be either a single PubChem CID or a list of CIDs.
Instantiate from a base fcnrecord. If a valid CAS is available, perform PubChem lookup via the 'migrant' function.
Expand source code
class fcnrecord_ext(fcnrecord):
    """
    Extended fcnrecord that resolves PubChem identifiers at construction time.

    A PubChem lookup is performed for the CAS number (or for each CAS number
    of a mixture). The result is stored in the instance attribute `cid`: a
    single cid, a list of cids for a mixture, or None when the record has no
    usable CAS. A failed lookup yields DEFAULT_PUBCHEM for that entry.
    """

    def __init__(self, rec, db=None, verbosity=False):
        """
        Instantiate from a base fcnrecord and look up its CAS number(s) via `migrant`.

        `db` is accepted but not used by this constructor. When `verbosity`
        is true, a warning is printed for every failed lookup.
        Raises TypeError when `rec` is not an fcnrecord.
        """
        if not isinstance(rec, fcnrecord):
            raise TypeError("Input must be an fcnrecord, not a {}".format(type(rec).__name__))
        super().__init__(rec, order=rec._order, total=rec._total)
        from patankar.loadpubchem import migrant

        if not self.ispubchemok:
            self.cid = None
            return

        def resolve(cas):
            # One lookup; any failure falls back to DEFAULT_PUBCHEM.
            try:
                return migrant(cas, annex1=False).cid
            except Exception:
                if verbosity:
                    printWARN(f"🇺🇸 Warning: PubChem lookup failed for CAS {cas} in record {self.get('record')}")
                return DEFAULT_PUBCHEM

        if self.get("mixture", False):
            self.cid = [resolve(component) for component in self.get("CAS", [])]
        else:
            self.cid = resolve(self.get("CAS"))
Ancestors
- fcnrecord
- builtins.dict