CandidateSelector
The candidate selector executes the candidate selection step in the SilkMoth pipeline. After signature generation SilkMoth accesses the inverted index to get all sets which contain a token in the signature to form an initial set of candidates.
The size of the candidate set can be reduced further by applying the check or nearest neighbour filters in the refinement step of the SilkMoth pipeline.
Examples
>>> from silkmoth.candidate_selector import CandidateSelector
>>> from silkmoth.utils import similar, jaccard_similarity
>>> from silkmoth.inverted_index import InvertedIndex
>>> S1 = [{"Apple", "Pear", "Car"}, {"Apple", "Sun", "Cat"}]
>>> S2 = [{"Something", "Else"}]
>>> S3 = [{"Apple", "Berlin", "Sun"}, {"Apple"}]
>>> S = [S1, S2, S3]
>>> signature = ["Apple", "Berlin"]
>>> I = InvertedIndex(S)
>>> cand_selector = CandidateSelector(jaccard_similarity, similar, 0.7)
>>> cand_selector.get_candidates(signature, I, 2)
{0, 2}
Source code in src/silkmoth/candidate_selector.py
class CandidateSelector:
"""
The candidate selector executes the candidate selection step in the SilkMoth
pipeline. After signature generation SilkMoth accesses the inverted index to
get all sets which contain a token in the signature to form an initial set
of candidates.
The size of the candidate set can be reduced further by applying the check
or nearest neighbour filters in the refinement step of the SilkMoth pipeline.
Examples
--------
```
>>> from silkmoth.candidate_selector import CandidateSelector
>>> from silkmoth.utils import similar, jaccard_similarity
>>> from silkmoth.inverted_index import InvertedIndex
>>> S1 = [{"Apple", "Pear", "Car"}, {"Apple", "Sun", "Cat"}]
>>> S2 = [{"Something", "Else"}]
>>> S3 = [{"Apple", "Berlin", "Sun"}, {"Apple"}]
>>> S = [S1, S2, S3]
>>> signature = ["Apple", "Berlin"]
>>> I = InvertedIndex(S)
>>> cand_selector = CandidateSelector(jaccard_similarity, similar, 0.7)
>>> cand_selector.get_candidates(signature, I, 2)
{0, 2}
```
"""
def __init__(self, similarity_func, sim_metric, related_thresh, sim_thresh=0.0, q = 3):
"""
Initialize the candidate selector with some parameters.
Args:
similarity_func (callable): Similarity function phi(r, s) (e.g., Jaccard).
sim_metric (callable): Similarity metric related(R, S) (e.g., contain).
related_thresh (float): Relatedness threshold delta.
sim_thresh (float): Similarity threshold alpha.
q (int): q-chunk length for edit similarity.
"""
self.similarity = similarity_func
self.sim_metric = sim_metric
self.delta = related_thresh
self.alpha = sim_thresh
self.q = q
def get_candidates(self, signature, inverted_index, ref_size) -> set:
"""
Retrieve candidate set indices using token signature lookup.
Args:
signature (list): Signature tokens for a reference set.
inverted_index (InvertedIndex): Instance of the custom InvertedIndex class.
ref_size (int): Size of set R.
Returns:
set: Indices of candidate sets containing at least one signature token.
"""
candidates = set()
for token in signature:
try:
idx_list = inverted_index.get_indexes(token)
for set_idx, _ in idx_list:
src_size = len(inverted_index.get_set(set_idx))
if self.verify_size(ref_size, src_size):
candidates.add(set_idx)
except ValueError:
# token not found in inverted index; safely ignore
continue
return candidates
def verify_size(self, ref_size, src_size) -> bool:
"""
Checks if sets can be related based on their sizes. Set-Containment is
only defined for |R|<=|S|. For Set-Similarity we should compare only
similar size sets.
Args:
ref_size (int): Size of set R.
src_size (int): Size of (possible) set S.
Returns:
bool: True if both sets could be related based on their size, False otherwise.
"""
# case 1: Set-Containment
if self.sim_metric == contain and ref_size > src_size:
return False
# case 2: Set-Similarity
if self.sim_metric == similar:
if min(ref_size, src_size) < self.delta * max(ref_size, src_size):
return False
return True
def check_filter(self, R, K, candidates, inverted_index) -> tuple:
"""
Apply check filter to prune weak candidate sets.
Args:
R (list of list): Tokenized reference set.
K (set): Flattened signature tokens.
candidates (set): Candidate set indices from get_candidates().
inverted_index (InvertedIndex): For retrieving sets.
Returns:
tuple:
set: Candidate indices that pass the check filter.
dict: c_idx -> dict{r_idx -> max_sim}.
"""
filtered = set()
match_map = dict()
k_i_sets = [set(r_i).intersection(K) for r_i in R]
for c_idx in candidates:
matched = self.create_match_map(R, k_i_sets, c_idx, inverted_index)
if matched:
filtered.add(c_idx)
match_map[c_idx] = matched
return filtered, match_map
def create_match_map(self, R, k_i_sets, c_idx, inverted_index) -> dict:
"""
Create a match map for a specific candidate index.
Args:
R (list of list): Tokenized reference set.
k_i_sets (list of sets): Unflattened signature.
c_idx (int): Candidate set index.
inverted_index (InvertedIndex): For retrieving sets.
Returns:
dict: r_idx -> max_sim for matched reference sets.
"""
S = inverted_index.get_set(c_idx)
matched = {}
for r_idx, (r_i, k_i) in enumerate(zip(R, k_i_sets)):
if not r_i or not k_i:
continue
denominator = len(r_i)
threshold = (denominator - len(k_i)) / denominator if denominator != 0 else 0.0
is_edit = self.similarity in (edit_similarity, N_edit_similarity)
# for Jaccard set is needed, for edit list is needed
if not is_edit:
r_set = set(r_i)
max_sim = 0.0
for token in k_i:
try:
entries = inverted_index.get_indexes_binary(token, c_idx)
for s_idx, e_idx in entries:
if s_idx != c_idx:
continue
s = S[e_idx]
# call signature based on edit vs. jaccard
if is_edit:
sim = self.similarity(r_i, s, self.alpha)
else:
sim = self.similarity(r_set, set(s), self.alpha)
if sim >= threshold:
max_sim = max(max_sim, sim)
except ValueError:
continue
if max_sim >= threshold:
matched[r_idx] = max_sim
return matched
def _nn_search(self, r_elem, S, c_idx, inverted_index) -> float:
"""
Find the maximum similarity between r and elements s ∈ S[C] that share at least one token with r using
the inverted index for efficiency.
Args:
r_set (set): Reference element tokens.
S (list of list): Elements of candidate set S[c_idx].
c_idx (int): Index of candidate set in inverted index.
inverted_index (InvertedIndex): For fetching token locations.
Returns:
float: Maximum similarity between r and any s ∈ S[c_idx].
"""
# seen = set()
max_sim = 0.0
is_edit = self.similarity in (edit_similarity, N_edit_similarity)
for token in r_elem:
try:
entries = inverted_index.get_indexes_binary(token,c_idx)
for s_idx, e_idx in entries:
if s_idx != c_idx:
continue
s = S[e_idx]
if is_edit:
sim = self.similarity(r_elem, s, self.alpha)
else:
sim = self.similarity(set(r_elem), set(s), self.alpha)
max_sim = max(max_sim, sim)
except ValueError:
continue
return max_sim
def nn_filter(self, R, K, candidates, inverted_index, threshold, match_map) -> set:
"""
Nearest Neighbor Filter (Algorithm 2 from SilkMoth paper).
Args:
R (list of list): Tokenized reference set.
K (set): Flattened signature tokens.
candidates (set): Candidate indices from check filter.
inverted_index (InvertedIndex): To retrieve sets and indexes.
threshold (float): Relatedness threshold δ (between 0 and 1).
match_map (dict): Maps candidate set index to matched rᵢ indices and their max sim (from check filter).
Returns:
set: Final filtered candidate indices that pass the NN filter.
"""
n = len(R)
theta = threshold * n
is_edit = self.similarity in (edit_similarity, N_edit_similarity)
k_i_sets = [set(r_i).intersection(K) for r_i in R]
r_i_list = R
final_filtered = set()
total_init = 0
for r_idx, r_i in enumerate(R):
if not r_i:
continue
k_i = k_i_sets[r_idx]
base_loss = self.calc_base_loss(k_i, r_i)
total_init += base_loss
for c_idx in candidates:
S = inverted_index.get_set(c_idx)
if self.alpha > 0:
S_tokens = set()
for s in S:
S_tokens.update(s)
# Check if match_map is provided, otherwise create it
if match_map is None:
matched = self.create_match_map(R, K, c_idx, inverted_index)
else:
matched = match_map.get(c_idx, {})
# Step 1: initialize total estimate
total = total_init
# Step 2: for matched rᵢ, computational reuse of sim and adjust total
if matched:
for r_idx, sim in matched.items():
r_i = r_i_list[r_idx]
if not r_i:
continue
k_i = k_i_sets[r_idx]
base_loss = self.calc_base_loss(k_i, r_i)
total += sim - base_loss
# Step 3: for non-matched rᵢ, compute NN and adjust total
for r_idx in set(range(n)) - matched.keys():
r_i = r_i_list[r_idx]
if not r_i:
continue
k_i = k_i_sets[r_idx]
base_loss = self.calc_base_loss(k_i, r_i)
r_set = set(r_i)
# Case alpha > 0
if (self.alpha > 0 and len(k_i) >= floor((1 - self.alpha) * len(r_i)) + 1
and k_i.isdisjoint(S_tokens)):
nn_sim = 0
else:
if is_edit:
# brute‑force search over q‑gram lists for edit_similarity
r_list = r_i_list[r_idx]
nn_sim = 0.0
for s_list in S:
sim = self.similarity(r_list, s_list, self.alpha)
if sim > nn_sim:
nn_sim = sim
else:
# inverted‐index search for jaccard
nn_sim = self._nn_search(r_set, S, c_idx, inverted_index)
total += nn_sim - base_loss
if total < theta:
break
if total >= theta:
final_filtered.add(c_idx)
return final_filtered
def calc_base_loss(self, k_i, r_i):
if self.similarity in (edit_similarity, N_edit_similarity):
denominator = len(r_i) + len(k_i)
base_loss = len(r_i) / denominator if denominator != 0 else 0.0
else:
denominator = len(r_i)
base_loss = (len(r_i) - len(k_i)) / denominator if denominator != 0 else 0.0
return base_loss
__init__(similarity_func, sim_metric, related_thresh, sim_thresh=0.0, q=3)
Initialize the candidate selector with some parameters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
similarity_func
|
callable
|
Similarity function phi(r, s) (e.g., Jaccard). |
required |
sim_metric
|
callable
|
Similarity metric related(R, S) (e.g., contain). |
required |
related_thresh
|
float
|
Relatedness threshold delta. |
required |
sim_thresh
|
float
|
Similarity threshold alpha. |
0.0
|
q
|
int
|
q-chunk length for edit similarity. |
3
|
Source code in src/silkmoth/candidate_selector.py
def __init__(self, similarity_func, sim_metric, related_thresh, sim_thresh=0.0, q = 3):
"""
Initialize the candidate selector with some parameters.
Args:
similarity_func (callable): Similarity function phi(r, s) (e.g., Jaccard).
sim_metric (callable): Similarity metric related(R, S) (e.g., contain).
related_thresh (float): Relatedness threshold delta.
sim_thresh (float): Similarity threshold alpha.
q (int): q-chunk length for edit similarity.
"""
self.similarity = similarity_func
self.sim_metric = sim_metric
self.delta = related_thresh
self.alpha = sim_thresh
self.q = q
check_filter(R, K, candidates, inverted_index)
Apply check filter to prune weak candidate sets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
R
|
list of list
|
Tokenized reference set. |
required |
K
|
set
|
Flattened signature tokens. |
required |
candidates
|
set
|
Candidate set indices from get_candidates(). |
required |
inverted_index
|
InvertedIndex
|
For retrieving sets. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
tuple |
tuple
|
set: Candidate indices that pass the check filter. dict: c_idx -> dict{r_idx -> max_sim}. |
Source code in src/silkmoth/candidate_selector.py
def check_filter(self, R, K, candidates, inverted_index) -> tuple:
"""
Apply check filter to prune weak candidate sets.
Args:
R (list of list): Tokenized reference set.
K (set): Flattened signature tokens.
candidates (set): Candidate set indices from get_candidates().
inverted_index (InvertedIndex): For retrieving sets.
Returns:
tuple:
set: Candidate indices that pass the check filter.
dict: c_idx -> dict{r_idx -> max_sim}.
"""
filtered = set()
match_map = dict()
k_i_sets = [set(r_i).intersection(K) for r_i in R]
for c_idx in candidates:
matched = self.create_match_map(R, k_i_sets, c_idx, inverted_index)
if matched:
filtered.add(c_idx)
match_map[c_idx] = matched
return filtered, match_map
create_match_map(R, k_i_sets, c_idx, inverted_index)
Create a match map for a specific candidate index.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
R
|
list of list
|
Tokenized reference set. |
required |
k_i_sets
|
list of sets
|
Unflattened signature. |
required |
c_idx
|
int
|
Candidate set index. |
required |
inverted_index
|
InvertedIndex
|
For retrieving sets. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
dict
|
r_idx -> max_sim for matched reference sets. |
Source code in src/silkmoth/candidate_selector.py
def create_match_map(self, R, k_i_sets, c_idx, inverted_index) -> dict:
"""
Create a match map for a specific candidate index.
Args:
R (list of list): Tokenized reference set.
k_i_sets (list of sets): Unflattened signature.
c_idx (int): Candidate set index.
inverted_index (InvertedIndex): For retrieving sets.
Returns:
dict: r_idx -> max_sim for matched reference sets.
"""
S = inverted_index.get_set(c_idx)
matched = {}
for r_idx, (r_i, k_i) in enumerate(zip(R, k_i_sets)):
if not r_i or not k_i:
continue
denominator = len(r_i)
threshold = (denominator - len(k_i)) / denominator if denominator != 0 else 0.0
is_edit = self.similarity in (edit_similarity, N_edit_similarity)
# for Jaccard set is needed, for edit list is needed
if not is_edit:
r_set = set(r_i)
max_sim = 0.0
for token in k_i:
try:
entries = inverted_index.get_indexes_binary(token, c_idx)
for s_idx, e_idx in entries:
if s_idx != c_idx:
continue
s = S[e_idx]
# call signature based on edit vs. jaccard
if is_edit:
sim = self.similarity(r_i, s, self.alpha)
else:
sim = self.similarity(r_set, set(s), self.alpha)
if sim >= threshold:
max_sim = max(max_sim, sim)
except ValueError:
continue
if max_sim >= threshold:
matched[r_idx] = max_sim
return matched
get_candidates(signature, inverted_index, ref_size)
Retrieve candidate set indices using token signature lookup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
signature
|
list
|
Signature tokens for a reference set. |
required |
inverted_index
|
InvertedIndex
|
Instance of the custom InvertedIndex class. |
required |
ref_size
|
int
|
Size of set R. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
set |
set
|
Indices of candidate sets containing at least one signature token. |
Source code in src/silkmoth/candidate_selector.py
def get_candidates(self, signature, inverted_index, ref_size) -> set:
"""
Retrieve candidate set indices using token signature lookup.
Args:
signature (list): Signature tokens for a reference set.
inverted_index (InvertedIndex): Instance of the custom InvertedIndex class.
ref_size (int): Size of set R.
Returns:
set: Indices of candidate sets containing at least one signature token.
"""
candidates = set()
for token in signature:
try:
idx_list = inverted_index.get_indexes(token)
for set_idx, _ in idx_list:
src_size = len(inverted_index.get_set(set_idx))
if self.verify_size(ref_size, src_size):
candidates.add(set_idx)
except ValueError:
# token not found in inverted index; safely ignore
continue
return candidates
nn_filter(R, K, candidates, inverted_index, threshold, match_map)
Nearest Neighbor Filter (Algorithm 2 from SilkMoth paper).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
R
|
list of list
|
Tokenized reference set. |
required |
K
|
set
|
Flattened signature tokens. |
required |
candidates
|
set
|
Candidate indices from check filter. |
required |
inverted_index
|
InvertedIndex
|
To retrieve sets and indexes. |
required |
threshold
|
float
|
Relatedness threshold δ (between 0 and 1). |
required |
match_map
|
dict
|
Maps candidate set index to matched rᵢ indices and their max sim (from check filter). |
required |
Returns:
| Name | Type | Description |
|---|---|---|
set |
set
|
Final filtered candidate indices that pass the NN filter. |
Source code in src/silkmoth/candidate_selector.py
def nn_filter(self, R, K, candidates, inverted_index, threshold, match_map) -> set:
"""
Nearest Neighbor Filter (Algorithm 2 from SilkMoth paper).
Args:
R (list of list): Tokenized reference set.
K (set): Flattened signature tokens.
candidates (set): Candidate indices from check filter.
inverted_index (InvertedIndex): To retrieve sets and indexes.
threshold (float): Relatedness threshold δ (between 0 and 1).
match_map (dict): Maps candidate set index to matched rᵢ indices and their max sim (from check filter).
Returns:
set: Final filtered candidate indices that pass the NN filter.
"""
n = len(R)
theta = threshold * n
is_edit = self.similarity in (edit_similarity, N_edit_similarity)
k_i_sets = [set(r_i).intersection(K) for r_i in R]
r_i_list = R
final_filtered = set()
total_init = 0
for r_idx, r_i in enumerate(R):
if not r_i:
continue
k_i = k_i_sets[r_idx]
base_loss = self.calc_base_loss(k_i, r_i)
total_init += base_loss
for c_idx in candidates:
S = inverted_index.get_set(c_idx)
if self.alpha > 0:
S_tokens = set()
for s in S:
S_tokens.update(s)
# Check if match_map is provided, otherwise create it
if match_map is None:
matched = self.create_match_map(R, K, c_idx, inverted_index)
else:
matched = match_map.get(c_idx, {})
# Step 1: initialize total estimate
total = total_init
# Step 2: for matched rᵢ, computational reuse of sim and adjust total
if matched:
for r_idx, sim in matched.items():
r_i = r_i_list[r_idx]
if not r_i:
continue
k_i = k_i_sets[r_idx]
base_loss = self.calc_base_loss(k_i, r_i)
total += sim - base_loss
# Step 3: for non-matched rᵢ, compute NN and adjust total
for r_idx in set(range(n)) - matched.keys():
r_i = r_i_list[r_idx]
if not r_i:
continue
k_i = k_i_sets[r_idx]
base_loss = self.calc_base_loss(k_i, r_i)
r_set = set(r_i)
# Case alpha > 0
if (self.alpha > 0 and len(k_i) >= floor((1 - self.alpha) * len(r_i)) + 1
and k_i.isdisjoint(S_tokens)):
nn_sim = 0
else:
if is_edit:
# brute‑force search over q‑gram lists for edit_similarity
r_list = r_i_list[r_idx]
nn_sim = 0.0
for s_list in S:
sim = self.similarity(r_list, s_list, self.alpha)
if sim > nn_sim:
nn_sim = sim
else:
# inverted‐index search for jaccard
nn_sim = self._nn_search(r_set, S, c_idx, inverted_index)
total += nn_sim - base_loss
if total < theta:
break
if total >= theta:
final_filtered.add(c_idx)
return final_filtered
verify_size(ref_size, src_size)
Checks if sets can be related based on their sizes. Set-Containment is only defined for |R|<=|S|. For Set-Similarity we should compare only similar size sets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ref_size
|
int
|
Size of set R. |
required |
src_size
|
int
|
Size of (possible) set S. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
True if both sets could be related based on their size, False otherwise. |
Source code in src/silkmoth/candidate_selector.py
def verify_size(self, ref_size, src_size) -> bool:
"""
Checks if sets can be related based on their sizes. Set-Containment is
only defined for |R|<=|S|. For Set-Similarity we should compare only
similar size sets.
Args:
ref_size (int): Size of set R.
src_size (int): Size of (possible) set S.
Returns:
bool: True if both sets could be related based on their size, False otherwise.
"""
# case 1: Set-Containment
if self.sim_metric == contain and ref_size > src_size:
return False
# case 2: Set-Similarity
if self.sim_metric == similar:
if min(ref_size, src_size) < self.delta * max(ref_size, src_size):
return False
return True