Module GBappendixA

"""
Module: GBappendixA

This module implements a robust database manager for the Chinese positive list under GB 9685-2016.
It is modeled after the EUFCMannex1 module but with the following key differences:

1. The CSV file (GB9685-2016.csv) merges 8 tables corresponding to different food contact materials.
   The table identifier is provided in the first column ("表格") and is mapped to descriptive names
   (e.g. "A1" becomes "plastics", "A2" becomes "coatings", etc.).
2. The primary key is based on the second column ("FCA编号"). From an entry like FCAXXXX the digits
   XXXX are extracted and stored under the key "FCA". Cached files are named FCAXXXX.json.
3. A substance may appear several times (once per material category) so that the positive list
   details (usage range, limits, restrictions, etc.) are stored in a sub-dictionary. The “authorized in”
   field is built as a list of the descriptive table names.
4. The parsing rules for the remaining columns are as follows:
   - Column 3 (“中文名称”): stored as "ChineseName".
   - Column 4 (“CAS号”): stored as a string or a list if multiple CAS numbers (separated by “;”).
   - Column 5 (“使用范围和最大使用量/%”): split into two fields within the positive list info:
       - "materials": a list obtained by splitting the text before “:” using commas.
       - "CP0max": the numeric value after “:” converted from a percentage (w/w) to mg/kg via
         multiplication by $10^4$. If no “:” is present, this field is set to None.
   - Column 6 (“SML/QM/(mg/kg)”): the full raw string is stored in "QMSMLraw". It is parsed to extract:
       - "SML": a single or list of numeric values (ignoring “ND” entries) when the associated text contains “:SML”.
       - "QM": a numeric value when the associated text contains “:QM”.
       - "DL": a detection limit if a pattern like “DL=0.01mg/kg” is found.
   - Column 7 (“SML(T)/(mg/kg)”): if the field is numeric (or a semicolon‑separated list of numbers) the
         numeric value(s) are stored in "SMLT"; otherwise, the raw text is stored in "SMLTraw".
   - Column 8 (“SML(T)”): stored as "SMLTcomment".
   - Column 9 (“分组编号”): stored as "comment1".
   - Column 10 (“其他要求”): stored as "comment2".
5. Finally, the order of the fields in each record is maintained as: (1) FCA, (2) cid, (3) CAS, (4) "authorized in",
   (5) ChineseName, then the positive list details (branched under each material-category name), and finally the
   traceability fields "engine", "csfile" and "date".

The extended record class, *gbrecord_ext*, automatically adds additional chemical information from PubChem.
The main manager class, *GBappendixA*, builds an index for fast lookup by CAS, FCA (the primary key), or PubChem cid.

@version: 1.41
@project: SFPPy - SafeFoodPackaging Portal in Python initiative
@author: INRAE\\olivier.vitrac@agroparistech.fr
@licence: MIT
@Date: 2024-01-10
@rev: 2025-04-01

"""

import os, csv, json, datetime, time, re, textwrap

__all__ = ['GBappendixA', 'custom_wrap', 'extract_number_before_keyword', 'extract_number_before_keyword_in_parentheses', 'gbrecord', 'gbrecord_ext', 'printWARN', 'split_col5_content', 'unwrap']


__project__ = "SFPPy"
__author__ = "Olivier Vitrac"
__copyright__ = "Copyright 2022"
__credits__ = ["Olivier Vitrac"]
__license__ = "MIT"
__maintainer__ = "Olivier Vitrac"
__email__ = "olivier.vitrac@agroparistech.fr"
__version__ = "1.41"


# Module-level variables to track last warning message and its timestamp
_LAST_WARN_ = None
_T_LAST_WARN_ = 0.0

# ----------------------------------------------------------------------
# Custom text wrapping function (similar to EU module)
def custom_wrap(text, width=60, indent=" " * 22):
    """Wrap `text` to `width` characters, indenting every line after the first."""
    lines = textwrap.wrap(text, width=width)
    if not lines:
        return ""
    first = lines[0]
    remaining = text[len(first):].lstrip()
    subsequent = textwrap.wrap(remaining, width=width)
    return "\n".join([first] + [indent + line for line in subsequent])

# ----------------------------------------------------------------------
# Show warnings without repeating them
def printWARN(message: str, tsilent: float = 10.0):
    """
    Print a warning message only if:
    - it's different from the last one, or
    - more than `tsilent` seconds have passed since the last identical warning.

    Parameters:
    ----------
    message : str
        The warning message to display.
    tsilent : float, optional
        Minimum time (in seconds) between repeated identical warnings.
    """
    global _LAST_WARN_, _T_LAST_WARN_
    tnow = time.time()
    if message != _LAST_WARN_ or (tnow - _T_LAST_WARN_ > tsilent):
        print(message)
        _LAST_WARN_ = message
        _T_LAST_WARN_ = tnow

# ----------------------------------------------------------------------
# robust numeric extract for GB document
def unwrap(value):
    """
    Unwrap a list-like value:
    - Return None if the value is empty or None
    - Return the sole item if it is a singleton list or tuple
    - Return the full list/tuple otherwise
    """
    if not value:
        return None
    if isinstance(value, (list, tuple)) and len(value) == 1:
        return value[0]
    return value

def extract_number_before_keyword_in_parentheses(text, keyword):
    """
    Extract numbers that are immediately followed by parentheses containing a given keyword.

    Example:
        extract_number_before_keyword_in_parentheses(
            'this is 0.6 (SML) 0.5 (again SML)', 'SML'
        )
        returns: [0.6, 0.5]

    Behavior:
        - If one match is found: return the number as float
        - If no match is found: return None
        - If multiple matches: return list of floats
    """
    pattern = rf'([+-]?\d*\.?\d+)\s*\(([^)]*{re.escape(keyword)}[^)]*)\)'
    matches = re.findall(pattern, text, re.IGNORECASE)
    res = unwrap([float(num) for num, _ in matches])
    return extract_number_before_keyword(text, keyword) if res is None else res


def extract_number_before_keyword(text, keyword):
    """
    Extract numbers that immediately precede a given keyword in the text, allowing for optional punctuation
    or other separators between the number and the keyword (e.g. '3.4 (SML)', '3.4: SML', etc.)

    Example:
        extract_number_before_keyword('1.2 3.4 SML', 'SML')         → 3.4
        extract_number_before_keyword('1 SML 2.0, SML again', 'SML') → [1, 2.0]
        extract_number_before_keyword('4.5: SML and (5.5) SML', 'SML') → [4.5, 5.5]

    Returns:
        - A float or int if a single number is found
        - A list of floats/ints if multiple matches are found
        - None if no match
    """
    pattern = rf'([+-]?\d*\.?\d+)\s*[\W]*\s*{re.escape(keyword)}\b'
    matches = re.findall(pattern, text, re.IGNORECASE)
    return unwrap([float(m) if '.' in m else int(m) for m in matches])
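
The same pattern can be exercised standalone (a sketch that always returns the list of matches, without the `unwrap` step):

```python
import re

def numbers_before(text, keyword):
    # Same pattern as extract_number_before_keyword: a number, optional
    # punctuation/whitespace, then the keyword as a whole word.
    pattern = rf'([+-]?\d*\.?\d+)\s*[\W]*\s*{re.escape(keyword)}\b'
    matches = re.findall(pattern, text, re.IGNORECASE)
    return [float(m) if '.' in m else int(m) for m in matches]

print(numbers_before('1.2 3.4 SML', 'SML'))           # [3.4]
print(numbers_before('1 SML 2.0, SML again', 'SML'))  # [1, 2.0]
```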


def split_col5_content(text):
    """
    Split column-5 text just before the first "value(...)" or "value[...]" group
    that contains a regulatory keyword (SML, DL, QM, SML(T)).
    Returns (main_text, remainder); the remainder is "" when no such group exists.
    """
    # True regulatory keywords
    keywords = r'(SML|DL|QM|SML\(T\))'
    # Match a value (ND or number) followed by (...) or [...] containing a keyword
    # We will match starting FROM that value
    pattern = re.compile(
        r'(ND|\d+(?:\.\d+)?)(\([^)]*' + keywords + r'[^)]*\)|\[[^\]]*' + keywords + r'[^\]]*\])'
    )
    match = pattern.search(text)
    if match:
        idx = match.start()  # this is the correct split point: just before the match
        main = text[:idx].strip()
        rem = text[idx:].strip()
        return main, rem
    else:
        return text.strip(), ""  # fallback


# ----------------------------------------------------------------------
# gbrecord: represents one substance record from GB 9685-2016.
# The record stores the following keys in order:
#   "FCA" (primary key extracted from FCA编号),
#   "cid" (PubChem identifier),
#   "CAS" (CAS number, string or list),
#   "authorized in" (list of material names in which the substance is allowed),
#   "ChineseName" (the substance name in Chinese),
#   "table" (a list of dictionaries with the positive list details),
#   and traceability keys "engine", "csfile", "date".
# ----------------------------------------------------------------------
class gbrecord(dict):
    def __init__(self, d, order=None, total=None):
        if not isinstance(d, dict):
            raise TypeError(f"d must be a dict, not a {type(d).__name__}")
        super().__init__(d)
        self._order = d.get("FCA", order)
        self._total = total

    def __str__(self):
        cid = self.get("cid", None)
        order_str = f"{self._order}" if self._order is not None else "?"
        total_str = f"{self._total}" if self._total is not None else "?"
        return f"<{self.__class__.__name__} with cid:{cid} - FCA {order_str} of {total_str} (GB 9685-2016)>"

    def __repr__(self):
        lines = []
        order_str = f"{self._order}" if self._order is not None else "?"
        total_str = f"{self._total}" if self._total is not None else "?"
        header = f" ---- [ GB 9685-2016 record: {order_str} of {total_str} ] ----"
        lines.append(header)

        # Define base keys to display first.
        base_keys = {"FCA", "cid", "CAS", "authorized in", "ChineseName", "engine", "csfile", "date"}
        fields_order = ["FCA", "cid", "CAS", "authorized in", "ChineseName"]
        for key in fields_order:
            if key not in self:
                continue
            val = self[key]
            if val is None or (isinstance(val, str) and not val.strip()):
                continue
            wrapped_val = custom_wrap(str(val), width=60, indent=" " * 22)
            lines.append(f"{key:>20}: {wrapped_val}")

        # Display branching positive list details:
        branch_keys = [k for k in self.keys() if k not in base_keys]
        if branch_keys:
            lines.append(f"\n{'--- Positive Lists':>20}:")
            for branch in sorted(branch_keys):
                value = self[branch]
                lines.append(f"  {branch}:")
                # Normalize value to a list
                entries = value if isinstance(value, list) else [value]
                for idx, entry in enumerate(entries, start=1):
                    lines.append(f"    Entry {idx}:")
                    if isinstance(entry, dict):
                        for k, v in entry.items():
                            if v is None or (isinstance(v, str) and not v.strip()):
                                continue
                            wrapped_v = custom_wrap(str(v), width=60, indent=" " * 25)
                            lines.append(f"      {k:>15}: {wrapped_v}")
                    else:
                        wrapped_entry = custom_wrap(str(entry), width=60, indent=" " * 25)
                        lines.append(f"      {wrapped_entry}")

        # Append traceability information.
        for key in ["engine", "csfile", "date"]:
            if key in self and self[key]:
                wrapped_val = custom_wrap(str(self[key]), width=60, indent=" " * 22)
                lines.append(f"{key:>20}: {wrapped_val}")
        return "\n".join(lines)


# ----------------------------------------------------------------------
# gbrecord_ext: extended record that adds chemical info from PubChem.
# ----------------------------------------------------------------------
class gbrecord_ext(gbrecord):
    def __init__(self, rec, db=None):
        if not isinstance(rec, gbrecord):
            raise TypeError(f"rec must be a gbrecord, not a {type(rec).__name__}")
        super().__init__(rec, order=rec._order, total=rec._total)
        from patankar.loadpubchem import migrant
        cas = rec.get("CAS")
        # Extend only when a CAS is available (non-empty string or list).
        if cas and (isinstance(cas, list) or (isinstance(cas, str) and cas.strip())):
            try:
                m = migrant(cas, annex1=False)
                M = m.M
            except Exception:
                print(f"{rec.get('ChineseName')} could not be extended from its CAS {cas}: not found")
                M = None
        else:
            M = None
        self.cid = self.get("cid")
        self.M = M
        # Additional extended attributes (e.g. group information) can be added here.
        # [XXX] Additional placeholders remain as needed.

# ----------------------------------------------------------------------
# GBappendixA: the main class to manage the GB 9685-2016 CSV file and cache.
#
# The parsing of each CSV row follows these rules:
#
# 1) Column 1: 表格
#    - The code (e.g. "A1") is mapped to a descriptive name (e.g. "plastics") and stored in the record under "authorized in".
#
# 2) Column 2: FCA编号
#    - Expected in the format "FCAXXXX"; the digits XXXX are extracted and stored under "FCA".
#
# 3) Column 3: 中文名称
#    - Stored as a string under "ChineseName".
#
# 4) Column 4: CAS号
#    - Stored as a string or as a list (if multiple numbers are provided separated by ";").
#
# 5) Column 5: 使用范围和最大使用量/%
#    - Parsed to extract:
#         * "materials": a list (splitting by commas before the colon).
#         * "CP0max": the numeric value after the colon converted to mg/kg by multiplying by $10^4$.
#       If no colon is present, "CP0max" is set to None.
#
# 6) Column 6: SML/QM/(mg/kg)
#    - The full text is stored in "QMSMLraw".
#    - It is parsed by splitting on ";" to detect multiple entries.
#      For each entry:
#         * If the entry contains ":SML" and the value is not "ND", the number is added to the SML list.
#         * If the entry contains ":QM", the numeric value is recorded as QM.
#         * If a pattern like "DL=0.01mg/kg" is found, DL is extracted.
#
# 7) Column 7: SML(T)/(mg/kg)
#    - If the field can be converted to a float (or is a semicolon‑separated list of floats), the value(s) are stored in "SMLT";
#      otherwise, the raw text is stored in "SMLTraw".
#
# 8) Column 8: SML(T)
#    - Stored as "SMLTcomment".
#
# 9) Column 9: 分组编号
#    - Stored as "comment1".
#
# 10) Column 10: 其他要求
#     - Stored as "comment2".
#
# If a substance appears in several rows (i.e. in different tables), the record is merged:
#    - The "authorized in" field becomes the list of all descriptive table names.
#    - The positive list details (columns 5–10) are stored as a list under "table".
#
# Traceability fields "engine", "csfile" and "date" are appended to each record.
#
# A global index is built mapping keys for "CAS", "FCA", "bycid" (PubChem cid) and "ChineseName" to the corresponding record.
# ----------------------------------------------------------------------
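
The merging rule can be sketched with hypothetical rows (simplified: here each category branch is always a list, whereas the class keeps a bare dict until a second entry appears for the same category):

```python
# Hypothetical rows: (FCA number, material category, positive-list details).
rows = [("0123", "plastics", {"SML": 0.6}),
        ("0123", "coatings", {"SML": 1.2})]

records = {}
for fca, table, pos_info in rows:
    rec = records.setdefault(fca, {"FCA": fca, "authorized in": []})
    if table not in rec["authorized in"]:
        rec["authorized in"].append(table)
    rec.setdefault(table, []).append(pos_info)  # one branch per category

print(records["0123"]["authorized in"])  # ['plastics', 'coatings']
```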
class GBappendixA:
    def __init__(self, cache_dir="cache.GBappendixA", index_file="gb_index.json", pubchem=True):
        self.base_dir = os.path.dirname(__file__)
        self.csv_file = os.path.join(self.base_dir, "GB9685-2016.csv")
        if not os.path.exists(self.csv_file):
            raise FileNotFoundError(f"CSV file {self.csv_file} not found.")
        self.cache_dir = os.path.join(self.base_dir, cache_dir)
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)
        self.index_file = os.path.join(self.cache_dir, index_file)
        if os.path.exists(self.index_file):
            with open(self.index_file, "r", encoding="utf-8") as f:
                self.index = json.load(f)
        else:
            self.refresh_index()
        self.order = self.index.get("order", [])
        self._records_cache = {}
        self._pubchem = pubchem  # when True, records are extended with PubChem data on load
        GBappendixA.isinitialized = True

    @classmethod
    def isindexinitialized(cls, cache_dir="cache.GBappendixA", index_file="gb_index.json"):
        return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))

    def refresh_index(self):
        """
        Rebuild the global index by reading the CSV file and regenerating each record as FCAXXXX.json.
        The index includes mappings for "CAS", "FCA", "bycid", and "ChineseName".
        """
        # Load missing CAS numbers (only for substances with a CAS)
        missing_file = os.path.join(self.cache_dir, "missing.pubchem.gb.json")
        if os.path.exists(missing_file):
            with open(missing_file, "r", encoding="utf-8") as mf:
                missing_pubchem = json.load(mf)
        else:
            missing_pubchem = {}

        new_index = {}
        new_index["index_date"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        new_index["csv_file"] = os.path.basename(self.csv_file)
        new_index["order"] = []
        new_index["CAS"] = {}
        new_index["bycid"] = {}
        new_index["FCA"] = {}
        new_index["ChineseName"] = {}

        # Temporary dictionary to merge records by FCA number.
        records_dict = {}
        # Mapping table codes to descriptive names.
        table_mapping = {
            "A1": "plastics",
            "A2": "coatings",
            "A3": "rubber",
            "A4": "printing inks",
            "A5": "adhesives",
            "A6": "paper and board",
            "A7": "silicon rubber",
            "A7bis": "textile"
        }

        from patankar.loadpubchem import migrant  # Import the PubChem lookup function

        with open(self.csv_file, "r", encoding="utf-8") as f:
            reader = csv.reader(f, delimiter=",", quotechar='"')
            header = next(reader, None)
            # Assume header row exists (starting with "表格")
            for row in reader:
                if not row or len(row) < 10:
                    continue

                # Column 1: 表格
                table_code = row[0].strip()
                table_desc = table_mapping.get(table_code, table_code)

                # Column 2: FCA编号 (e.g. "FCA0001")
                fca_field = row[1].strip()
                m_fca = re.search(r'FCA(\d+)', fca_field)
                if m_fca:
                    fca_num = m_fca.group(1)
                else:
                    continue  # Skip row if FCA format is not recognized.

                # Column 3: 中文名称
                chinese_name = row[2].strip()

                # Column 4: CAS号
                cas_field = row[3].strip()
                if ";" in cas_field:
                    cas_value = [x.strip() for x in cas_field.split(";") if x.strip()]
                else:
                    cas_value = cas_field

                # Column 5: 使用范围和最大使用量/%
                range_field,remcol5 = split_col5_content(row[4].strip())
                materials = []
                CP0max = None
                if ":" in range_field:
                    parts = range_field.split(":")
                    materials = [x.strip() for x in parts[0].split(",") if x.strip()]
                    try:
                        CP0max = float(parts[1].strip()) * 1e4  # % (w/w) -> mg/kg
                    except ValueError:
                        CP0max = None
                else:
                    materials = [x.strip() for x in range_field.split(",") if x.strip()]

                # Column 6: SML/QM/(mg/kg)
                smlqm_field = remcol5 + row[5].strip()
                QMSMLraw = smlqm_field
                sml_values = []
                qm_value = None
                dl_value = None
                entries = [entry.strip() for entry in smlqm_field.split(";") if entry.strip()]
                for entry in entries:
                    entry = entry.rstrip("或")
                    m_entry = re.match(r'(?P<val>\d*\.?\d+|ND)\s*\((?P<info>.+?)\)', entry)
                    if m_entry:
                        val_str = m_entry.group("val")
                        info = m_entry.group("info")
                        if val_str.upper() != "ND":
                            try:
                                num_val = float(val_str)
                            except ValueError:
                                num_val = None
                        else:
                            num_val = None
                        if ":SML" in info:
                            if num_val is not None:
                                sml_values.append(num_val)
                        if ":QM" in info:
                            if num_val is not None:
                                qm_value = num_val
                        dl_match = re.search(r'DL=([\d\.]+)mg/kg', info)
                        if dl_match:
                            try:
                                dl_value = float(dl_match.group(1))
                            except ValueError:
                                dl_value = None
                SML = unwrap(sml_values)
                QM = unwrap(qm_value)
                DL = unwrap(dl_value)
                # second pass for column 6
                if SML is None and "SML" in smlqm_field:
                    SML = extract_number_before_keyword_in_parentheses(smlqm_field,"SML")
                if QM is None and "QM" in smlqm_field:
                    QM = extract_number_before_keyword_in_parentheses(smlqm_field, "QM")
                if DL is None and "DL" in smlqm_field:
                    DL = extract_number_before_keyword_in_parentheses(smlqm_field, "DL")

                # Column 7: SML(T)/(mg/kg)
                smlt_field = row[6].strip()
                SMLT = None
                SMLTraw = ""
                if smlt_field:
                    parts = [x.strip() for x in smlt_field.split(";") if x.strip()]
                    parsed_parts = []
                    for part in parts:
                        try:
                            parsed_parts.append(float(part))
                        except ValueError:
                            parsed_parts.append(part)
                    if all(isinstance(x, float) for x in parsed_parts):
                        SMLT = parsed_parts[0] if len(parsed_parts) == 1 else parsed_parts
                    else:
                        SMLT = extract_number_before_keyword_in_parentheses(smlt_field, "SML")
                        SMLTraw = smlt_field if SMLT is None else SMLTraw

                # consistency rule
                if SMLT is not None and SML is None:
                    SML = SMLT

                # Column 8: SML(T)
                SMLTcomment = row[7].strip()
                # Column 9: 分组编号
                comment1 = row[8].strip()
                # Column 10: 其他要求
                comment2 = row[9].strip()

                # Assemble the positive list info for this row.
                pos_info = {
                    "materials": materials,
                    "CP0max": CP0max,
                    "QMSMLraw": QMSMLraw,
                    "SML": SML,
                    "QM": QM,
                    "DL": DL,
                    "SMLT": SMLT,
                    "SMLTraw": SMLTraw,
                    "SMLTcomment": SMLTcomment,
                    "comment1": comment1,
                    "comment2": comment2,
                    "table_id": table_desc
                }

                # --- Begin PubChem lookup for cid ---
                if cas_value:
                    if isinstance(cas_value, list):
                        cas_lookup = cas_value[0]
                    else:
                        cas_lookup = cas_value
                    if cas_lookup and cas_lookup.strip():
                        if cas_lookup in missing_pubchem:
                            cid_val = missing_pubchem[cas_lookup]
                        else:
                            try:
                                cid_val = migrant(cas_lookup, annex1=False).cid
                            except ValueError:
                                printWARN(f"🇨🇳 Warning: substance {chinese_name} (CAS {cas_lookup}) not found in PubChem.")
                                cid_val = None
                                missing_pubchem[cas_lookup] = None
                    else:
                        cid_val = None
                else:
                    cid_val = None
                # --- End PubChem lookup ---

                # Prepare a temporary record for this row using columns 1-4 and traceability.
                temp_rec = {
                    "FCA": fca_num,
                    "cid": cid_val,
                    "CAS": cas_value,
                    "authorized in": [table_desc],
                    "ChineseName": chinese_name,
                    "engine": "SFPPy: GBappendixA module",
                    "csfile": os.path.basename(self.csv_file),
                    "date": new_index["index_date"]
                }
                # The positive list details (columns 5-10) are stored in pos_info.
                # Merge records by FCA number.
                if fca_num in records_dict:
                    record = records_dict[fca_num]
                    # Update the "authorized in" list if the category is new.
                    if table_desc not in record.get("authorized in", []):
                        record["authorized in"].append(table_desc)
                    # Branch under the key corresponding to table_desc.
                    if table_desc in record:
                        # If an entry already exists for this category, append the new pos_info.
                        existing = record[table_desc]
                        if isinstance(existing, list):
                            existing.append(pos_info)
                        else:
                            record[table_desc] = [existing, pos_info]
                    else:
                        record[table_desc] = pos_info
                else:
                    # For a new FCA, include the branch with key table_desc.
                    temp_rec[table_desc] = pos_info
                    records_dict[fca_num] = temp_rec

                # Update index for CAS, FCA and ChineseName (skip duplicates when
                # the same substance appears in several tables).
                if cas_value:
                    cas_list = cas_value if isinstance(cas_value, list) else [cas_value]
                    for cas in cas_list:
                        if fca_num not in new_index["CAS"].setdefault(cas, []):
                            new_index["CAS"][cas].append(fca_num)
                if fca_num not in new_index["FCA"].setdefault(fca_num, []):
                    new_index["FCA"][fca_num].append(fca_num)
                if fca_num not in new_index["ChineseName"].setdefault(chinese_name, []):
                    new_index["ChineseName"][chinese_name].append(fca_num)

        # Write individual record files and build the order list.
        order_list = sorted(records_dict.keys(), key=lambda x: int(x))
        for fca in order_list:
            record = records_dict[fca]
            record_filename = f"FCA{int(fca):04d}.json"
            json_filename = os.path.join(self.cache_dir, record_filename)
            with open(json_filename, "w", encoding="utf-8") as jf:
                json.dump(record, jf, ensure_ascii=False, indent=2)
            new_index["order"].append(fca)
            if record.get("cid") is not None:
                new_index["bycid"][str(record["cid"])] = fca

        with open(self.index_file, "w", encoding="utf-8") as f:
            json.dump(new_index, f, ensure_ascii=False, indent=2)
        with open(missing_file, "w", encoding="utf-8") as mf:
            json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
        self.index = new_index
        self.order = new_index.get("order", [])
        self._records_cache = {}


    def _load_record(self, fca, order=None, db=False):
        """
        Load a record (as a gbrecord) from its cached JSON file.
        If PubChem extension is enabled, the record is returned as a gbrecord_ext.
        """
        if fca in self._records_cache:
            if self._pubchem:
                return gbrecord_ext(self._records_cache[fca], self) if db else gbrecord_ext(self._records_cache[fca])
            else:
                return self._records_cache[fca]
        json_filename = os.path.join(self.cache_dir, f"FCA{int(fca):04d}.json")
        if not os.path.exists(json_filename):
            print(f"⚠️ Warning: Record file for 🇨🇳 FCA {fca} not found.")
            return None
        with open(json_filename, "r", encoding="utf-8") as jf:
            rec = json.load(jf)
        record_obj = gbrecord(rec, order=rec.get("FCA"), total=len(self.order))
        self._records_cache[fca] = record_obj
        if self._pubchem:
            return gbrecord_ext(record_obj, self) if db else gbrecord_ext(record_obj)
        else:
            return record_obj

    def __getitem__(self, key):
        """
        Supports lookup by:
         - Slice: returns a list of records whose FCA numbers fall within the slice.
         - Integer or string: interpreted as an FCA number.
         - String: if not matching an FCA number then interpreted as a CAS number.
         - List/tuple: returns a list of corresponding records.
        """
        if isinstance(key, slice):
            start = key.start if key.start is not None else int(min(self.order, key=int))
            stop = key.stop if key.stop is not None else int(max(self.order, key=int)) + 1
            rec_keys = [k for k in self.order if int(start) <= int(k) < int(stop)]
            if not rec_keys:
                raise KeyError(f"No records found in range {start} to {stop - 1}. Valid FCA numbers range from {min(self.order)} to {max(self.order)}.")
            return [self._load_record(k, order=k) for k in rec_keys]
        elif isinstance(key, (int, str)):
            key_str = str(key)
            if key_str in self.order:
                return self._load_record(key_str, order=key_str)
            elif key in self.index.get("CAS", {}):
                rec_keys = self.index["CAS"][key]
                if len(rec_keys) == 1:
                    return self._load_record(rec_keys[0], order=rec_keys[0])
                else:
                    return [self._load_record(k, order=k) for k in rec_keys]
            else:
                raise KeyError(f"Key '{key}' not found. Valid keys include FCA numbers and CAS numbers.")
        elif isinstance(key, (list, tuple)):
            return [self.__getitem__(k) for k in key]
        else:
            raise KeyError(f"Unsupported key type: {type(key)}")

    def __call__(self, *args):
        """
        Callable access. For example:
         - GBappendixA(cid) returns the record for a given PubChem cid.
         - GBappendixA(fca) returns the record for a given FCA number.
         - GBappendixA("CAS") returns the record(s) for a given CAS number.
         - Multiple arguments return a list.
        """
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = args[0]
        results = []
        for arg in args:
            if isinstance(arg, int):
                arg_str = str(arg)
                if arg_str in self.order:
                    results.append(self._load_record(arg_str))
                elif arg_str in self.index.get("bycid", {}):
                    fca = self.index["bycid"][arg_str]
                    results.append(self._load_record(fca))
                else:
                    print(f"🇨🇳 Warning: Record for identifier {arg} not found.")
                    results.append(None)
            elif isinstance(arg, str):
                try:
                    result_item = self.__getitem__(arg)
                    if isinstance(result_item, list):
                        results.extend(result_item)
                    else:
                        results.append(result_item)
                except KeyError as e:
                    print(e)
                    results.append(None)
            else:
                raise KeyError(f"Unsupported key type in call: {type(arg)}")
        return results[0] if len(results) == 1 else results

    def byCAS(self, cas):
        if isinstance(cas, list):
            cas = cas[0]
        rec_keys = self.index.get("CAS", {}).get(cas, [])
        if len(rec_keys) == 1:
            return self._load_record(rec_keys[0], order=rec_keys[0], db=True)
        else:
            return [self._load_record(k, order=k, db=True) for k in rec_keys]

    def byFCA(self, fca):
        fca_str = str(fca)
        if fca_str in self.order:
            return self._load_record(fca_str, order=fca_str)
        else:
            raise KeyError(f"🇨🇳 FCA number {fca} not found. Valid FCA numbers range from {min(self.order)} to {max(self.order)}.")

    def bycid(self, cid, verbose=True):
        cid_str = str(cid)
        if cid_str in self.index.get("bycid", {}):
            fca = self.index["bycid"][cid_str]
            return self._load_record(fca, order=fca, db=True)
        else:
            if verbose:
                print(f"⚠️ Warning: No 🇨🇳 GB 9685-2016 record found for PubChem cid {cid}.")
            return None

    def __iter__(self):
        for fca in self.order:
            yield self._load_record(fca, order=fca)

    def __len__(self):
        return len(self.order)

    def __contains__(self, item):
        if isinstance(item, (list, tuple)):
            item = item[0]
        if isinstance(item, int):
            return str(item) in self.order or str(item) in self.index.get("bycid", {})
        if isinstance(item, str):
            return item in self.index.get("CAS", {})
        return False

    def __repr__(self):
        csv_filename = os.path.basename(self.csv_file)
        index_date = self.index.get("index_date", "unknown")
        print(f"GB 9685-2016 positive list ({len(self.order)} records)")
        print(f"Imported from CSV {csv_filename} and indexed on {index_date}")
        return str(self)

    def __str__(self):
        return f"<{self.__class__.__name__}: {len(self.order)} records (GB 9685-2016)>"

# -------------------------------------------------------------------
# Example usage (for debugging / standalone tests)
# -------------------------------------------------------------------
if __name__ == "__main__":
    dbappendixA = GBappendixA(pubchem=True)
    # Lookup by FCA number:
    rec = dbappendixA.byFCA("0001")
    print(repr(rec))
    # Lookup by CAS:
    try:
        rec_by_cas = dbappendixA.byCAS("25013-16-5")
        print(rec_by_cas)
    except KeyError as e:
        print(e)
    # Lookup by PubChem cid:
    rec_by_cid = dbappendixA.bycid(6581)
    print(rec_by_cid)
    # Callable access example:
    rec_call = dbappendixA("25013-16-5")
    print(rec_call)
    # Iterate over records:
    #for record in dbappendixA:
    #    print(record.get("FCA"), record.get("ChineseName"))
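The FCA-key convention described in the module notes (digits extracted from an FCAXXXX identifier, zero-padded JSON cache names) can be sketched as two small helpers; the helper names are illustrative, not part of the module:

```python
import re

def fca_key(fca_field: str):
    """Extract the digit part of an identifier like 'FCA0001' (None if absent)."""
    m = re.search(r'FCA(\d+)', fca_field)
    return m.group(1) if m else None

def cache_filename(fca_num: str) -> str:
    """Zero-padded JSON cache file name, as used by GBappendixA."""
    return f"FCA{int(fca_num):04d}.json"
```

For example, `fca_key("FCA0001")` yields `"0001"` and `cache_filename("12")` yields `"FCA0012.json"`, which is why lookups accept both integer and string FCA numbers.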

Functions

def custom_wrap(text, width=60, indent=' ' * 22)
Expand source code
def custom_wrap(text, width=60, indent=" " * 22):
    # Wrap the first line and indent subsequent lines.
    first_line = textwrap.wrap(text, width=width)
    if not first_line:
        return ""
    first = first_line[0]
    remaining = text[len(first):].lstrip()
    subsequent_lines = textwrap.wrap(remaining, width=width)
    wrapped = [first] + [indent + line for line in subsequent_lines]
    return "\n".join(wrapped)
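To see the wrapping behavior concretely, the function can be exercised as below (reproduced verbatim so the snippet is self-contained; the sample text and the 4-space indent are arbitrary):

```python
import textwrap

def custom_wrap(text, width=60, indent=" " * 22):
    # Wrap the first line and indent subsequent lines.
    first_line = textwrap.wrap(text, width=width)
    if not first_line:
        return ""
    first = first_line[0]
    remaining = text[len(first):].lstrip()
    subsequent_lines = textwrap.wrap(remaining, width=width)
    wrapped = [first] + [indent + line for line in subsequent_lines]
    return "\n".join(wrapped)

print(custom_wrap("alpha beta gamma delta", width=10, indent="    "))
# alpha beta
#     gamma
#     delta
```

The first line keeps the caller's left margin; every continuation line receives the indent, which is what aligns wrapped record fields in the printed tables.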
def extract_number_before_keyword(text, keyword)

Extract numbers that immediately precede a given keyword in the text, allowing for optional punctuation or other separators between the number and the keyword (e.g. '3.4 (SML)', '3.4: SML', etc.)

Example

extract_number_before_keyword('1.2 3.4 SML', 'SML') → 3.4
extract_number_before_keyword('1 SML 2.0, SML again', 'SML') → [1, 2.0]
extract_number_before_keyword('4.5: SML and (5.5) SML', 'SML') → [4.5, 5.5]

Returns

  • A float or int if a single number is found
  • A list of floats/ints if multiple matches are found
  • None if no match
Expand source code
def extract_number_before_keyword(text, keyword):
    """
    Extract numbers that immediately precede a given keyword in the text, allowing for optional punctuation
    or other separators between the number and the keyword (e.g. '3.4 (SML)', '3.4: SML', etc.)

    Example:
        extract_number_before_keyword('1.2 3.4 SML', 'SML')         → 3.4
        extract_number_before_keyword('1 SML 2.0, SML again', 'SML') → [1, 2.0]
        extract_number_before_keyword('4.5: SML and (5.5) SML', 'SML') → [4.5, 5.5]

    Returns:
        - A float or int if a single number is found
        - A list of floats/ints if multiple matches are found
        - None if no match
    """
    pattern = rf'([+-]?\d*\.?\d+)\s*[\W]*\s*{re.escape(keyword)}\b'
    matches = re.findall(pattern, text, re.IGNORECASE)
    return unwrap([float(m) if '.' in m else int(m) for m in matches])
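The docstring examples can be verified directly; unwrap (defined later in this module) is reproduced so the snippet runs standalone:

```python
import re

def unwrap(value):
    # None for empty, the bare item for a singleton, the sequence otherwise
    if not value:
        return None
    if isinstance(value, (list, tuple)) and len(value) == 1:
        return value[0]
    return value

def extract_number_before_keyword(text, keyword):
    # Numbers immediately preceding `keyword`, allowing punctuation in between
    pattern = rf'([+-]?\d*\.?\d+)\s*[\W]*\s*{re.escape(keyword)}\b'
    matches = re.findall(pattern, text, re.IGNORECASE)
    return unwrap([float(m) if '.' in m else int(m) for m in matches])

print(extract_number_before_keyword('1.2 3.4 SML', 'SML'))  # → 3.4
```

Only the number directly in front of the keyword is captured (`1.2` is separated from `SML` by another number, so it is ignored), and integers stay integers while decimals become floats.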
def extract_number_before_keyword_in_parentheses(text, keyword)

Extract numbers that are immediately followed by parentheses containing a given keyword.

Example

extract_number_before_keyword_in_parentheses('this is 0.6 (SML) 0.5 (again SML)', 'SML') → [0.6, 0.5]

Behavior

  • If one match is found: return the number as float
  • If no match is found: return None
  • If multiple matches: return list of floats
Expand source code
def extract_number_before_keyword_in_parentheses(text, keyword):
    """
    Extract numbers that are immediately followed by parentheses containing a given keyword.

    Example:
        extract_number_before_keyword_in_parentheses(
            'this is 0.6 (SML) 0.5 (again SML)', 'SML'
        )
        returns: [0.6, 0.5]

    Behavior:
        - If one match is found: return the number as float
        - If no match is found: return None
        - If multiple matches: return list of floats
    """
    pattern = rf'([+-]?\d*\.?\d+)\s*\(([^)]*{re.escape(keyword)}[^)]*)\)'
    matches = re.findall(pattern, text, re.IGNORECASE)
    res = unwrap([float(num) for num, _ in matches])
    return extract_number_before_keyword(text, keyword) if res is None else res
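A standalone check of the parenthesized variant, including its fallback to extract_number_before_keyword (both helpers reproduced from this module):

```python
import re

def unwrap(value):
    # None for empty, the bare item for a singleton, the sequence otherwise
    if not value:
        return None
    if isinstance(value, (list, tuple)) and len(value) == 1:
        return value[0]
    return value

def extract_number_before_keyword(text, keyword):
    # Fallback used when no number is directly followed by a (keyword) group
    pattern = rf'([+-]?\d*\.?\d+)\s*[\W]*\s*{re.escape(keyword)}\b'
    matches = re.findall(pattern, text, re.IGNORECASE)
    return unwrap([float(m) if '.' in m else int(m) for m in matches])

def extract_number_before_keyword_in_parentheses(text, keyword):
    # Numbers immediately followed by (...) that mentions `keyword`
    pattern = rf'([+-]?\d*\.?\d+)\s*\(([^)]*{re.escape(keyword)}[^)]*)\)'
    matches = re.findall(pattern, text, re.IGNORECASE)
    res = unwrap([float(num) for num, _ in matches])
    return extract_number_before_keyword(text, keyword) if res is None else res

print(extract_number_before_keyword_in_parentheses('this is 0.6 (SML) 0.5 (again SML)', 'SML'))
# → [0.6, 0.5]
```

The parenthesized form is tried first; only when it yields nothing does the looser "number before keyword" pattern get a chance, so `'0.05(QM:0.05)'` queried for `QM` returns `0.05` without invoking the fallback.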
def printWARN(message: str, tsilent: float = 10.0)

Print a warning message only if:

  • it's different from the last one, or
  • more than tsilent seconds have passed since the last identical warning.

Parameters

message : str
    The warning message to display.
tsilent : float, optional
    Minimum time (in seconds) between repeated identical warnings.

Expand source code
def printWARN(message: str, tsilent: float = 10.0):
    """
    Print a warning message only if:
    - it's different from the last one, or
    - more than `tsilent` seconds have passed since the last identical warning.

    Parameters:
    ----------
    message : str
        The warning message to display.
    tsilent : float, optional
        Minimum time (in seconds) between repeated identical warnings.
    """
    global _LAST_WARN_, _T_LAST_WARN_
    tnow = time.time()
    if message != _LAST_WARN_ or (tnow - _T_LAST_WARN_ > tsilent):
        print(message)
        _LAST_WARN_ = message
        _T_LAST_WARN_ = tnow
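A quick way to observe the throttling: capture stdout and call the function three times in a row. The two module-level state variables are assumed to start as shown (their initialization is not part of this listing):

```python
import io
import time
from contextlib import redirect_stdout

_LAST_WARN_ = None    # last message shown (assumed module-level state)
_T_LAST_WARN_ = 0.0   # time of the last shown message

def printWARN(message: str, tsilent: float = 10.0):
    # Suppress repeats of the same message within `tsilent` seconds.
    global _LAST_WARN_, _T_LAST_WARN_
    tnow = time.time()
    if message != _LAST_WARN_ or (tnow - _T_LAST_WARN_ > tsilent):
        print(message)
        _LAST_WARN_ = message
        _T_LAST_WARN_ = tnow

buf = io.StringIO()
with redirect_stdout(buf):
    printWARN("disk almost full")   # printed
    printWARN("disk almost full")   # suppressed (identical, within 10 s)
    printWARN("new message")        # printed (different message)
```

Changing the message resets the throttle, so bursts of the same PubChem warning during indexing collapse to a single line.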
def split_col5_content(text)

Split text just before the first value (a number or "ND") that is followed by parentheses or brackets containing a regulatory keyword (SML, DL, QM, SML(T)); returns the (main, remainder) pair.

Expand source code
def split_col5_content(text):
    """
    Split `text` just before the first value (number or 'ND') followed by (...) or [...]
    containing a regulatory keyword; return the (main, remainder) pair.
    """
    # Regulatory keywords to detect inside the parentheses/brackets
    keywords = r'(SML|DL|QM|SML\(T\))'
    # Match a value (ND or number) followed by (...) or [...] containing a keyword
    # We will match starting FROM that value
    pattern = re.compile(
        r'(ND|\d+(?:\.\d+)?)(\([^)]*' + keywords + r'[^)]*\)|\[[^\]]*' + keywords + r'[^\]]*\])'
    )
    match = pattern.search(text)
    if match:
        idx = match.start()  # this is the correct split point: just before the match
        main = text[:idx].strip()
        rem = text[idx:].strip()
        return main, rem
    else:
        return text.strip(), ""  # fallback
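A standalone check with a hypothetical column-5 string mixing a usage limit and an SML annotation (the function is reproduced from this module):

```python
import re

def split_col5_content(text):
    # Split just before the first value (number or ND) followed by (...) or [...]
    # that contains a regulatory keyword (SML, DL, QM, SML(T)).
    keywords = r'(SML|DL|QM|SML\(T\))'
    pattern = re.compile(
        r'(ND|\d+(?:\.\d+)?)(\([^)]*' + keywords + r'[^)]*\)|\[[^\]]*' + keywords + r'[^\]]*\])'
    )
    match = pattern.search(text)
    if match:
        idx = match.start()  # split point: just before the matched value
        return text[:idx].strip(), text[idx:].strip()
    return text.strip(), ""  # fallback: everything belongs to column 5

# hypothetical column-5 content: usage range/limit, then an SML annotation
main, rem = split_col5_content("PP,PE:0.1 0.6(SML:0.6)")
```

Here `main` keeps the usage range and limit ("PP,PE:0.1") while `rem` carries the "0.6(SML:0.6)" annotation, which refresh_index prepends to column 6 before parsing SML/QM values.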
def unwrap(value)

Unwrap a list-like value:

  • Return None if the value is empty or None
  • Return the sole item if it is a singleton list or tuple
  • Return the full list/tuple otherwise

Expand source code
def unwrap(value):
    """
    Unwrap a list-like value:
    - Return None if the value is empty or None
    - Return the sole item if it is a singleton list or tuple
    - Return the full list/tuple otherwise
    """
    if not value:
        return None
    if isinstance(value, (list, tuple)) and len(value) == 1:
        return value[0]
    return value
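Reproduced for a standalone check; note the sharp edge that falsy scalars such as 0 or "" also map to None:

```python
def unwrap(value):
    # None for empty, the bare item for a singleton, the sequence otherwise
    if not value:
        return None
    if isinstance(value, (list, tuple)) and len(value) == 1:
        return value[0]
    return value

print(unwrap([3.5]))    # → 3.5
print(unwrap([1, 2]))   # → [1, 2]
print(unwrap([]))       # → None
```

This is why single SML values appear as bare numbers in the cached JSON while multi-value entries remain lists.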

Classes

class GBappendixA (cache_dir='cache.GBappendixA', index_file='gb_index.json', pubchem=True)
Expand source code
class GBappendixA:
    def __init__(self, cache_dir="cache.GBappendixA", index_file="gb_index.json", pubchem=True):
        self.base_dir = os.path.dirname(__file__)
        self.csv_file = os.path.join(self.base_dir, "GB9685-2016.csv")
        if not os.path.exists(self.csv_file):
            raise FileNotFoundError(f"CSV file {self.csv_file} not found.")
        self.cache_dir = os.path.join(self.base_dir, cache_dir)
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)
        self.index_file = os.path.join(self.cache_dir, index_file)
        if os.path.exists(self.index_file):
            with open(self.index_file, "r", encoding="utf-8") as f:
                self.index = json.load(f)
        else:
            self.refresh_index()
        self.order = self.index.get("order", [])
        self._records_cache = {}
        self._pubchem = pubchem  # PubChem extension flag (enabled by default; the index exists by now)
        GBappendixA.isinitialized = True

    @classmethod
    def isindexinitialized(cls, cache_dir="cache.GBappendixA", index_file="gb_index.json"):
        return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))

    def refresh_index(self):
        """
        Rebuild the global index by reading the CSV file and regenerating each record as FCAXXXX.json.
        The index includes mappings for "CAS", "FCA", "bycid", and "ChineseName".
        """
        # Load missing CAS numbers (only for substances with a CAS)
        missing_file = os.path.join(self.cache_dir, "missing.pubchem.gb.json")
        if os.path.exists(missing_file):
            with open(missing_file, "r", encoding="utf-8") as mf:
                missing_pubchem = json.load(mf)
        else:
            missing_pubchem = {}

        new_index = {}
        new_index["index_date"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        new_index["csv_file"] = os.path.basename(self.csv_file)
        new_index["order"] = []
        new_index["CAS"] = {}
        new_index["bycid"] = {}
        new_index["FCA"] = {}
        new_index["ChineseName"] = {}

        # Temporary dictionary to merge records by FCA number.
        records_dict = {}
        # Mapping table codes to descriptive names.
        table_mapping = {
            "A1": "plastics",
            "A2": "coatings",
            "A3": "rubber",
            "A4": "printing inks",
            "A5": "adhesives",
            "A6": "paper and board",
            "A7": "silicon rubber",
            "A7bis": "textile"
        }

        from patankar.loadpubchem import migrant  # Import the PubChem lookup function

        with open(self.csv_file, "r", encoding="utf-8") as f:
            reader = csv.reader(f, delimiter=",", quotechar='"')
            header = next(reader, None)
            # Assume header row exists (starting with "表格")
            for row in reader:
                if not row or len(row) < 10:
                    continue

                # Column 1: 表格
                table_code = row[0].strip()
                table_desc = table_mapping.get(table_code, table_code)

                # Column 2: FCA编号 (e.g. "FCA0001")
                fca_field = row[1].strip()
                m_fca = re.search(r'FCA(\d+)', fca_field)
                if m_fca:
                    fca_num = m_fca.group(1)
                else:
                    continue  # Skip row if FCA format is not recognized.

                # Column 3: 中文名称
                chinese_name = row[2].strip()

                # Column 4: CAS号
                cas_field = row[3].strip()
                if ";" in cas_field:
                    cas_value = [x.strip() for x in cas_field.split(";") if x.strip()]
                else:
                    cas_value = cas_field

                # Column 5: 使用范围和最大使用量/%
                range_field, remcol5 = split_col5_content(row[4].strip())
                materials = []
                CP0max = None
                if ":" in range_field:
                    parts = range_field.split(":")
                    materials = [x.strip() for x in parts[0].split(",") if x.strip()]
                    try:
                        CP0max = float(parts[1].strip()) * 1e4
                    except ValueError:
                        CP0max = None
                else:
                    materials = [x.strip() for x in range_field.split(",") if x.strip()]

                # Column 6: SML/QM/(mg/kg)
                smlqm_field = remcol5 + row[5].strip()
                QMSMLraw = smlqm_field
                sml_values = []
                qm_value = None
                dl_value = None
                entries = [entry.strip() for entry in smlqm_field.split(";") if entry.strip()]
                for entry in entries:
                    entry = entry.rstrip("或")
                    m_entry = re.match(r'(?P<val>\d*\.?\d+|ND)\s*\((?P<info>.+?)\)', entry)
                    if m_entry:
                        val_str = m_entry.group("val")
                        info = m_entry.group("info")
                        if val_str.upper() != "ND":
                            try:
                                num_val = float(val_str)
                            except ValueError:
                                num_val = None
                        else:
                            num_val = None
                        if ":SML" in info:
                            if num_val is not None:
                                sml_values.append(num_val)
                        if ":QM" in info:
                            if num_val is not None:
                                qm_value = num_val
                        dl_match = re.search(r'DL=([\d\.]+)mg/kg', info)
                        if dl_match:
                            try:
                                dl_value = float(dl_match.group(1))
                            except ValueError:
                                dl_value = None
                SML = unwrap(sml_values)
                QM = unwrap(qm_value)
                DL = unwrap(dl_value)
                # second pass for column 6
                if SML is None and "SML" in smlqm_field:
                    SML = extract_number_before_keyword_in_parentheses(smlqm_field,"SML")
                if QM is None and "QM" in smlqm_field:
                    QM = extract_number_before_keyword_in_parentheses(smlqm_field, "QM")
                if DL is None and "DL" in smlqm_field:
                    DL = extract_number_before_keyword_in_parentheses(smlqm_field, "DL")

                # Column 7: SML(T)/(mg/kg)
                smlt_field = row[6].strip()
                SMLT = None
                SMLTraw = ""
                if smlt_field:
                    parts = [x.strip() for x in smlt_field.split(";") if x.strip()]
                    parsed_parts = []
                    for part in parts:
                        try:
                            parsed_parts.append(float(part))
                        except ValueError:
                            parsed_parts.append(part)
                    if all(isinstance(x, float) for x in parsed_parts):
                        SMLT = parsed_parts[0] if len(parsed_parts) == 1 else parsed_parts
                    else:
                        SMLT = extract_number_before_keyword_in_parentheses(smlt_field, "SML")
                        SMLTraw = smlt_field if SMLT is None else SMLTraw

                # consistency rule
                if SMLT is not None and SML is None:
                    SML = SMLT

                # Column 8: SML(T)
                SMLTcomment = row[7].strip()
                # Column 9: 分组编号
                comment1 = row[8].strip()
                # Column 10: 其他要求
                comment2 = row[9].strip()

                # Assemble the positive list info for this row.
                pos_info = {
                    "materials": materials,
                    "CP0max": CP0max,
                    "QMSMLraw": QMSMLraw,
                    "SML": SML,
                    "QM": QM,
                    "DL": DL,
                    "SMLT": SMLT,
                    "SMLTraw": SMLTraw,
                    "SMLTcomment": SMLTcomment,
                    "comment1": comment1,
                    "comment2": comment2,
                    "table_id": table_desc
                }

                # --- Begin PubChem lookup for cid ---
                if cas_value:
                    if isinstance(cas_value, list):
                        cas_lookup = cas_value[0]
                    else:
                        cas_lookup = cas_value
                    if cas_lookup and cas_lookup.strip():
                        if cas_lookup in missing_pubchem:
                            cid_val = missing_pubchem[cas_lookup]
                        else:
                            try:
                                cid_val = migrant(cas_lookup, annex1=False).cid
                            except ValueError:
                                printWARN(f"🇨🇳 Warning: substance {chinese_name} (CAS {cas_lookup}) not found in PubChem.")
                                cid_val = None
                                missing_pubchem[cas_lookup] = None
                    else:
                        cid_val = None
                else:
                    cid_val = None
                # --- End PubChem lookup ---

                # Prepare a temporary record for this row using columns 1-4 and traceability.
                temp_rec = {
                    "FCA": fca_num,
                    "cid": cid_val,
                    "CAS": cas_value,
                    "authorized in": [table_desc],
                    "ChineseName": chinese_name,
                    "engine": "SFPPy: GBappendixA module",
                    "csfile": os.path.basename(self.csv_file),
                    "date": new_index["index_date"]
                }
                # The positive list details (columns 5-10) are stored in pos_info.
                # Merge records by FCA number.
                if fca_num in records_dict:
                    record = records_dict[fca_num]
                    # Update the "authorized in" list if the category is new.
                    if table_desc not in record.get("authorized in", []):
                        record["authorized in"].append(table_desc)
                    # Branch under the key corresponding to table_desc.
                    if table_desc in record:
                        # If an entry already exists for this category, append the new pos_info.
                        existing = record[table_desc]
                        if isinstance(existing, list):
                            existing.append(pos_info)
                        else:
                            record[table_desc] = [existing, pos_info]
                    else:
                        record[table_desc] = pos_info
                else:
                    # For a new FCA, include the branch with key table_desc.
                    temp_rec[table_desc] = pos_info
                    records_dict[fca_num] = temp_rec

                # Update index for CAS and ChineseName.
                if cas_value:
                    if isinstance(cas_value, list):
                        for cas in cas_value:
                            new_index["CAS"].setdefault(cas, []).append(fca_num)
                    else:
                        new_index["CAS"].setdefault(cas_value, []).append(fca_num)
                new_index["FCA"].setdefault(fca_num, []).append(fca_num)
                new_index["ChineseName"].setdefault(chinese_name, []).append(fca_num)

        # Write individual record files and build the order list.
        order_list = sorted(records_dict.keys(), key=lambda x: int(x))
        for fca in order_list:
            record = records_dict[fca]
            record_filename = f"FCA{int(fca):04d}.json"
            json_filename = os.path.join(self.cache_dir, record_filename)
            with open(json_filename, "w", encoding="utf-8") as jf:
                json.dump(record, jf, ensure_ascii=False, indent=2)
            new_index["order"].append(fca)
            if record.get("cid") is not None:
                new_index["bycid"][str(record["cid"])] = fca

        with open(self.index_file, "w", encoding="utf-8") as f:
            json.dump(new_index, f, ensure_ascii=False, indent=2)
        with open(missing_file, "w", encoding="utf-8") as mf:
            json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
        self.index = new_index
        self.order = new_index.get("order", [])
        self._records_cache = {}


    def _load_record(self, fca, order=None, db=False):
        """
        Load a record (as a gbrecord) from its cached JSON file.
        If PubChem extension is enabled, the record is returned as a gbrecord_ext.
        """
        if fca in self._records_cache:
            if self._pubchem:
                return gbrecord_ext(self._records_cache[fca], self) if db else gbrecord_ext(self._records_cache[fca])
            else:
                return self._records_cache[fca]
        json_filename = os.path.join(self.cache_dir, f"FCA{int(fca):04d}.json")
        if not os.path.exists(json_filename):
            print(f"⚠️ Warning: Record file for 🇨🇳 FCA {fca} not found.")
            return None
        with open(json_filename, "r", encoding="utf-8") as jf:
            rec = json.load(jf)
        record_obj = gbrecord(rec, order=rec.get("FCA"), total=len(self.order))
        self._records_cache[fca] = record_obj
        if self._pubchem:
            return gbrecord_ext(record_obj, self) if db else gbrecord_ext(record_obj)
        else:
            return record_obj

    def __getitem__(self, key):
        """
        Supports lookup by:
         - Slice: returns a list of records whose FCA numbers fall within the slice.
         - Integer or string: interpreted as an FCA number.
         - String: if not matching an FCA number then interpreted as a CAS number.
         - List/tuple: returns a list of corresponding records.
        """
        if isinstance(key, slice):
            # FCA keys are stored as strings; use numeric bounds so that `+ 1` is valid
            start = key.start if key.start is not None else min(int(k) for k in self.order)
            stop = key.stop if key.stop is not None else max(int(k) for k in self.order) + 1
            rec_keys = [k for k in self.order if int(start) <= int(k) < int(stop)]
            if not rec_keys:
                raise KeyError(f"No records found in range {start} to {stop - 1}. Valid FCA numbers range from {min(self.order)} to {max(self.order)}.")
            return [self._load_record(k, order=k) for k in rec_keys]
        elif isinstance(key, (int, str)):
            key_str = str(key)
            if key_str in self.order:
                return self._load_record(key_str, order=key_str)
            elif key in self.index.get("CAS", {}):
                rec_keys = self.index["CAS"][key]
                if len(rec_keys) == 1:
                    return self._load_record(rec_keys[0], order=rec_keys[0])
                else:
                    return [self._load_record(k, order=k) for k in rec_keys]
            else:
                raise KeyError(f"Key '{key}' not found. Valid keys include FCA numbers and CAS numbers.")
        elif isinstance(key, (list, tuple)):
            return [self.__getitem__(k) for k in key]
        else:
            raise KeyError(f"Unsupported key type: {type(key)}")

    def __call__(self, *args):
        """
        Callable access. For example:
         - GBappendixA(cid) returns the record for a given PubChem cid.
         - GBappendixA(fca) returns the record for a given FCA number.
         - GBappendixA("CAS") returns the record(s) for a given CAS number.
         - Multiple arguments return a list.
        """
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = args[0]
        results = []
        for arg in args:
            if isinstance(arg, int):
                arg_str = str(arg)
                if arg_str in self.order:
                    results.append(self._load_record(arg_str))
                elif arg_str in self.index.get("bycid", {}):
                    fca = self.index["bycid"][arg_str]
                    results.append(self._load_record(fca))
                else:
                    print(f"🇨🇳 Warning: Record for identifier {arg} not found.")
                    results.append(None)
            elif isinstance(arg, str):
                try:
                    result_item = self.__getitem__(arg)
                    if isinstance(result_item, list):
                        results.extend(result_item)
                    else:
                        results.append(result_item)
                except KeyError as e:
                    print(e)
                    results.append(None)
            else:
                raise KeyError(f"Unsupported key type in call: {type(arg)}")
        return results[0] if len(results) == 1 else results

    def byCAS(self, cas):
        if isinstance(cas, list):
            cas = cas[0]
        rec_keys = self.index.get("CAS", {}).get(cas, [])
        if len(rec_keys) == 1:
            return self._load_record(rec_keys[0], order=rec_keys[0], db=True)
        else:
            return [self._load_record(k, order=k, db=True) for k in rec_keys]

    def byFCA(self, fca):
        fca_str = str(fca)
        if fca_str in self.order:
            return self._load_record(fca_str, order=fca_str)
        else:
            raise KeyError(f"🇨🇳 FCA number {fca} not found. Valid FCA numbers range from {min(self.order)} to {max(self.order)}.")

    def bycid(self, cid, verbose=True):
        cid_str = str(cid)
        if cid_str in self.index.get("bycid", {}):
            fca = self.index["bycid"][cid_str]
            return self._load_record(fca, order=fca, db=True)
        else:
            if verbose:
                print(f"⚠️ Warning: No 🇨🇳 GB 9685-2016 record found for PubChem cid {cid}.")
            return None

    def __iter__(self):
        for fca in self.order:
            yield self._load_record(fca, order=fca)

    def __len__(self):
        return len(self.order)

    def __contains__(self, item):
        if isinstance(item, (list, tuple)):
            item = item[0]
        if isinstance(item, int):
            return str(item) in self.order or str(item) in self.index.get("bycid", {})
        if isinstance(item, str):
            return item in self.index.get("CAS", {})
        return False

    def __repr__(self):
        csv_filename = os.path.basename(self.csv_file)
        index_date = self.index.get("index_date", "unknown")
        print(f"GB 9685-2016 positive list ({len(self.order)} records)")
        print(f"Imported from CSV {csv_filename} and indexed on {index_date}")
        return str(self)

    def __str__(self):
        return f"<{self.__class__.__name__}: {len(self.order)} records (GB 9685-2016)>"
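The column-5 rule applied inside refresh_index (split the usage range on ":" and convert the % w/w limit to mg/kg by multiplying by 1e4) can be sketched in isolation; parse_col5 is an illustrative name, not part of the module, and the sample strings are hypothetical:

```python
def parse_col5(range_field: str):
    """Return (materials, CP0max) from a '使用范围:limit' style field.

    The limit is given in % (w/w) and converted to mg/kg (x 1e4),
    mirroring the rule applied in GBappendixA.refresh_index.
    """
    materials, CP0max = [], None
    if ":" in range_field:
        head, _, tail = range_field.partition(":")
        materials = [x.strip() for x in head.split(",") if x.strip()]
        try:
            CP0max = float(tail.strip()) * 1e4   # % (w/w) -> mg/kg
        except ValueError:
            CP0max = None                        # non-numeric limit
    else:
        # No ":" present: the whole field is the list of materials
        materials = [x.strip() for x in range_field.split(",") if x.strip()]
    return materials, CP0max

print(parse_col5("PP,PE:0.5"))   # → (['PP', 'PE'], 5000.0)
```

A 0.5 % (w/w) ceiling thus becomes CP0max = 5000 mg/kg, the unit used throughout SFPPy's migration calculations; when no ":" is present, CP0max stays None as stated in the module notes.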

Static methods

def isindexinitialized(cache_dir='cache.GBappendixA', index_file='gb_index.json')
Expand source code
@classmethod
def isindexinitialized(cls, cache_dir="cache.GBappendixA", index_file="gb_index.json"):
    return os.path.exists(os.path.join(os.path.dirname(__file__), cache_dir, index_file))

Methods

def byCAS(self, cas)
Expand source code
def byCAS(self, cas):
    if isinstance(cas, list):
        cas = cas[0]
    rec_keys = self.index.get("CAS", {}).get(cas, [])
    if len(rec_keys) == 1:
        return self._load_record(rec_keys[0], order=rec_keys[0], db=True)
    else:
        return [self._load_record(k, order=k, db=True) for k in rec_keys]
def byFCA(self, fca)
Expand source code
def byFCA(self, fca):
    fca_str = str(fca)
    if fca_str in self.order:
        return self._load_record(fca_str, order=fca_str)
    else:
        raise KeyError(f"🇨🇳 FCA number {fca} not found. Valid FCA numbers range from {min(self.order)} to {max(self.order)}.")
def bycid(self, cid, verbose=True)
Expand source code
def bycid(self, cid, verbose=True):
    cid_str = str(cid)
    if cid_str in self.index.get("bycid", {}):
        fca = self.index["bycid"][cid_str]
        return self._load_record(fca, order=fca, db=True)
    else:
        if verbose:
            print(f"⚠️ Warning: No 🇨🇳 GB 9685-2016 record found for PubChem cid {cid}.")
        return None
def refresh_index(self)

Rebuild the global index by reading the CSV file and regenerating each record as FCAXXXX.json. The index includes mappings for "CAS", "FCA", "bycid", and "ChineseName".

Expand source code
def refresh_index(self):
    """
    Rebuild the global index by reading the CSV file and regenerating each record as FCAXXXX.json.
    The index includes mappings for "CAS", "FCA", "bycid", and "ChineseName".
    """
    # Load missing CAS numbers (only for substances with a CAS)
    missing_file = os.path.join(self.cache_dir, "missing.pubchem.gb.json")
    if os.path.exists(missing_file):
        with open(missing_file, "r", encoding="utf-8") as mf:
            missing_pubchem = json.load(mf)
    else:
        missing_pubchem = {}

    new_index = {}
    new_index["index_date"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    new_index["csv_file"] = os.path.basename(self.csv_file)
    new_index["order"] = []
    new_index["CAS"] = {}
    new_index["bycid"] = {}
    new_index["FCA"] = {}
    new_index["ChineseName"] = {}

    # Temporary dictionary to merge records by FCA number.
    records_dict = {}
    # Mapping table codes to descriptive names.
    table_mapping = {
        "A1": "plastics",
        "A2": "coatings",
        "A3": "rubber",
        "A4": "printing inks",
        "A5": "adhesives",
        "A6": "paper and board",
        "A7": "silicon rubber",
        "A7bis": "textile"
    }

    from patankar.loadpubchem import migrant  # Import the PubChem lookup function

    with open(self.csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f, delimiter=",", quotechar='"')
        header = next(reader, None)
        # Assume header row exists (starting with "表格")
        for row in reader:
            if not row or len(row) < 10:
                continue

            # Column 1: 表格
            table_code = row[0].strip()
            table_desc = table_mapping.get(table_code, table_code)

            # Column 2: FCA编号 (e.g. "FCA0001")
            fca_field = row[1].strip()
            m_fca = re.search(r'FCA(\d+)', fca_field)
            if m_fca:
                fca_num = m_fca.group(1)
            else:
                continue  # Skip row if FCA format is not recognized.

            # Column 3: 中文名称
            chinese_name = row[2].strip()

            # Column 4: CAS号
            cas_field = row[3].strip()
            if ";" in cas_field:
                cas_value = [x.strip() for x in cas_field.split(";") if x.strip()]
            else:
                cas_value = cas_field

            # Column 5: 使用范围和最大使用量/%
            range_field,remcol5 = split_col5_content(row[4].strip())
            materials = []
            CP0max = None
            if ":" in range_field:
                parts = range_field.split(":")
                materials = [x.strip() for x in parts[0].split(",") if x.strip()]
                try:
                    CP0max = float(parts[1].strip()) * 1e4
                except ValueError:
                    CP0max = None
            else:
                materials = [x.strip() for x in range_field.split(",") if x.strip()]

            # Column 6: SML/QM/(mg/kg)
            smlqm_field = remcol5 + row[5].strip()
            QMSMLraw = smlqm_field
            sml_values = []
            qm_value = None
            dl_value = None
            entries = [entry.strip() for entry in smlqm_field.split(";") if entry.strip()]
            for entry in entries:
                entry = entry.rstrip("或")  # drop a trailing "或" ("or") connector
                m_entry = re.match(r'(?P<val>\d*\.?\d+|ND)\s*\((?P<info>.+?)\)', entry)
                if m_entry:
                    val_str = m_entry.group("val")
                    info = m_entry.group("info")
                    if val_str.upper() != "ND":
                        try:
                            num_val = float(val_str)
                        except ValueError:
                            num_val = None
                    else:
                        num_val = None
                    if ":SML" in info:
                        if num_val is not None:
                            sml_values.append(num_val)
                    if ":QM" in info:
                        if num_val is not None:
                            qm_value = num_val
                    dl_match = re.search(r'DL=([\d\.]+)mg/kg', info)
                    if dl_match:
                        try:
                            dl_value = float(dl_match.group(1))
                        except ValueError:
                            dl_value = None
            SML = unwrap(sml_values)
            QM = unwrap(qm_value)
            DL = unwrap(dl_value)
            # second pass for column 6
            if SML is None and "SML" in smlqm_field:
                SML = extract_number_before_keyword_in_parentheses(smlqm_field,"SML")
            if QM is None and "QM" in smlqm_field:
                QM = extract_number_before_keyword_in_parentheses(smlqm_field, "QM")
            if DL is None and "DL" in smlqm_field:
                DL = extract_number_before_keyword_in_parentheses(smlqm_field, "DL")

            # Column 7: SML(T)/(mg/kg)
            smlt_field = row[6].strip()
            SMLT = None
            SMLTraw = ""
            if smlt_field:
                parts = [x.strip() for x in smlt_field.split(";") if x.strip()]
                parsed_parts = []
                for part in parts:
                    try:
                        parsed_parts.append(float(part))
                    except ValueError:
                        parsed_parts.append(part)
                if all(isinstance(x, float) for x in parsed_parts):
                    SMLT = parsed_parts[0] if len(parsed_parts) == 1 else parsed_parts
                else:
                    SMLT = extract_number_before_keyword_in_parentheses(smlt_field, "SML")
                    SMLTraw = smlt_field if SMLT is None else SMLTraw

            # consistency rule
            if SMLT is not None and SML is None:
                SML = SMLT

            # Column 8: SML(T)
            SMLTcomment = row[7].strip()
            # Column 9: 分组编号
            comment1 = row[8].strip()
            # Column 10: 其他要求
            comment2 = row[9].strip()

            # Assemble the positive list info for this row.
            pos_info = {
                "materials": materials,
                "CP0max": CP0max,
                "QMSMLraw": QMSMLraw,
                "SML": SML,
                "QM": QM,
                "DL": DL,
                "SMLT": SMLT,
                "SMLTraw": SMLTraw,
                "SMLTcomment": SMLTcomment,
                "comment1": comment1,
                "comment2": comment2,
                "table_id": table_desc
            }

            # --- Begin PubChem lookup for cid ---
            if cas_value:
                if isinstance(cas_value, list):
                    cas_lookup = cas_value[0]
                else:
                    cas_lookup = cas_value
                if cas_lookup and cas_lookup.strip():
                    if cas_lookup in missing_pubchem:
                        cid_val = missing_pubchem[cas_lookup]
                    else:
                        try:
                            cid_val = migrant(cas_lookup, annex1=False).cid
                        except ValueError:
                            printWARN(f"🇨🇳 Warning: substance {chinese_name} (CAS {cas_lookup}) not found in PubChem.")
                            cid_val = None
                            missing_pubchem[cas_lookup] = None
                else:
                    cid_val = None
            else:
                cid_val = None
            # --- End PubChem lookup ---

            # Prepare a temporary record for this row using columns 1-4 and traceability.
            temp_rec = {
                "FCA": fca_num,
                "cid": cid_val,
                "CAS": cas_value,
                "authorized in": [table_desc],
                "ChineseName": chinese_name,
                "engine": "SFPPy: GBappendixA module",
                "csfile": os.path.basename(self.csv_file),
                "date": new_index["index_date"]
            }
            # The positive list details (columns 5-10) are stored in pos_info.
            # Merge records by FCA number.
            if fca_num in records_dict:
                record = records_dict[fca_num]
                # Update the "authorized in" list if the category is new.
                if table_desc not in record.get("authorized in", []):
                    record["authorized in"].append(table_desc)
                # Branch under the key corresponding to table_desc.
                if table_desc in record:
                    # If an entry already exists for this category, append the new pos_info.
                    existing = record[table_desc]
                    if isinstance(existing, list):
                        existing.append(pos_info)
                    else:
                        record[table_desc] = [existing, pos_info]
                else:
                    record[table_desc] = pos_info
            else:
                # For a new FCA, include the branch with key table_desc.
                temp_rec[table_desc] = pos_info
                records_dict[fca_num] = temp_rec

            # Update index for CAS and ChineseName.
            if cas_value:
                if isinstance(cas_value, list):
                    for cas in cas_value:
                        new_index["CAS"].setdefault(cas, []).append(fca_num)
                else:
                    new_index["CAS"].setdefault(cas_value, []).append(fca_num)
            new_index["FCA"].setdefault(fca_num, []).append(fca_num)
            new_index["ChineseName"].setdefault(chinese_name, []).append(fca_num)

    # Write individual record files and build the order list.
    order_list = sorted(records_dict.keys(), key=lambda x: int(x))
    for fca in order_list:
        record = records_dict[fca]
        record_filename = f"FCA{int(fca):04d}.json"
        json_filename = os.path.join(self.cache_dir, record_filename)
        with open(json_filename, "w", encoding="utf-8") as jf:
            json.dump(record, jf, ensure_ascii=False, indent=2)
        new_index["order"].append(fca)
        if record.get("cid") is not None:
            new_index["bycid"][str(record["cid"])] = fca

    with open(self.index_file, "w", encoding="utf-8") as f:
        json.dump(new_index, f, ensure_ascii=False, indent=2)
    with open(missing_file, "w", encoding="utf-8") as mf:
        json.dump(missing_pubchem, mf, ensure_ascii=False, indent=2)
    self.index = new_index
    self.order = new_index.get("order", [])
    self._records_cache = {}
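The column-5 rule applied inside `refresh_index()` (materials before ":", a % (w/w) value after ":" converted to mg/kg by multiplying by 1e4) can be sketched as a standalone function. `parse_col5` is an illustration of that rule, not the module's own code, and the example inputs are hypothetical.

```python
def parse_col5(range_field):
    """Split '使用范围和最大使用量/%' into (materials, CP0max in mg/kg)."""
    materials, cp0max = [], None
    if ":" in range_field:
        head, tail = range_field.split(":", 1)
        materials = [x.strip() for x in head.split(",") if x.strip()]
        try:
            cp0max = float(tail.strip()) * 1e4  # % (w/w) -> mg/kg
        except ValueError:
            cp0max = None   # non-numeric tail: no usage limit recorded
    else:
        # no ":" -> only a materials list, CP0max stays None
        materials = [x.strip() for x in range_field.split(",") if x.strip()]
    return materials, cp0max

print(parse_col5("PE,PP:0.5"))  # (['PE', 'PP'], 5000.0)
print(parse_col5("PS"))         # (['PS'], None)
```

The 1e4 factor follows from the unit change: 1 % (w/w) = 10 g/kg = 10 000 mg/kg.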
class gbrecord (d, order=None, total=None)

A dict subclass holding a single GB 9685-2016 record. Besides the record fields, it keeps the FCA number (used as the display order) and the total number of records so that __str__ and __repr__ can report "FCA X of Y".

class gbrecord(dict):
    def __init__(self, d, order=None, total=None):
        if not isinstance(d, dict):
            raise TypeError(f"d must be a dict, not a {type(d).__name__}")
        super().__init__(d)
        self._order = d.get("FCA", order)
        self._total = total

    def __str__(self):
        cid = self.get("cid", None)
        order_str = f"{self._order}" if self._order is not None else "?"
        total_str = f"{self._total}" if self._total is not None else "?"
        return f"<{self.__class__.__name__} with cid:{cid} - FCA {order_str} of {total_str} (GB 9685-2016)>"

    def __repr__(self):
        lines = []
        order_str = f"{self._order}" if self._order is not None else "?"
        total_str = f"{self._total}" if self._total is not None else "?"
        header = f" ---- [ GB 9685-2016 record: {order_str} of {total_str} ] ----"
        lines.append(header)

        # Define base keys to display first.
        base_keys = {"FCA", "cid", "CAS", "authorized in", "ChineseName", "engine", "csfile", "date"}
        fields_order = ["FCA", "cid", "CAS", "authorized in", "ChineseName"]
        for key in fields_order:
            if key not in self:
                continue
            val = self[key]
            if val is None or (isinstance(val, str) and not val.strip()):
                continue
            wrapped_val = custom_wrap(str(val), width=60, indent=" " * 22)
            lines.append(f"{key:>20}: {wrapped_val}")

        # Display branching positive list details:
        branch_keys = [k for k in self.keys() if k not in base_keys]
        if branch_keys:
            lines.append(f"\n{'--- Positive Lists':>20}:")
            for branch in sorted(branch_keys):
                value = self[branch]
                lines.append(f"  {branch}:")
                # Normalize value to a list
                entries = value if isinstance(value, list) else [value]
                for idx, entry in enumerate(entries, start=1):
                    lines.append(f"    Entry {idx}:")
                    if isinstance(entry, dict):
                        for k, v in entry.items():
                            if v is None or (isinstance(v, str) and not v.strip()):
                                continue
                            wrapped_v = custom_wrap(str(v), width=60, indent=" " * 25)
                            lines.append(f"      {k:>15}: {wrapped_v}")
                    else:
                        wrapped_entry = custom_wrap(str(entry), width=60, indent=" " * 25)
                        lines.append(f"      {wrapped_entry}")

        # Append traceability information.
        for key in ["engine", "csfile", "date"]:
            if key in self and self[key]:
                wrapped_val = custom_wrap(str(self[key]), width=60, indent=" " * 22)
                lines.append(f"{key:>20}: {wrapped_val}")
        return "\n".join(lines)
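A minimal record following the schema assembled by `refresh_index()` illustrates what `gbrecord` wraps: base fields at the top level, and per-table positive-list details branched under the descriptive table name. All values below are hypothetical placeholders.

```python
# Positive-list details for one (substance, table) row (columns 5-10).
pos_info = {
    "materials": ["PE", "PP"],
    "CP0max": 5000.0,          # mg/kg, converted from % (w/w)
    "QMSMLraw": "0.6(:SML)",
    "SML": 0.6, "QM": None, "DL": None,
    "SMLT": None, "SMLTraw": "",
    "SMLTcomment": "", "comment1": "", "comment2": "",
    "table_id": "plastics",
}

# Base fields (columns 1-4 plus traceability) with one table branch.
record = {
    "FCA": "0001",
    "cid": None,
    "CAS": "50-00-0",
    "authorized in": ["plastics"],
    "ChineseName": "甲醛",
    "plastics": pos_info,      # branch keyed by the descriptive table name
}

# A second CSV row for the same FCA in another table would append to
# "authorized in" and add a new branch, e.g. record["coatings"] = {...}.
```

Repeated rows for the same FCA within the *same* table turn the branch into a list of `pos_info` dicts, as done in the merge step of `refresh_index()`.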

Ancestors

  • builtins.dict

Subclasses

class gbrecord_ext (rec, db=None)

An extended gbrecord that resolves the substance against PubChem: it stores the PubChem cid and the molar mass M obtained via patankar.loadpubchem.migrant (M is None when the CAS number cannot be resolved).

class gbrecord_ext(gbrecord):
    def __init__(self, rec, db=None):
        if not isinstance(rec, gbrecord):
            raise TypeError(f"rec must be a gbrecord, not a {type(rec).__name__}")
        super().__init__(rec, order=rec._order, total=rec._total)
        from patankar.loadpubchem import migrant
        if rec.get("CAS") and (isinstance(rec.get("CAS"), str) and rec.get("CAS").strip() or isinstance(rec.get("CAS"), list)):
            try:
                m = migrant(rec.get("CAS"), annex1=False)
                M = m.M
            except Exception:
                print(f"{rec.get('ChineseName')} (CAS {rec.get('CAS')}) could not be extended: not found in PubChem")
                M = None
        else:
            M = None
        self.cid = self.get("cid")
        self.M = M
        # Additional extended attributes (e.g. group information) can be added here.
        # [XXX] Additional placeholders remain as needed.

Ancestors