From 4ac4787691e163d55776163699c88c0a5e52dc8f Mon Sep 17 00:00:00 2001 From: Jay Baxter Date: Thu, 5 Mar 2026 13:02:30 -0800 Subject: [PATCH 1/2] Update scorer: gaussian topic scorer and gaussian core with topics scorer --- scoring/src/scoring/constants.py | 27 ++++ scoring/src/scoring/enums.py | 2 + scoring/src/scoring/gaussian_scorer.py | 66 +++++----- scoring/src/scoring/helpfulness_scores.py | 15 ++- scoring/src/scoring/mf_base_scorer.py | 2 + scoring/src/scoring/mf_topic_scorer.py | 12 +- scoring/src/scoring/run_scoring.py | 149 ++++++++++++++++++++-- scoring/src/scoring/scoring_rules.py | 2 + scoring/src/scoring/topic_model.py | 108 +++++++++++++--- 9 files changed, 310 insertions(+), 73 deletions(-) diff --git a/scoring/src/scoring/constants.py b/scoring/src/scoring/constants.py index c96f9db9..9b2ead0a 100644 --- a/scoring/src/scoring/constants.py +++ b/scoring/src/scoring/constants.py @@ -361,6 +361,19 @@ def rater_factor_key(i): gaussianNoteInterceptNoHighVolKey = "gaussianNoteInterceptNoHighVol" gaussianNoteInterceptNoCorrelatedKey = "gaussianNoteInterceptNoCorrelated" gaussianNoteInterceptPopulationSampledKey = "gaussianNoteInterceptPopulationSampled" +# Gaussian Core With Topics Model +gaussianCoreWithTopicsNoteInterceptKey = "gaussianCoreWithTopicsNoteIntercept" +gaussianCoreWithTopicsNoteFactor1Key = "gaussianCoreWithTopicsNoteFactor1" +gaussianCoreWithTopicsRatingStatusKey = "gaussianCoreWithTopicsRatingStatus" +gaussianCoreWithTopicsActiveRulesKey = "gaussianCoreWithTopicsActiveRules" +gaussianCoreWithTopicsNumFinalRoundRatingsKey = "gaussianCoreWithTopicsNumFinalRoundRatings" +gaussianCoreWithTopicsNoteInterceptNoHighVolKey = "gaussianCoreWithTopicsNoteInterceptNoHighVol" +gaussianCoreWithTopicsNoteInterceptNoCorrelatedKey = ( + "gaussianCoreWithTopicsNoteInterceptNoCorrelated" +) +gaussianCoreWithTopicsNoteInterceptPopulationSampledKey = ( + "gaussianCoreWithTopicsNoteInterceptPopulationSampled" +) # Harassment/Abuse Tag harassmentNoteInterceptKey = "harassmentNoteIntercept" harassmentNoteFactor1Key = "harassmentNoteFactor1" @@ -394,6 +407,9 @@ def rater_factor_key(i): aboveHelpfulnessThresholdKey = "aboveHelpfulnessThreshold" totalHelpfulHarassmentRatingsPenaltyKey = "totalHelpfulHarassmentPenalty" raterAgreeRatioWithHarassmentAbusePenaltyKey = "raterAgreeRatioKeyWithHarassmentAbusePenalty" +crhTotal14dKey = "crhTotal14d" +crnhTotal14dKey = "crnhTotal14d" +nmrTotal14dKey = "nmrTotal14d" # Note Status Labels currentlyRatedHelpful = "CURRENTLY_RATED_HELPFUL" @@ -960,6 +976,14 @@ def rater_factor_key(i): (gaussianNoteInterceptNoHighVolKey, np.double), (gaussianNoteInterceptPopulationSampledKey, np.double), (gaussianNumFinalRoundRatingsKey, np.double), # double because nullable. + (gaussianCoreWithTopicsNoteInterceptKey, np.double), + (gaussianCoreWithTopicsNoteFactor1Key, np.double), + (gaussianCoreWithTopicsRatingStatusKey, "category"), + (gaussianCoreWithTopicsActiveRulesKey, "category"), + (gaussianCoreWithTopicsNoteInterceptNoHighVolKey, np.double), + (gaussianCoreWithTopicsNoteInterceptNoCorrelatedKey, np.double), + (gaussianCoreWithTopicsNoteInterceptPopulationSampledKey, np.double), + (gaussianCoreWithTopicsNumFinalRoundRatingsKey, np.double), # double because nullable. ] noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes] noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes} @@ -1049,6 +1073,9 @@ def rater_factor_key(i): (coreWithTopicsRaterFactor1Key, np.double), (coreFirstRoundRaterInterceptKey, np.double), (coreFirstRoundRaterFactor1Key, np.double), + (crhTotal14dKey, pd.Int64Dtype()), + (crnhTotal14dKey, pd.Int64Dtype()), + (nmrTotal14dKey, pd.Int64Dtype()), ] raterModelOutputTSVColumns = [col for (col, dtype) in raterModelOutputTSVColumnsAndTypes] raterModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in raterModelOutputTSVColumnsAndTypes} diff --git a/scoring/src/scoring/enums.py b/scoring/src/scoring/enums.py index 93eaaaea..459a7db3 100644 --- a/scoring/src/scoring/enums.py +++ b/scoring/src/scoring/enums.py @@ -16,6 +16,7 @@ class Scorers(Enum): MFTopicScorer = auto() MFMultiGroupScorer = auto() GaussianScorer = auto() + GaussianCoreWithTopicsScorer = auto() class Topics(Enum): @@ -26,6 +27,7 @@ class Topics(Enum): GazaConflict = 2 MessiRonaldo = 3 Scams = 4 + InDimensionTwo = 5 def scorers_from_csv(csv: str) -> Set[Scorers]: diff --git a/scoring/src/scoring/gaussian_scorer.py b/scoring/src/scoring/gaussian_scorer.py index 9a32c485..f22c5980 100644 --- a/scoring/src/scoring/gaussian_scorer.py +++ b/scoring/src/scoring/gaussian_scorer.py @@ -250,6 +250,7 @@ def __init__( self._crhParams = crhParams self._crnhParams = crnhParams self._useMfNoteParams = useMfNoteParams + self._centeredBins = False def get_prescoring_name(self): return "MFCoreScorer" @@ -367,7 +368,7 @@ def _get_dropped_note_cols(self) -> List[str]: def _get_dropped_user_cols(self) -> List[str]: """Returns a list of columns which should be excluded from helpfulnessScores output.""" - return [] + return [c.internalRaterFactor1Key] def _prepare_data_for_scoring(self, ratings: pd.DataFrame, final: bool = False) -> pd.DataFrame: """Prepare data for scoring. This includes filtering out notes and raters which do not meet @@ -397,7 +398,7 @@ def _return_all_pts( params = self._crhParams if isCrh else self._crnhParams numQuantiles = len(quantileRange) - quantileCols = [f"{x:5.2f}" for x in quantileRange] + quantileCols = [f"{x:5.3f}" for x in quantileRange] quantileArray = np.array(quantileRange, dtype=np.float32) assert ( @@ -523,21 +524,20 @@ def _return_all_pts( quantileCols ].values - if not isCrh: - # Ensure notes with fewer than 3 ratings on each side get 0.1 smoothing - signCounts = ( - ratingsForTrainingWithFactors.assign( - neg=ratingsForTrainingWithFactors[c.internalRaterFactor1Key] < 0, - pos=ratingsForTrainingWithFactors[c.internalRaterFactor1Key] > 0, - ) - .groupby(c.noteIdKey)[["neg", "pos"]] - .sum() - .astype(int) + # Ensure notes with fewer than 3 ratings on each side get 0.1 smoothing + signCounts = ( + ratingsForTrainingWithFactors.assign( + neg=ratingsForTrainingWithFactors[c.internalRaterFactor1Key] < 0, + pos=ratingsForTrainingWithFactors[c.internalRaterFactor1Key] > 0, ) - insufficientMask = (signCounts["neg"] < 3) | (signCounts["pos"] < 3) - insufficientNoteIds = signCounts[insufficientMask].index - isInsufficient = np.isin(uniqueNotes, insufficientNoteIds) - smoothingValues[isInsufficient] = 0.1 + .groupby(c.noteIdKey)[["neg", "pos"]] + .sum() + .astype(int) + ) + insufficientMask = (signCounts["neg"] < 3) | (signCounts["pos"] < 3) + insufficientNoteIds = signCounts[insufficientMask].index + isInsufficient = np.isin(uniqueNotes, insufficientNoteIds) + smoothingValues[isInsufficient] = 0.1 # Smoothing weights if params.adaptiveWeightBase is not None: @@ -589,7 +589,7 @@ def _gaussian_kernel_extrapolator_vectorized( ratingsForTrainingWithFactors, quantileRange, isCrh=isCrh, empiricalPriors=empiricalPriors ) - quantileCols = [f"{x:5.2f}" for x in quantileRange] + quantileCols = [f"{x:5.3f}" for x in quantileRange] # Compute intercept logValues = np.log(clippedValues[quantileCols].values) @@ -765,31 +765,36 @@ def _score_notes_and_users( ].nunique() > self._nBinsEachSide ): - _, l_range = pd.qcut( + l_range = ( ratersWithParams.loc[ratersWithParams[c.internalRaterFactor1Key] < 0][ c.internalRaterFactor1Key - ], - self._nBinsEachSide, - retbins=True, + ] + .quantile(list(np.linspace(0.001, 0.999, self._nBinsEachSide))) + .values ) - _, r_range = pd.qcut( + r_range = ( ratersWithParams.loc[ratersWithParams[c.internalRaterFactor1Key] > 0][ c.internalRaterFactor1Key - ], - self._nBinsEachSide, - retbins=True, + ] + .quantile(list(np.linspace(0.001, 0.999, self._nBinsEachSide))) + .values ) lMids = (l_range[:-1] + l_range[1:]) / 2 rMids = (r_range[:-1] + r_range[1:]) / 2 - mids = (np.array(sorted(abs(lMids))) + np.array(sorted(abs(rMids)))) / 2 - crhQuantileRange = np.concatenate([sorted(-mids), mids]) - crnhQuantileRange = np.concatenate([sorted(-mids), mids]) + if self._centeredBins: + mids = (np.array(sorted(abs(lMids))) + np.array(sorted(abs(rMids)))) / 2 + crhQuantileRange = np.concatenate([sorted(-mids), mids]) + crnhQuantileRange = np.concatenate([sorted(-mids), mids]) + else: + crhQuantileRange = np.concatenate([lMids, rMids]) + crnhQuantileRange = np.concatenate([lMids, rMids]) logger.info(f"crh quantile range: {crhQuantileRange}") logger.info(f"crnh quantile range: {crnhQuantileRange}") # if there are not enough unique raters to even calculate bins, do not predict else: - scoredNotes = pd.DataFrame(columns=self.get_internal_scored_notes_cols()) - helpfulnessScores = pd.DataFrame(columns=self.get_internal_helpfulness_scores_cols()) + return pd.DataFrame(columns=self.get_internal_scored_notes_cols()), pd.DataFrame( + columns=self.get_internal_helpfulness_scores_cols() + ) else: crhQuantileRange = c.quantileRange @@ -957,6 +962,7 @@ def _score_notes_and_users( helpfulnessScores = prescoringRaterModelOutput[ [ c.raterParticipantIdKey, + c.internalRaterFactor1Key, ] ] diff --git a/scoring/src/scoring/helpfulness_scores.py b/scoring/src/scoring/helpfulness_scores.py index 2c2456fc..b2709f4d 100644 --- a/scoring/src/scoring/helpfulness_scores.py +++ b/scoring/src/scoring/helpfulness_scores.py @@ -29,14 +29,15 @@ def author_helpfulness( """ scoredNotes.loc[:, c.noteCountKey] = 1 - authorCounts = scoredNotes.groupby(c.noteAuthorParticipantIdKey).sum(numeric_only=True)[ - [ - c.currentlyRatedHelpfulBoolKey, - c.currentlyRatedNotHelpfulBoolKey, - c.noteCountKey, - noteInterceptKey, - ] + cols = [ + c.currentlyRatedHelpfulBoolKey, + c.currentlyRatedNotHelpfulBoolKey, + c.noteCountKey, + noteInterceptKey, ] + authorCounts = ( + scoredNotes[[c.noteAuthorParticipantIdKey] + cols].groupby(c.noteAuthorParticipantIdKey).sum() + ) authorCounts[c.crhRatioKey] = ( authorCounts[c.currentlyRatedHelpfulBoolKey] / authorCounts[c.noteCountKey] ) diff --git a/scoring/src/scoring/mf_base_scorer.py b/scoring/src/scoring/mf_base_scorer.py index 7be982c7..cdf1b0ec 100644 --- a/scoring/src/scoring/mf_base_scorer.py +++ b/scoring/src/scoring/mf_base_scorer.py @@ -197,6 +197,7 @@ def __init__( minMinorityNetHelpfulRatings: Optional[int] = None, minMinorityNetHelpfulRatio: Optional[float] = None, populationSampledRatingPerNoteLossRatio: Optional[float] = 10.0, + useGlobalIntercept: bool = True, ): """Configure MatrixFactorizationScorer object. @@ -297,6 +298,7 @@ def __init__( ("initLearningRate", 0.02 if normalizedLossHyperparameters is not None else 0.2), ("noInitLearningRate", 0.02 if normalizedLossHyperparameters is not None else 1.0), ("seed", seed) if seed is not None else None, + ("useGlobalIntercept", useGlobalIntercept), ] if pair is not None ] diff --git a/scoring/src/scoring/mf_topic_scorer.py b/scoring/src/scoring/mf_topic_scorer.py index 3e4bacc1..6f0a930c 100644 --- a/scoring/src/scoring/mf_topic_scorer.py +++ b/scoring/src/scoring/mf_topic_scorer.py @@ -65,6 +65,8 @@ def __init__( multiplyPenaltyByHarassmentScore: bool = True, minimumHarassmentScoreToPenalize: float = 2.0, tagConsensusHarassmentHelpfulRatingPenalty: int = 10, + numConfidenceRatings: int = 4, + useGlobalIntercept: bool = True, ) -> None: """Configure MFTopicScorer object. @@ -110,6 +112,7 @@ def __init__( minimumHarassmentScoreToPenalize=minimumHarassmentScoreToPenalize, tagConsensusHarassmentHelpfulRatingPenalty=tagConsensusHarassmentHelpfulRatingPenalty, useReputation=False, + useGlobalIntercept=useGlobalIntercept, ) self._topicName = topicName self._topicNoteInterceptKey = f"{c.topicNoteInterceptKey}_{self._topicName}" @@ -123,6 +126,7 @@ def __init__( self._topicNoteInterceptNoCorrelatedKey = ( f"{c.topicNoteInterceptNoCorrelatedKey}_{self._topicName}" ) + self._numConfidenceRatings = numConfidenceRatings def get_name(self): return f"MFTopicScorer_{self._topicName}" @@ -243,8 +247,12 @@ def _postprocess_output( .rename(columns={c.raterParticipantIdKey: "negRatingTotal"}) ) # Set scoring confidence bit - posFactorCounts = posFactorCounts[posFactorCounts["posRatingTotal"] > 4][[c.noteIdKey]] - negFactorCounts = negFactorCounts[negFactorCounts["negRatingTotal"] > 4][[c.noteIdKey]] + posFactorCounts = posFactorCounts[ + posFactorCounts["posRatingTotal"] > self._numConfidenceRatings + ][[c.noteIdKey]] + negFactorCounts = negFactorCounts[ + negFactorCounts["negRatingTotal"] > self._numConfidenceRatings + ][[c.noteIdKey]] confidentNotes = posFactorCounts.merge(negFactorCounts) confidentNotes[self._noteTopicConfidentKey] = True noteScores = noteScores.merge( diff --git a/scoring/src/scoring/run_scoring.py b/scoring/src/scoring/run_scoring.py index eb688cb6..d291366a 100644 --- a/scoring/src/scoring/run_scoring.py +++ b/scoring/src/scoring/run_scoring.py @@ -19,7 +19,9 @@ from . import constants as c, contributor_state, note_ratings, note_status_history, scoring_rules from .constants import FinalScoringArgs, ModelResult, PrescoringArgs, ScoringArgs from .enums import Scorers, Topics +from .gaussian_core_with_topics_scorer import GaussianCoreWithTopicsScorer from .gaussian_scorer import GaussianScorer, compute_empirical_prior_df +from .gaussian_topic_scorer import GaussianTopicScorer from .matrix_factorization.normalized_loss import NormalizedLossHyperparameters from .mf_core_scorer import MFCoreScorer from .mf_core_with_topics_scorer import MFCoreWithTopicsScorer @@ -77,6 +79,9 @@ def _get_scorers( scorers: Dict[Scorers, List[Scorer]] = dict() if final: scorers[Scorers.GaussianScorer] = [GaussianScorer(seed=seed, threads=12)] + scorers[Scorers.GaussianCoreWithTopicsScorer] = [ + GaussianCoreWithTopicsScorer(seed=seed, threads=12) + ] scorers[Scorers.MFCoreWithTopicsScorer] = [ MFCoreWithTopicsScorer( seed, pseudoraters, useStableInitialization=useStableInitialization, threads=12 @@ -147,9 +152,33 @@ def _get_scorers( seed=seed, ) ) - scorers[Scorers.MFTopicScorer] = [ - MFTopicScorer(topicName=topic.name, seed=seed) for topic in Topics - ] + topicScorers: List[Scorer] = [] + for topic in Topics: + if topic == Topics.InDimensionTwo: + if final: + topicScorers.append( + GaussianTopicScorer( + topicName=topic.name, + seed=seed, + useGlobalIntercept=False, + userInterceptLambda=5, + crhParams=c.GaussianParams(bandwidth=0.05), + numConfidenceRatings=0, + ) + ) + else: + topicScorers.append( + MFTopicScorer( + topicName=topic.name, + seed=seed, + useGlobalIntercept=False, + userInterceptLambda=5, + ) + ) + else: + topicScorers.append(MFTopicScorer(topicName=topic.name, seed=seed)) + scorers[Scorers.MFTopicScorer] = topicScorers + scorers[Scorers.MFMultiGroupScorer] = [ MFMultiGroupScorer(includedGroups={4, 5, 7, 12, 26}, groupId=1, threads=4, seed=seed), ] @@ -307,6 +336,14 @@ def _run_scorer_parallelizable( or from the dataLoader if scoringArgsSharedMemory is None. However, using the dataLoader to re-read the dataframes from disk is much slower than using shared memory and is deprecated. """ + # Ensure child processes have consistent log formatting. + if runParallel: + try: + from twitter.logging_config import configure_logging_for_child_process + + configure_logging_for_child_process() + except ImportError: + pass scorerStartTime = time.perf_counter() # Load data if multiprocessing @@ -846,22 +883,48 @@ def meta_score( crnhCoverage=True, ) ) + if enabledScorers is None or Scorers.GaussianCoreWithTopicsScorer in enabledScorers: + rules.append( + scoring_rules.ApplyCoverageModelResult( + RuleID.GAUSSIAN_CORE_WITH_TOPICS_MODEL, + {RuleID.EXPANSION_MODEL, RuleID.CORE_MODEL}, + c.gaussianCoreWithTopicsRatingStatusKey, + checkFirmReject=True, + crnhCoverage=True, + ) + ) if enabledScorers is None or Scorers.MFTopicScorer in enabledScorers: for topic in Topics: if topic == Topics.Unassigned: continue - rules.append( - scoring_rules.ApplyTopicModelResult( - RuleID[f"TOPIC_MODEL_{topic.value}"], - { - RuleID.EXPANSION_PLUS_MODEL, - RuleID.EXPANSION_MODEL, - RuleID.CORE_MODEL, - RuleID.GAUSSIAN_MODEL, - }, - topic, + elif topic == Topics.InDimensionTwo: + rules.append( + scoring_rules.ApplyTopicModelResult( + RuleID[f"TOPIC_MODEL_{topic.value}"], + { + RuleID.EXPANSION_PLUS_MODEL, + RuleID.EXPANSION_MODEL, + RuleID.CORE_MODEL, + RuleID.GAUSSIAN_MODEL, + }, + topic, + topicNMRInterceptThreshold=0.51, + topicNMRFactorThreshold=1.0, + ) + ) + else: + rules.append( + scoring_rules.ApplyTopicModelResult( + RuleID[f"TOPIC_MODEL_{topic.value}"], + { + RuleID.EXPANSION_PLUS_MODEL, + RuleID.EXPANSION_MODEL, + RuleID.CORE_MODEL, + RuleID.GAUSSIAN_MODEL, + }, + topic, + ) ) - ) rules.append( scoring_rules.PopulationSampledIntercept( @@ -995,6 +1058,46 @@ def _compute_note_stats( return scoredNotesCols, auxiliaryNoteInfoCols +def _compute_14d_stats( + ratings: pd.DataFrame, + noteStatusHistory: pd.DataFrame, +) -> pd.DataFrame: + """Helper function to compute 14d CRH, CRNH and NMR totals. + + Only notes written in the last 14 days count, and a note must either have status or + at least 10 ratings to count towards the totals. + """ + cutoff = noteStatusHistory[c.createdAtMillisKey].max() - (1000 * 60 * 60 * 24 * 14) + # Purge notes that are too old + recentStats = ( + noteStatusHistory[noteStatusHistory[c.createdAtMillisKey] > cutoff][ + [c.noteIdKey, c.noteAuthorParticipantIdKey, c.currentLabelKey] + ] + .rename(columns={c.noteAuthorParticipantIdKey: c.raterParticipantIdKey}) + .copy() + ) + # Purge notes that either lack status or too few ratings + ratingTotals = ratings[c.noteIdKey].value_counts().to_frame().reset_index(drop=False) + recentStats = recentStats.merge( + ratingTotals, how="inner", on=c.noteIdKey + ) # Implicitly drop notes with 0 ratings + recentStats = recentStats[ + (recentStats["count"] >= 10) + | (recentStats[c.currentLabelKey].isin({c.currentlyRatedHelpful, c.currentlyRatedNotHelpful})) + ].drop(columns=[c.noteIdKey, "count"]) + # Compute totals + recentStats[c.crhTotal14dKey] = recentStats[c.currentLabelKey] == c.currentlyRatedHelpful + recentStats[c.crnhTotal14dKey] = recentStats[c.currentLabelKey] == c.currentlyRatedNotHelpful + recentStats[c.nmrTotal14dKey] = recentStats[c.currentLabelKey] == c.needsMoreRatings + recentStats = ( + recentStats.drop(columns=[c.currentLabelKey]) + .groupby(c.raterParticipantIdKey) + .sum() + .reset_index(drop=False) + ) + return recentStats + + def _compute_helpfulness_scores( ratings: pd.DataFrame, scoredNotes: pd.DataFrame, @@ -1113,6 +1216,24 @@ def _compute_helpfulness_scores( # If field is not set by userEvent or by update script, ok to default to 1 helpfulnessScores[c.timestampOfLastEarnOut].fillna(1, inplace=True) + with c.time_block("Computing 14d contributor stats"): + recentStats = _compute_14d_stats(ratings, noteStatusHistory) + helpfulnessScores = helpfulnessScores.merge( + recentStats, + how="left", + on=c.raterParticipantIdKey, + unsafeAllowed={c.crhTotal14dKey, c.crnhTotal14dKey, c.nmrTotal14dKey}, + ) + helpfulnessScores = helpfulnessScores.fillna( + {c.crhTotal14dKey: 0.0, c.crnhTotal14dKey: 0.0, c.nmrTotal14dKey: 0.0} + ).astype( + { + c.crhTotal14dKey: pd.Int64Dtype(), + c.crnhTotal14dKey: pd.Int64Dtype(), + c.nmrTotal14dKey: pd.Int64Dtype(), + } + ) + return helpfulnessScores diff --git a/scoring/src/scoring/scoring_rules.py b/scoring/src/scoring/scoring_rules.py index 1482999c..59e15a74 100644 --- a/scoring/src/scoring/scoring_rules.py +++ b/scoring/src/scoring/scoring_rules.py @@ -71,11 +71,13 @@ class RuleID(Enum): TOPIC_MODEL_2 = RuleAndVersion("TopicModel02", "1.0", False) TOPIC_MODEL_3 = RuleAndVersion("TopicModel03", "1.0", False) TOPIC_MODEL_4 = RuleAndVersion("TopicModel04", "1.0", False) + TOPIC_MODEL_5 = RuleAndVersion("TopicModel05", "1.0", False) MULTI_GROUP_MODEL_1 = RuleAndVersion("MultiGroupModel01", "1.0", True) INSUFFICIENT_EXPLANATION = RuleAndVersion("InsufficientExplanation", "1.0", True) SCORING_DRIFT_GUARD = RuleAndVersion("ScoringDriftGuard", "1.0", False) NMR_DUE_TO_MIN_STABLE_CRH_TIME = RuleAndVersion("NmrDueToMinStableCrhTime", "1.0", False) GAUSSIAN_MODEL = RuleAndVersion("GaussianModel", "1.0", True) + GAUSSIAN_CORE_WITH_TOPICS_MODEL = RuleAndVersion("GaussianCoreWithTopicsModel", "1.0", True) def get_name(self) -> str: """Returns a string combining the name and version to uniquely name the logic of the ScoringRule.""" diff --git a/scoring/src/scoring/topic_model.py b/scoring/src/scoring/topic_model.py index 3b5d37b8..fa695baa 100644 --- a/scoring/src/scoring/topic_model.py +++ b/scoring/src/scoring/topic_model.py @@ -45,27 +45,37 @@ "palestin", # intentionally shortened for expanded matching "gaza", "jerusalem", - "\shamas\s", + r"\bhamas\b", }, Topics.MessiRonaldo: { - "messi\s", # intentional whitespace to prevent prefix matches + r"messi\b", # intentional whitespace to prevent prefix matches "ronaldo", }, Topics.Scams: { "scam", - "undisclosed\sad", # intentional whitespace - "terms\sof\sservice", # intentional whitespace - "help\.x\.com", - "x\.com/tos", - "engagement\sfarm", # intentional whitespace + r"undisclosed\sad", # intentional whitespace + r"terms\sof\sservice", # intentional whitespace + r"help\.x\.com", + r"x\.com/tos", + r"engagement\sfarm", # intentional whitespace "spam", "gambling", "apostas", "apuestas", "dropship", - "drop\sship", # intentional whitespace + r"drop\sship", # intentional whitespace "promotion", }, + Topics.InDimensionTwo: { + # this is an emergent second dimension from MF in IN + r"\bugc\b", + r"\bgc\b", + r"\bobc\b", + r"\bsc\b", + r"\bsc[,\s]+st\b", + r"\bst[,\s]+sc\b", + "आरक्षण", + }, } @@ -73,7 +83,7 @@ def get_seed_term_with_periods(): seedTermsWithPeriods = [] for terms in seedTerms.values(): for term in terms: - if "\." in term: + if r"\." in term: seedTermsWithPeriods.append(term) return seedTermsWithPeriods @@ -82,7 +92,8 @@ class TopicModel(object): def __init__(self, unassignedThreshold=0.99): """Initialize a list of seed terms for each topic.""" self._seedTerms = seedTerms - self._unassignedThreshold = unassignedThreshold + self._unassignedThreshold = {label: unassignedThreshold for label in range(1, len(Topics))} + self._unassignedThreshold[Topics.InDimensionTwo.value] = 0.7 self._compiled_regex = self._compile_regex() def _compile_regex(self): @@ -94,13 +105,16 @@ def _compile_regex(self): # If the pattern contains an escaped period (i.e. it's a URL), don't enforce the preceding whitespace or start-of-string. if "\\." in pattern: mod_patterns.append(pattern) + elif pattern.startswith(r"\b") or pattern.startswith(r"\s"): + # Pattern already has its own boundary — use as-is + mod_patterns.append(pattern) else: - mod_patterns.append(f"(\s|^){pattern}") + mod_patterns.append(rf"(?:\s|^){pattern}") group_name = f"{topic.name}" regex_patterns[group_name] = f"(?P<{group_name}>{'|'.join(mod_patterns)})" # Combine all groups into a single regex full_regex = "|".join(regex_patterns.values()) - return re.compile(full_regex) + return re.compile(full_regex, re.IGNORECASE) def _make_seed_labels(self, texts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Produce a label vector based on seed terms. @@ -177,8 +191,8 @@ def _get_stop_words(self, texts: np.ndarray) -> List[str]: # Identify stop words blockedTokens = set() for terms in self._seedTerms.values(): - # Remove whitespace and any escaped whitespace characters from seed terms - blockedTokens |= {re.sub(r"\\s", "", t.strip()) for t in terms} + # Remove whitespace, escaped whitespace characters, and word boundary markers from seed terms + blockedTokens |= {re.sub(r"\\[sb]", "", t.strip()) for t in terms} # Convert escaped periods to periods blockedTokens |= {re.sub(r"\\.", ".", t.strip()) for t in terms} logger.info(f" Total tokens to filter: {len(blockedTokens)}") @@ -198,10 +212,42 @@ def _merge_predictions_and_labels(self, probs: np.ndarray, labels: np.ndarray) - predictions = np.argmax(probs, axis=1) for label in range(1, len(Topics)): # Update label if (1) note was assigned based on the labeling heuristic, and (2) - # p(Unassigned) is below the required uncertainty threshold. - predictions[(labels == label) & (probs[:, 0] <= self._unassignedThreshold)] = label + # the sum of probabilities for all classes other than the seed label is below + # the required uncertainty threshold. + other_class_prob = 1.0 - probs[:, label] + predictions[ + (labels == label) & (other_class_prob <= self._unassignedThreshold[label]) + ] = label return predictions + @staticmethod + def _filter_url_tokens(text: str, min_token_length: int = 4) -> str: + """Replace URLs with only their constituent tokens that are at least min_token_length characters. + + This prevents short URL parameter fragments (e.g. 'sc' from 'sc_lang') from + creating false positive seed term matches after underscore replacement. + URLs matching seed term patterns (e.g. help.x.com, x.com/tos) are preserved as-is. + """ + + def replace_url(match): + # URLs that should be preserved verbatim because they are used as seed terms. + _PRESERVE_URL_PATTERNS = [ + re.compile(r"help\.x\.com"), + re.compile(r"x\.com/tos"), + ] + url = match.group(0) + # Preserve URLs that match seed term patterns + for pattern in _PRESERVE_URL_PATTERNS: + if pattern.search(url): + return url + # Split URL into word-like tokens (splitting on non-alphanumeric characters) + tokens = re.findall(r"[a-zA-Z]+", url) + # Keep only tokens that are at least min_token_length characters + filtered = [t for t in tokens if len(t) >= min_token_length] + return " ".join(filtered) + + return re.sub(r"https?://[^\s)\]]+", replace_url, text) + def _prepare_post_text(self, notes: pd.DataFrame) -> pd.DataFrame: """Concatenate all notes within each post into a single row associated with the post. @@ -218,11 +264,17 @@ def _prepare_post_text(self, notes: pd.DataFrame) -> pd.DataFrame: .apply(lambda postNotes: " ".join(postNotes)) .reset_index(drop=False) ) - # Default tokenization for CountVectorizer will not split on underscore, which - # results in very long tokens containing many words inside of URLs. Removing - # underscores allows us to keep default splitting while fixing that problem. + # Replace URLs with filtered tokens (only keeping words >= 4 chars) to prevent + # short URL fragments from matching seed terms after underscore replacement. + postNoteText[c.summaryKey] = [ + self._filter_url_tokens(text) for text in postNoteText[c.summaryKey].values + ] + # Default tokenization for CountVectorizer will not split on underscore or + # forward slash, which results in very long tokens containing many words + # inside of URLs. Removing underscores and slashes allows us to keep + # default splitting while fixing that problem. postNoteText[c.summaryKey] = [ - text.replace("_", " ") for text in postNoteText[c.summaryKey].values + text.replace("_", " ").replace("/", " ") for text in postNoteText[c.summaryKey].values ] return postNoteText @@ -306,6 +358,10 @@ def train_bootstrapped_note_topic_classifier( bootstrappedSeedTerms[Topics.Scams].remove( np.random.choice(list(seedTerms[Topics.Scams]), 1)[0] ) + bootstrappedSeedTerms[Topics.InDimensionTwo] = seedTerms[Topics.InDimensionTwo].copy() + bootstrappedSeedTerms[Topics.InDimensionTwo].remove( + np.random.choice(list(seedTerms[Topics.InDimensionTwo]), 1)[0] + ) self._seedTerms = bootstrappedSeedTerms ( pipe, @@ -382,6 +438,18 @@ def get_note_topics( else: probs = softmax(logits, axis=1) + # The classifier may have been trained on a non-contiguous subset of topic labels + # (e.g. [0, 1, 3, 4] when no training data exists for label 2). In that case, + # probs columns correspond to the classifier's classes_, not directly to topic + # indices. Expand probs so that column i = probability of topic i, ensuring + # np.argmax returns actual class labels rather than column indices. + classes = pipe.named_steps["Classifier"].classes_ + if len(classes) < len(Topics): + fullProbs = np.zeros((probs.shape[0], len(Topics))) + for j, cls in enumerate(classes): + fullProbs[:, cls] = probs[:, j] + probs = fullProbs + if seedLabelSets[i] is None: with c.time_block("Get Note Topics: Make Seed Labels"): seedLabelSets[i], _ = self._make_seed_labels(postText[c.summaryKey].values) From a83f68f507a5f27b705c22f2a90298af8443a7f2 Mon Sep 17 00:00:00 2001 From: Jay Baxter Date: Thu, 5 Mar 2026 13:02:55 -0800 Subject: [PATCH 2/2] Update scorer: gaussian topic scorer and gaussian core with topics scorer --- .../gaussian_core_with_topics_scorer.py | 83 ++++++ scoring/src/scoring/gaussian_topic_scorer.py | 236 ++++++++++++++++++ 2 files changed, 319 insertions(+) create mode 100644 scoring/src/scoring/gaussian_core_with_topics_scorer.py create mode 100644 scoring/src/scoring/gaussian_topic_scorer.py diff --git a/scoring/src/scoring/gaussian_core_with_topics_scorer.py b/scoring/src/scoring/gaussian_core_with_topics_scorer.py new file mode 100644 index 00000000..ec08bce0 --- /dev/null +++ b/scoring/src/scoring/gaussian_core_with_topics_scorer.py @@ -0,0 +1,83 @@ +from typing import Dict, List, Optional + +from . import constants as c +from .gaussian_scorer import GaussianScorer + + +class GaussianCoreWithTopicsScorer(GaussianScorer): + """Gaussian convolution scorer restricted to core groups (with topics variant). + + This scorer inherits all Gaussian scoring logic but filters ratings to only + include raters from coreGroups and unassigned raters, mirroring the population + used by MFCoreWithTopicsScorer. + """ + + def __init__( + self, + seed: Optional[int] = None, + threads: int = c.defaultNumThreads, + saveIntermediateState: bool = False, + ) -> None: + """Configure GaussianCoreWithTopicsScorer object. + + Args: + seed: if not None, seed value to ensure deterministic execution + threads: number of threads to use for intra-op parallelism in pytorch + saveIntermediateState: if True, save intermediate state for debugging + """ + super().__init__( + includedGroups=c.coverageGroups, + excludeTopics=False, + includeUnassigned=True, + captureThreshold=0.5, + seed=seed, + threads=threads, + saveIntermediateState=saveIntermediateState, + ) + + def get_name(self): + return "GaussianCoreWithTopicsScorer" + + def _get_note_col_mapping(self) -> Dict[str, str]: + """Returns a dict mapping default note column names to custom names for a specific model.""" + return { + c.internalNoteInterceptKey: c.gaussianCoreWithTopicsNoteInterceptKey, + c.internalNoteFactor1Key: c.gaussianCoreWithTopicsNoteFactor1Key, + c.internalActiveRulesKey: c.gaussianCoreWithTopicsActiveRulesKey, + c.numFinalRoundRatingsKey: c.gaussianCoreWithTopicsNumFinalRoundRatingsKey, + c.internalNoteInterceptNoHighVolKey: c.gaussianCoreWithTopicsNoteInterceptNoHighVolKey, + c.internalNoteInterceptNoCorrelatedKey: c.gaussianCoreWithTopicsNoteInterceptNoCorrelatedKey, + c.internalNoteInterceptPopulationSampledKey: c.gaussianCoreWithTopicsNoteInterceptPopulationSampledKey, + c.lowDiligenceNoteInterceptKey: c.lowDiligenceLegacyNoteInterceptKey, + c.internalRatingStatusKey: c.gaussianCoreWithTopicsRatingStatusKey, + } + + def _get_user_col_mapping(self) -> Dict[str, str]: + """Returns a dict mapping default user column names to custom names for a specific model.""" + return {} + + def get_scored_notes_cols(self) -> List[str]: + """Returns a list of columns which should be present in the scoredNotes output.""" + return [ + c.noteIdKey, + c.gaussianCoreWithTopicsNoteInterceptKey, + c.gaussianCoreWithTopicsNoteFactor1Key, + c.gaussianCoreWithTopicsRatingStatusKey, + c.gaussianCoreWithTopicsActiveRulesKey, + c.gaussianCoreWithTopicsNumFinalRoundRatingsKey, + c.gaussianCoreWithTopicsNoteInterceptNoHighVolKey, + c.gaussianCoreWithTopicsNoteInterceptNoCorrelatedKey, + c.gaussianCoreWithTopicsNoteInterceptPopulationSampledKey, + ] + + def get_helpfulness_scores_cols(self) -> List[str]: + """Returns a list of columns which should be present in the helpfulnessScores output.""" + return [ + c.raterParticipantIdKey, + ] + + def get_auxiliary_note_info_cols(self) -> List[str]: + """Returns a list of columns which should be present in the auxiliaryNoteInfo output.""" + return [ + c.noteIdKey, + ] diff --git a/scoring/src/scoring/gaussian_topic_scorer.py b/scoring/src/scoring/gaussian_topic_scorer.py new file mode 100644 index 00000000..11648255 --- /dev/null +++ b/scoring/src/scoring/gaussian_topic_scorer.py @@ -0,0 +1,236 @@ +from typing import Dict, List, Optional, Tuple + +from . import constants as c +from .gaussian_scorer import GaussianScorer +from .mf_topic_scorer import MFTopicScorer + +import pandas as pd + + +class GaussianTopicScorer(GaussianScorer): + def __init__( + self, + topicName: str, + seed: Optional[int] = None, + saveIntermediateState: bool = False, + minMeanNoteScore: float = 0.05, + crhThreshold: float = 0.40, + crnhThresholdIntercept: float = -0.05, + crnhThresholdNoteFactorMultiplier: float = -0.8, + crnhThresholdNMIntercept: float = -0.15, + crnhThresholdUCBIntercept: float = -0.5, + crhSuperThreshold: float = 0.5, + crhThresholdNoHighVol: float = 0.37, + crhThresholdNoCorrelated: float = 0.37, + lowDiligenceThreshold: float = 0.263, + factorThreshold: float = 0.5, + tagFilterPercentile: int = 95, + incorrectFilterThreshold: float = 2.5, + numConfidenceRatings: int = 4, + userFactorLambda=None, + noteFactorLambda=None, + userInterceptLambda=None, + noteInterceptLambda=None, + globalInterceptLambda=None, + diamondLambda=None, + normalizedLossHyperparameters=None, + useGlobalIntercept: bool = True, + crhParams: c.GaussianParams = c.gaussianCrhParams, + crnhParams: c.GaussianParams = c.gaussianCrnhParams, + ) -> None: + """Configure GaussianTopicScorer object. + + Notice that each GaussianTopicScorer defines column names by appending the topicName to + column prefixes which are constant. Dynamically defining the column names allows the + topic scorer to be instantiated multiple times while maintaining the property that + the columns attached by each scorer remain unique. Once all scorers have ran, we + validate that each note was scored by at most one topic scorer and then coalesce + all of the topic scoring columns and remove the topicName suffix. + + Args: + topicName: str indicating which topic this scorer instance should filter for. + seed: if not None, seed value to ensure deterministic execution + """ + super().__init__( + includedTopics={topicName}, + excludeTopics=False, + includedGroups=set(), + includeUnassigned=False, + captureThreshold=None, + seed=seed, + saveIntermediateState=saveIntermediateState, + threads=4, + minMeanNoteScore=minMeanNoteScore, + crhThreshold=crhThreshold, + crnhThresholdIntercept=crnhThresholdIntercept, + crnhThresholdNoteFactorMultiplier=crnhThresholdNoteFactorMultiplier, + crnhThresholdNMIntercept=crnhThresholdNMIntercept, + crnhThresholdUCBIntercept=crnhThresholdUCBIntercept, + crhSuperThreshold=crhSuperThreshold, + crhThresholdNoHighVol=crhThresholdNoHighVol, + crhThresholdNoCorrelated=crhThresholdNoCorrelated, + lowDiligenceThreshold=lowDiligenceThreshold, + factorThreshold=factorThreshold, + useReputation=False, + tagFilterPercentile=tagFilterPercentile, + incorrectFilterThreshold=incorrectFilterThreshold, + crhParams=crhParams, + crnhParams=crnhParams, + ) + # Store MF parameters for constructing the MFTopicScorer used in prescoring. + self._mfTopicScorerArgs = dict( + topicName=topicName, + seed=seed, + userFactorLambda=userFactorLambda, + noteFactorLambda=noteFactorLambda, + userInterceptLambda=userInterceptLambda, + noteInterceptLambda=noteInterceptLambda, + globalInterceptLambda=globalInterceptLambda, + diamondLambda=diamondLambda, + normalizedLossHyperparameters=normalizedLossHyperparameters, + useGlobalIntercept=useGlobalIntercept, + ) + self._topicName = topicName + self._topicNoteInterceptKey = f"{c.topicNoteInterceptKey}_{self._topicName}" + self._topicNoteFactor1Key = f"{c.topicNoteFactor1Key}_{self._topicName}" + self._topicRatingStatusKey = f"{c.topicRatingStatusKey}_{self._topicName}" + self._topicInternalActiveRulesKey = f"{c.topicInternalActiveRulesKey}_{self._topicName}" + self._topicNumFinalRoundRatingsKey = f"{c.topicNumFinalRoundRatingsKey}_{self._topicName}" + self._noteTopicKey = f"{c.noteTopicKey}_{self._topicName}" + self._noteTopicConfidentKey = f"{c.topicNoteConfidentKey}_{self._topicName}" + self._topicNoteInterceptNoHighVolKey = f"{c.topicNoteInterceptNoHighVolKey}_{self._topicName}" + self._topicNoteInterceptNoCorrelatedKey = ( + f"{c.topicNoteInterceptNoCorrelatedKey}_{self._topicName}" + ) + self._numConfidenceRatings = numConfidenceRatings + + def get_prescoring_name(self): + return f"MFTopicScorer_{self._topicName}" + + def get_name(self): + return f"GaussianTopicScorer_{self._topicName}" + + def _prescore_notes_and_users( + self, + ratings: pd.DataFrame, + noteStatusHistory: pd.DataFrame, + userEnrollmentRaw: pd.DataFrame, + ) -> Tuple[pd.DataFrame, pd.DataFrame, c.PrescoringMetaScorerOutput]: + mfScorer = MFTopicScorer(**self._mfTopicScorerArgs) + return mfScorer._prescore_notes_and_users(ratings, noteStatusHistory, userEnrollmentRaw) + + def _get_note_col_mapping(self) -> Dict[str, str]: + """Returns a dict mapping default note column names to custom names for a specific model.""" + return { + c.internalNoteInterceptKey: self._topicNoteInterceptKey, + c.internalNoteFactor1Key: self._topicNoteFactor1Key, + c.internalRatingStatusKey: self._topicRatingStatusKey, + c.internalActiveRulesKey: self._topicInternalActiveRulesKey, + c.numFinalRoundRatingsKey: self._topicNumFinalRoundRatingsKey, + c.lowDiligenceNoteInterceptKey: c.lowDiligenceLegacyNoteInterceptKey, + c.internalNoteInterceptNoHighVolKey: self._topicNoteInterceptNoHighVolKey, + c.internalNoteInterceptNoCorrelatedKey: self._topicNoteInterceptNoCorrelatedKey, + } + + def get_scored_notes_cols(self) -> List[str]: + """Returns a list of columns which should be present in the scoredNotes output.""" + return [ + c.noteIdKey, + self._topicNoteInterceptKey, + self._topicNoteFactor1Key, + self._topicRatingStatusKey, + self._noteTopicKey, + self._noteTopicConfidentKey, + self._topicInternalActiveRulesKey, + self._topicNumFinalRoundRatingsKey, + self._topicNoteInterceptNoHighVolKey, + self._topicNoteInterceptNoCorrelatedKey, + ] + + def get_helpfulness_scores_cols(self) -> List[str]: + """Returns a list of columns which should be present in the helpfulnessScores output.""" + return [] + + def get_auxiliary_note_info_cols(self) -> List[str]: + """Returns a list of columns which should be present in the auxiliaryNoteInfo output.""" + return [] + + def _get_dropped_note_cols(self) -> List[str]: + """Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo.""" + return super()._get_dropped_note_cols() + + def _get_dropped_user_cols(self) -> List[str]: + """Returns a list of columns which should be excluded from helpfulnessScores output. + + Note: GaussianScorer's helpfulnessScores only contains raterParticipantIdKey and + internalRaterFactor1Key. The parent GaussianScorer._get_dropped_user_cols() already + drops internalRaterFactor1Key, so we only need to additionally drop raterParticipantIdKey. + """ + return super()._get_dropped_user_cols() + [ + c.raterParticipantIdKey, + ] + + def _postprocess_output( + self, + noteScores: pd.DataFrame, + userScores: pd.DataFrame, + ratings: pd.DataFrame, + noteStatusHistory: pd.DataFrame, + userEnrollment: pd.DataFrame, + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Add noteTopicKey to notes output. + + Args: + noteScores: note outputs from scoring + userScores: user outputs from scoring + ratings (pd.DataFrame): preprocessed ratings + noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status + userEnrollment (pd.DataFrame): one row per user specifying enrollment properties + + Returns: + Tuple[pd.DataFrame, pd.DataFrame]: + noteScores: filtered and updated note scoring output + userScores: filtered and updated user scoring output + """ + # Set the noteTopicKey column in each output + noteScores[self._noteTopicKey] = self._topicName + # Calculate total counts of positive and negative factor ratings + scoredNotes = noteScores[~noteScores[c.internalNoteInterceptKey].isna()][[c.noteIdKey]] + posFactorRaters = userScores[userScores[c.internalRaterFactor1Key] >= 0][ + [c.raterParticipantIdKey] + ] + posFactorRatings = ( + ratings[[c.noteIdKey, c.raterParticipantIdKey]].merge(scoredNotes).merge(posFactorRaters) + ) + posFactorCounts = ( + posFactorRatings.groupby(c.noteIdKey) + .count() + .reset_index(drop=False) + .rename(columns={c.raterParticipantIdKey: "posRatingTotal"}) + ) + negFactorRaters = userScores[userScores[c.internalRaterFactor1Key] < 0][ + [c.raterParticipantIdKey] + ] + negFactorRatings = ( + ratings[[c.noteIdKey, c.raterParticipantIdKey]].merge(scoredNotes).merge(negFactorRaters) + ) + negFactorCounts = ( + negFactorRatings.groupby(c.noteIdKey) + .count() + .reset_index(drop=False) + .rename(columns={c.raterParticipantIdKey: "negRatingTotal"}) + ) + # Set scoring confidence bit + posFactorCounts = posFactorCounts[ + posFactorCounts["posRatingTotal"] > self._numConfidenceRatings + ][[c.noteIdKey]] + negFactorCounts = negFactorCounts[ + negFactorCounts["negRatingTotal"] > self._numConfidenceRatings + ][[c.noteIdKey]] + confidentNotes = posFactorCounts.merge(negFactorCounts) + confidentNotes[self._noteTopicConfidentKey] = True + noteScores = noteScores.merge( + confidentNotes, how="left", unsafeAllowed=[self._noteTopicConfidentKey, c.defaultIndexKey] + ) + noteScores = noteScores.fillna({self._noteTopicConfidentKey: False}) + return noteScores, userScores