SignatureGenerator

The signature generator executes the signature generation step in the SilkMoth pipeline. During signature generation SilkMoth constructs a signature for a reference set R by selecting the “smallest” set of tokens from R such that if another set S does not share any tokens with R's signature, R and S are not related.

SilkMoth supports three different signature schemes: Weighted Signature Scheme (Def. 5 in paper), Skyline Signature Scheme (Def. 9), and Dichotomy Signature Scheme (Def. 10).

Examples

>>> from silkmoth.inverted_index import InvertedIndex
>>> from silkmoth.signature_generator import SignatureGenerator
>>> S1 = [{"Apple", "Pear", "Car"}, {"Apple", "Sun", "Cat"}]
>>> S2 = [{"Apple", "Berlin", "Sun"}, {"Apple"}]
>>> S = [S1, S2]
>>> I = InvertedIndex(S)
>>> R = [{"Apple"}, {"Sun", "Berlin", "Paris"}]
>>> sig_gen = SignatureGenerator()
>>> sig_gen.get_signature(R, I, 0.3)
['Apple', 'Sun', 'Berlin']
>>> sig_gen.get_signature(R, I, 0.5)
['Apple', 'Berlin']
Source code in src/silkmoth/signature_generator.py
class SignatureGenerator:
    """
    The signature generator executes the signature generation step in the SilkMoth
    pipeline. During signature generation SilkMoth constructs a signature for a 
    reference set R by selecting the “smallest” set of tokens from R such that 
    if another set S does not share any tokens with R's signature, R and S are 
    not related.

    SilkMoth supports three different signature schemes: Weighted Signature Scheme
    (Def. 5 in [paper](https://doi.org/10.14778/3115404.3115413)), Skyline Signature
    Scheme (Def. 9), and Dichotomy Signature Scheme (Def. 10).

    Examples
    --------
    ```
    >>> from silkmoth.inverted_index import InvertedIndex
    >>> from silkmoth.signature_generator import SignatureGenerator
    >>> S1 = [{"Apple", "Pear", "Car"}, {"Apple", "Sun", "Cat"}]
    >>> S2 = [{"Apple", "Berlin", "Sun"}, {"Apple"}]
    >>> S = [S1, S2]
    >>> I = InvertedIndex(S)
    >>> R = [{"Apple"}, {"Sun", "Berlin", "Paris"}]
    >>> sig_gen = SignatureGenerator()
    >>> sig_gen.get_signature(R, I, 0.3)
    ['Apple', 'Sun', 'Berlin']
    >>> sig_gen.get_signature(R, I, 0.5)
    ['Apple', 'Berlin']
    ```
    """

    def __init__(self):
        """
        Initialize the signature generator with default parameters.
        """
        self.sim_fun = jaccard_similarity
        self.q = 3

    def get_signature(self, reference_set, inverted_index, delta, alpha=0, sig_type=SigType.WEIGHTED, sim_fun = jaccard_similarity, q=3) -> list:
        """
        Compute a signature for a reference set according to the chosen signature type.

        Args:
            reference_set (list): Tokenized reference set.
            inverted_index (InvertedIndex): Index to evaluate token cost.
            delta (float): Relatedness threshold factor.
            alpha (float): Similarity threshold factor.
            sig_type (SigType): Type of signature.
            sim_fun (callable): Similarity function (phi) to use.
            q (int): Length of each q-chunk for edit similarity.

        Returns:
            list: A list of str for selected tokens forming the signature.
        """
        if sim_fun in (edit_similarity, N_edit_similarity):
            self.sim_fun = sim_fun
            self.q = q

            # If q is too large, no valid weighted signature exists
            q_bound = delta / (1 - delta)

            if q >= q_bound:
                warnings.warn(
                    f"q={q} is too large for delta = {delta:.3f}; "
                    "no valid weighted signature exists -> falling back to brute-force."
                )
                # returning all non-overlapping q-chunks so nothing is pruned
                all_chunks = []
                for elem in reference_set:
                    joined = " ".join(elem)
                    all_chunks += [
                        joined[i:i+q]
                        for i in range(0, len(joined) - q + 1, q)
                    ]
                return all_chunks

        is_edit_sim = sim_fun in (edit_similarity, N_edit_similarity)
        match sig_type:
            case SigType.WEIGHTED:
                if is_edit_sim and alpha > 0:
                    return self._generate_simthresh_signature_edit_similarity(reference_set, inverted_index, delta, alpha)
                elif is_edit_sim and alpha == 0:
                    return self._generate_weighted_signature_edit_similarity(reference_set, inverted_index, delta)
                else:
                    return self._generate_weighted_signature(reference_set, inverted_index, delta)
            case SigType.SKYLINE:
                return self._generate_skyline_signature(reference_set, inverted_index, delta, alpha)
            case SigType.DICHOTOMY:
                return self._generate_dichotomy_signature(reference_set, inverted_index, delta, alpha)
            case _:
                raise ValueError(f"Unknown signature type") 


    def _generate_simthresh_signature_edit_similarity(self, reference_set, inverted_index, delta, alpha) -> list:
        """
        Builds a similarity-threshold signature for edit similarity as described in the SILKMOTH Paper in 
        Section 7.2.

        For each element r_i in the reference set, it ensures that the signature contains at least 
        m_i = floor((1 - alpha)/alpha * |q_chunks(r_i)|) + 1 q-chunks, so that any element 
        sharing fewer than m_i chunks cannot achieve Eds ≥ alpha.

        Args:
            reference_set (list): Tokenized reference set.
            inverted_index (InvertedIndex): Index to evaluate token cost.
            delta (float): Relatedness threshold factor.
            alpha (float): Similarity threshold factor.
        Returns:
            list: A list of str for selected q-chunks forming the signature.
        """

        weighted_sig_edit_sim = set(self._generate_weighted_signature_edit_similarity(reference_set, inverted_index, delta))
        simthresh_sig = set(weighted_sig_edit_sim)

        for elem_tokens in reference_set:
            # compute all q-chunks of each element
            joined = " ".join(elem_tokens)
            chunks = [joined[j:j+self.q] for j in range(0, len(joined) - self.q + 1, self.q)]
            r = set(chunks)
            m_i = floor((1 - alpha) / alpha * len(r)) + 1

            # determine how many base chunks from this element are already in weighted signature
            k_i = list(weighted_sig_edit_sim & r)
            if len(k_i) < m_i:
                # need cheapest additional chunks -> sort all chunks by cost = inverted_index size
                sorted_chunks = sorted(
                    r,
                    key=lambda t: len(inverted_index.get_indexes(t)) if t in inverted_index.lookup_table else float('inf')
                )
                # add cheapest chunks up to m_i
                for chunk in sorted_chunks:
                    if len(simthresh_sig & r) >= m_i:
                        break
                    simthresh_sig.add(chunk)

        return list(simthresh_sig)    


    def _generate_skyline_signature(self, reference_set, inverted_index: InvertedIndex, delta, alpha):
        if self.sim_fun == jaccard_similarity:
            weighted = set(self._generate_weighted_signature(reference_set, inverted_index, delta))
            unflattened = [weighted & set(r_i) for r_i in reference_set]
        elif self.sim_fun in (edit_similarity, N_edit_similarity):
            weighted = set(self._generate_weighted_signature_edit_similarity(reference_set, inverted_index, delta))
            unflattened = [weighted & set(" ".join(r_i)[i:i+self.q] for i in range(0, len(" ".join(r_i)) - self.q + 1, self.q)) for r_i in reference_set]
        else:
            raise ValueError(f"Unknown similarity function: {self.sim_fun}")

        skyline = set()
        for i, k in enumerate(unflattened):
            if self.sim_fun == jaccard_similarity:
                rhs = floor((1 - alpha) * len(reference_set[i])) + 1
            elif self.sim_fun in (edit_similarity, N_edit_similarity):
                chunks = get_q_chunks(reference_set[i], self.q)
                r = set(chunks)
                rhs = floor((1 - alpha) / alpha * len(r)) + 1
            else:
                raise ValueError(f"Unknown similarity function: {self.sim_fun}")

            if len(k) < rhs:
                skyline |= k
            else:
                # add tokens with minimum |I[t]|
                tokens = list(k)
                tokens.sort(key=lambda t: len(inverted_index.get_indexes(t)))
                skyline = skyline.union(tokens[:rhs])
        return list(skyline)

    def _generate_dichotomy_signature(self, reference_set, inverted_index: InvertedIndex, delta, alpha):
        """
        Generates a signature using the Dichotomy Scheme as described in the SILKMOTH paper.

        For each element r_i, it chooses between its weighted signature part (k_i) and
        all its tokens (r_i) based on whether k_i is a subset of an optimal
        sim-thresh signature (m_i).
        """
        # 1. First, generate the optimal weighted signature, K.
        if self.sim_fun == jaccard_similarity:
            weighted_signature_K = set(self._generate_weighted_signature(reference_set, inverted_index, delta))
        elif self.sim_fun in (edit_similarity, N_edit_similarity):
            weighted_signature_K = set(self._generate_weighted_signature_edit_similarity(reference_set, inverted_index, delta))
        else:
            raise ValueError(f"Unknown similarity function: {self.sim_fun}")

        final_dichotomy_sig = set()

        # 2. For each element r_i, decide whether to use its k_i or the full r_i.
        for r_i_list in reference_set:
            if self.sim_fun == jaccard_similarity:
                r_i = set(r_i_list)
            elif self.sim_fun in (edit_similarity, N_edit_similarity):
                chunks = get_q_chunks(r_i_list, self.q)
                r_i = set(chunks)
            else:
                raise ValueError(f"Unknown similarity function: {self.sim_fun}")

            if not r_i:
                continue

            # 3. Determine k_i: the part of the weighted signature in this element.
            k_i = weighted_signature_K.intersection(r_i)

            # 4. Determine m_i: the optimal sim-thresh signature for this element.

            # 4a. Calculate the required size for the sim-thresh signature.
            if self.sim_fun == jaccard_similarity:
                m_i_size = floor((1 - alpha) * len(r_i)) + 1
            elif self.sim_fun in (edit_similarity, N_edit_similarity):
                m_i_size = floor((1 - alpha) / alpha * len(r_i)) + 1
            else:
                raise ValueError(f"Unknown similarity function: {self.sim_fun}")

            # 4b. Get all tokens from the original element r_i and sort by cost.
            element_tokens = list(r_i)

            # Sort tokens by the length of their inverted index list (cost).
            # Handle cases where a token might not be in the index.
            def get_token_cost(token):
                try:
                    return len(inverted_index.get_indexes(token))
                except ValueError:
                    return float('inf')  # Assign a high cost if not found

            element_tokens.sort(key=get_token_cost)

            # 4c. The optimal m_i consists of the cheapest tokens.
            m_i = set(element_tokens[:m_i_size])

            # 5. The Decision: Apply the paper's condition.
            # Can we get away with the cheaper weighted signature part (k_i)?
            # Yes, if k_i is already a subset of the optimal sim-thresh signature (m_i).
            if k_i.issubset(m_i):
                # Decision: Use the cheaper weighted signature tokens.
                final_dichotomy_sig.update(k_i)
            else:
                # Decision: Fall back to the safe, more expensive option.
                final_dichotomy_sig.update(r_i)

        return list(final_dichotomy_sig)


    def _generate_weighted_signature(self, reference_set, inverted_index, delta):
        if delta <= 0.0:
            return []

        n = len(reference_set)
        theta = delta * n  # required covered fraction ,  delta * |R| in paper

        # 1) Build token: elements map and aggregate token values
        token_to_elems = defaultdict(list)
        token_value = {}

        for i, elem in enumerate(reference_set): 
            if not elem:
                warnings.warn(f"Element at index {i} is empty and will be skipped.")
                continue
            unique_tokens = set(elem) # remove duplicate tokens inside each element
            weight = 1.0 / len(unique_tokens)

            for t in unique_tokens: 
                token_to_elems[t].append(i) 
                token_value[t] = token_value.get(t, 0.0) + weight # value = sum of weights (for each token)

        # 2) Build min-heap of (cost/value, token)
        heap = []
        for t, val in token_value.items():
            if val <= 0:
                continue
            try:
                cost = len(inverted_index.get_indexes(t)) # look up each token in inverted index to count in how many sets it is = cost
            except ValueError:
                # Token not in index: assign infinite cost to deprioritize
                cost = float('inf')
            heapq.heappush(heap, (cost / val, t)) # goal small ratio: cost/value

        # 3) Selection with greedy algorithm
        selected_sig = set()
        r_sizes = [len(set(elem)) if elem else 0 for elem in reference_set]
        total_loss = float(n)
        current_k_counts = [0] * n

        # while heap and total_loss >= theta:
        while heap and total_loss >= theta:
            # 1.
            ratio, t = heapq.heappop(heap)  # pull best token with lowest cost/value from heap
            if t in selected_sig:
                continue
            if ratio == float('inf'):
                break

            # 2.
            selected_sig.add(t)

            # 3.
            for i in range(n):
                if r_sizes[i] == 0:
                    continue

                # Calculate |k_i|: number of tokens from reference_set[i] also in selected_sig
                current_k_counts[i] = len(set(reference_set[i]).intersection(selected_sig))

            # 4.
            total_loss = sum(
                (r_sizes[i] - current_k_counts[i]) / r_sizes[i]
                for i in range(n) if r_sizes[i] > 0
            )

        return list(selected_sig)

    # Following the same logic of _generate_weighted_signature
    def _generate_weighted_signature_edit_similarity(self, reference_set, inverted_index, delta):
        if delta <= 0.0:
            return []

        n = len(reference_set)
        theta = delta * n  

        token_to_elems = defaultdict(list)   # map q-chunk -> list of element indexes
        token_value = {}                     # map q-chunk -> accumulated value

        r_sizes = []        # number of q-chunks per element
        element_chunks = [] # list of q-chunks per element

        # Step 1: Build q-chunks and token values
        for i, elem_tokens in enumerate(reference_set):
            if not elem_tokens:
                # warnings.warn(f"Element at index {i} is empty and will be skipped.")
                r_sizes.append(0)
                element_chunks.append([])
                continue

            # Join the list of tokens into a string
            joined = " ".join(elem_tokens)

            # Extract non-overlapping q-chunks
            chunks = [joined[j:j+self.q] for j in range(0, len(joined) - self.q + 1, self.q)]
            chunk_set = set(chunks)
            element_chunks.append(chunks)

            num_chunks = len(chunk_set)
            r_sizes.append(num_chunks)

            if num_chunks == 0:
                continue

            weight = 1.0 / num_chunks
            for chunk in chunk_set:
                token_to_elems[chunk].append(i)
                token_value[chunk] = token_value.get(chunk, 0.0) + weight # value = sum of weights (for each chunk)

        # Step 2: Build heap (cost/value, token)
        heap = []
        for chunk, val in token_value.items():
            if val <= 0:
                continue
            try:
                cost = len(inverted_index.get_indexes(chunk))  # number of sets where chunk appears
            except ValueError:
                cost = float('inf')
            heapq.heappush(heap, (cost / val, chunk))

        # Step 3: Greedy selection
        selected_sig = set()
        current_k_counts = [0] * n
        total_loss = float(n)     

        while heap and total_loss >= theta:
            ratio, chunk = heapq.heappop(heap)
            if chunk in selected_sig:
                continue
            if ratio == float('inf'):
                break

            selected_sig.add(chunk)

            for i in range(n):
                if r_sizes[i] == 0:
                    continue
                if chunk in element_chunks[i]:
                    current_k_counts[i] += 1


            total_loss = sum(
               r_sizes[i] / (r_sizes[i] + current_k_counts[i])
               for i in range(n) if r_sizes[i] > 0
            )

        return list(selected_sig)

__init__()

Initialize the signature generator with default parameters.

Source code in src/silkmoth/signature_generator.py
def __init__(self):
    """
    Initialize the signature generator with default parameters.
    """
    self.sim_fun = jaccard_similarity
    self.q = 3

get_signature(reference_set, inverted_index, delta, alpha=0, sig_type=SigType.WEIGHTED, sim_fun=jaccard_similarity, q=3)

Compute a signature for a reference set according to the chosen signature type.

Parameters:

Name Type Description Default
reference_set list

Tokenized reference set.

required
inverted_index InvertedIndex

Index to evaluate token cost.

required
delta float

Relatedness threshold factor.

required
alpha float

Similarity threshold factor.

0
sig_type SigType

Type of signature.

WEIGHTED
sim_fun callable

Similarity function (phi) to use.

jaccard_similarity
q int

Length of each q-chunk for edit similarity.

3

Returns:

Name Type Description
list list

A list of str for selected tokens forming the signature.

Source code in src/silkmoth/signature_generator.py
def get_signature(self, reference_set, inverted_index, delta, alpha=0, sig_type=SigType.WEIGHTED, sim_fun = jaccard_similarity, q=3) -> list:
    """
    Compute a signature for a reference set according to the chosen signature type.

    Args:
        reference_set (list): Tokenized reference set.
        inverted_index (InvertedIndex): Index to evaluate token cost.
        delta (float): Relatedness threshold factor.
        alpha (float): Similarity threshold factor.
        sig_type (SigType): Type of signature.
        sim_fun (callable): Similarity function (phi) to use.
        q (int): Length of each q-chunk for edit similarity.

    Returns:
        list: A list of str for selected tokens forming the signature.
    """
    if sim_fun in (edit_similarity, N_edit_similarity):
        self.sim_fun = sim_fun
        self.q = q

        # If q is too large, no valid weighted signature exists
        q_bound = delta / (1 - delta)

        if q >= q_bound:
            warnings.warn(
                f"q={q} is too large for delta = {delta:.3f}; "
                "no valid weighted signature exists -> falling back to brute-force."
            )
            # returning all non-overlapping q-chunks so nothing is pruned
            all_chunks = []
            for elem in reference_set:
                joined = " ".join(elem)
                all_chunks += [
                    joined[i:i+q]
                    for i in range(0, len(joined) - q + 1, q)
                ]
            return all_chunks

    is_edit_sim = sim_fun in (edit_similarity, N_edit_similarity)
    match sig_type:
        case SigType.WEIGHTED:
            if is_edit_sim and alpha > 0:
                return self._generate_simthresh_signature_edit_similarity(reference_set, inverted_index, delta, alpha)
            elif is_edit_sim and alpha == 0:
                return self._generate_weighted_signature_edit_similarity(reference_set, inverted_index, delta)
            else:
                return self._generate_weighted_signature(reference_set, inverted_index, delta)
        case SigType.SKYLINE:
            return self._generate_skyline_signature(reference_set, inverted_index, delta, alpha)
        case SigType.DICHOTOMY:
            return self._generate_dichotomy_signature(reference_set, inverted_index, delta, alpha)
        case _:
            raise ValueError(f"Unknown signature type")