Inst2Vec/data_collator_for_language_model.py

435 lines
20 KiB
Python

from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
)
import numpy as np
import tokenizers
import torch
from transformers import BatchEncoding
EncodedInput = List[int]
@dataclass
class MyDataCollatorForPreTraining:
# """
# Data collator used for language modeling. Inputs are dynamically padded to the maximum length of a batch if they
# are not all of the same length.
# Args:
# # tokenizer (:class:`~transformers.PreTrainedTokenizer` or :class:`~transformers.PreTrainedTokenizerFast`):
# tokenizer (:class:`tokenizers.Tokenizer`)
# The tokenizer used for encoding the data.
# mlm (:obj:`bool`, `optional`, defaults to :obj:`True`):
# Whether or not to use masked language modeling. If set to :obj:`False`, the labels are the same as the
# inputs with the padding tokens ignored (by setting them to -100). Otherwise, the labels are -100 for
# non-masked tokens and the value to predict for the masked token.
# mlm_probability (:obj:`float`, `optional`, defaults to 0.15):
# The probability with which to (randomly) mask tokens in the input, when :obj:`mlm` is set to :obj:`True`.
# pad_to_multiple_of (:obj:`int`, `optional`):
# If set will pad the sequence to a multiple of the provided value.
# .. note::
# For best performance, this data collator should be used with a dataset having items that are dictionaries or
# BatchEncoding, with the :obj:`"special_tokens_mask"` key, as returned by a
# :class:`~transformers.PreTrainedTokenizer` or a :class:`~transformers.PreTrainedTokenizerFast` with the
# argument :obj:`return_special_tokens_mask=True`.
# """
# def __init__(
# self,
# tokenizer: tokenizers.Tokenizer,
# mlm: bool = True,
# mlm_probability: float = 0.15,
# pad_to_multiple_of: Optional[int] = None,
# ):
# self.tokenizer = tokenizer
# self.mlm = mlm
# self.mlm_probability = mlm_probability
# self.pad_to_multiple_of = pad_to_multiple_of
tokenizer: tokenizers.Tokenizer
mlm: bool = True
mlm_probability: float = 0.15
pad_to_multiple_of: Optional[int] = None
def __post_init__(self):
if self.mlm and self.tokenizer.token_to_id("[MASK]") is None:
raise ValueError(
"This tokenizer does not have a mask token which is necessary for masked language modeling. "
"You should pass `mlm=False` to train on causal language modeling instead."
)
def __call__(
self, examples: List[Union[List[int], torch.Tensor, Dict[str, torch.Tensor]]],
) -> Dict[str, torch.Tensor]:
# Handle dict or lists with proper padding and conversion to tensor.
if isinstance(examples[0], (dict, BatchEncoding)):
batch = pad(
examples,
return_tensors="pt",
pad_to_multiple_of=self.pad_to_multiple_of,
)
else:
batch = {
"input_ids": _collate_batch(
examples, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of
)
}
# If special token mask has been preprocessed, pop it from the dict.
special_tokens_mask = batch.pop("special_tokens_mask", None)
if self.mlm:
batch["input_ids"], batch["labels"] = self.mask_tokens(
batch["input_ids"], special_tokens_mask=special_tokens_mask
)
# else:
# labels = batch["input_ids"].clone()
# if self.tokenizer.pad_token_id is not None:
# labels[labels == self.tokenizer.pad_token_id] = -100
# batch["labels"] = labels
return batch
def mask_tokens(
self, inputs: torch.Tensor, special_tokens_mask: Optional[torch.Tensor] = None
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.
"""
labels = inputs.clone()
# We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
probability_matrix = torch.full(labels.shape, self.mlm_probability)
if special_tokens_mask is None:
special_tokens_mask = [
self.tokenizer.get_special_tokens_mask(
val, already_has_special_tokens=True
)
for val in labels.tolist()
]
special_tokens_mask = torch.tensor(special_tokens_mask, dtype=torch.bool)
else:
special_tokens_mask = special_tokens_mask.bool()
probability_matrix.masked_fill_(special_tokens_mask, value=0.0)
masked_indices = torch.bernoulli(probability_matrix).bool()
labels[~masked_indices] = -100 # We only compute loss on masked tokens
# 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
indices_replaced = (
torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
)
# inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(
# self.tokenizer.mask_token
# )
inputs[indices_replaced] = self.tokenizer.token_to_id("[MASK]")
# 10% of the time, we replace masked input tokens with random word
indices_random = (
torch.bernoulli(torch.full(labels.shape, 0.5)).bool()
& masked_indices
& ~indices_replaced
)
random_words = torch.randint(
self.tokenizer.get_vocab_size(), labels.shape, dtype=torch.long
)
inputs[indices_random] = random_words[indices_random]
# The rest of the time (10% of the time) we keep the masked input tokens unchanged
return inputs, labels
def pad(
self,
encoded_inputs: Union[
BatchEncoding,
List[BatchEncoding],
Dict[str, EncodedInput],
Dict[str, List[EncodedInput]],
List[Dict[str, EncodedInput]],
],
padding=True,
max_length: Optional[int] = None,
pad_to_multiple_of: Optional[int] = None,
return_attention_mask: Optional[bool] = None,
return_tensors=None,
verbose: bool = True,
) -> BatchEncoding:
"""
Pad a single encoded input or a batch of encoded inputs up to predefined length or to the max sequence length
in the batch.
Padding side (left/right) padding token ids are defined at the tokenizer level (with ``self.padding_side``,
``self.pad_token_id`` and ``self.pad_token_type_id``)
.. note::
If the ``encoded_inputs`` passed are dictionary of numpy arrays, PyTorch tensors or TensorFlow tensors, the
result will use the same type unless you provide a different tensor type with ``return_tensors``. In the
case of PyTorch tensors, you will lose the specific device of your tensors however.
Args:
encoded_inputs (:class:`~transformers.BatchEncoding`, list of :class:`~transformers.BatchEncoding`, :obj:`Dict[str, List[int]]`, :obj:`Dict[str, List[List[int]]` or :obj:`List[Dict[str, List[int]]]`):
Tokenized inputs. Can represent one input (:class:`~transformers.BatchEncoding` or :obj:`Dict[str,
List[int]]`) or a batch of tokenized inputs (list of :class:`~transformers.BatchEncoding`, `Dict[str,
List[List[int]]]` or `List[Dict[str, List[int]]]`) so you can use this method during preprocessing as
well as in a PyTorch Dataloader collate function.
Instead of :obj:`List[int]` you can have tensors (numpy arrays, PyTorch tensors or TensorFlow tensors),
see the note above for the return type.
padding (:obj:`bool`, :obj:`str` or :class:`~transformers.file_utils.PaddingStrategy`, `optional`, defaults to :obj:`True`):
Select a strategy to pad the returned sequences (according to the model's padding side and padding
index) among:
* :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a
single sequence if provided).
* :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
maximum acceptable input length for the model if that argument is not provided.
* :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
different lengths).
max_length (:obj:`int`, `optional`):
Maximum length of the returned list and optionally padding length (see above).
pad_to_multiple_of (:obj:`int`, `optional`):
If set will pad the sequence to a multiple of the provided value.
This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability
>= 7.5 (Volta).
return_attention_mask (:obj:`bool`, `optional`):
Whether to return the attention mask. If left to the default, will return the attention mask according
to the specific tokenizer's default, defined by the :obj:`return_outputs` attribute.
`What are attention masks? <../glossary.html#attention-mask>`__
return_tensors (:obj:`str` or :class:`~transformers.file_utils.TensorType`, `optional`):
If set, will return tensors instead of list of python integers. Acceptable values are:
* :obj:`'tf'`: Return TensorFlow :obj:`tf.constant` objects.
* :obj:`'pt'`: Return PyTorch :obj:`torch.Tensor` objects.
* :obj:`'np'`: Return Numpy :obj:`np.ndarray` objects.
verbose (:obj:`bool`, `optional`, defaults to :obj:`True`):
Whether or not to print more information and warnings.
"""
# If we have a list of dicts, let's convert it in a dict of lists
# We do this to allow using this method as a collate_fn function in PyTorch Dataloader
if isinstance(encoded_inputs, (list, tuple)) and isinstance(
encoded_inputs[0], (dict, BatchEncoding)
):
encoded_inputs = {
key: [example[key] for example in encoded_inputs]
for key in encoded_inputs[0].keys()
}
# The model's main input name, usually `input_ids`, has be passed for padding
# if self.model_input_names[0] not in encoded_inputs:
# raise ValueError(
# "You should supply an encoding or a list of encodings to this method "
# f"that includes {self.model_input_names[0]}, but you provided {list(encoded_inputs.keys())}"
# )
required_input = encoded_inputs["input_ids"]
if not required_input:
if return_attention_mask:
encoded_inputs["attention_mask"] = []
return encoded_inputs
# If we have PyTorch/TF/NumPy tensors/arrays as inputs, we cast them as python objects
# and rebuild them afterwards if no return_tensors is specified
# Note that we lose the specific device the tensor may be on for PyTorch
first_element = required_input[0]
if isinstance(first_element, (list, tuple)):
# first_element might be an empty list/tuple in some edge cases so we grab the first non empty element.
index = 0
while len(required_input[index]) == 0:
index += 1
if index < len(required_input):
first_element = required_input[index][0]
# At this state, if `first_element` is still a list/tuple, it's an empty one so there is nothing to do.
if not isinstance(first_element, (int, list, tuple)):
if isinstance(first_element, torch.Tensor):
return_tensors = "pt" if return_tensors is None else return_tensors
elif isinstance(first_element, np.ndarray):
return_tensors = "np" if return_tensors is None else return_tensors
else:
raise ValueError(
f"type of {first_element} unknown: {type(first_element)}. "
f"Should be one of a python, numpy, pytorch or tensorflow object."
)
for key, value in encoded_inputs.items():
encoded_inputs[key] = to_py_obj(value)
# # Convert padding_strategy in PaddingStrategy
# padding_strategy, _, max_length, _ = self._get_padding_truncation_strategies(
# padding=padding, max_length=max_length, verbose=verbose
# )
required_input = encoded_inputs["input_ids"]
if required_input and not isinstance(required_input[0], (list, tuple)):
# encoded_inputs = _pad(
# encoded_inputs,
# max_length=max_length,
# # padding_strategy=padding_strategy,
# pad_to_multiple_of=pad_to_multiple_of,
# return_attention_mask=return_attention_mask,
# )
return BatchEncoding(encoded_inputs, tensor_type=return_tensors)
batch_size = len(required_input)
assert all(
len(v) == batch_size for v in encoded_inputs.values()
), "Some items in the output dictionary have a different batch size than others."
# if padding_strategy == PaddingStrategy.LONGEST:
# max_length = max(len(inputs) for inputs in required_input)
# padding_strategy = PaddingStrategy.MAX_LENGTH
batch_outputs = {}
for i in range(batch_size):
inputs = dict((k, v[i]) for k, v in encoded_inputs.items())
# outputs = self._pad(
# inputs,
# max_length=max_length,
# # padding_strategy=padding_strategy,
# pad_to_multiple_of=pad_to_multiple_of,
# return_attention_mask=return_attention_mask,
# )
for key, value in inputs.items():
if key not in batch_outputs:
batch_outputs[key] = []
batch_outputs[key].append(value)
return BatchEncoding(batch_outputs, tensor_type=return_tensors)
# def _pad(
# self,
# encoded_inputs: Union[Dict[str, EncodedInput], BatchEncoding],
# max_length: Optional[int] = None,
# padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
# pad_to_multiple_of: Optional[int] = None,
# return_attention_mask: Optional[bool] = None,
# ) -> dict:
# """
# Pad encoded inputs (on left/right and up to predefined length or max length in the batch)
# Args:
# encoded_inputs: Dictionary of tokenized inputs (`List[int]`) or batch of tokenized inputs (`List[List[int]]`).
# max_length: maximum length of the returned list and optionally padding length (see below).
# Will truncate by taking into account the special tokens.
# padding_strategy: PaddingStrategy to use for padding.
# - PaddingStrategy.LONGEST Pad to the longest sequence in the batch
# - PaddingStrategy.MAX_LENGTH: Pad to the max length (default)
# - PaddingStrategy.DO_NOT_PAD: Do not pad
# The tokenizer padding sides are defined in self.padding_side:
# - 'left': pads on the left of the sequences
# - 'right': pads on the right of the sequences
# pad_to_multiple_of: (optional) Integer if set will pad the sequence to a multiple of the provided value.
# This is especially useful to enable the use of Tensor Core on NVIDIA hardware with compute capability
# >= 7.5 (Volta).
# return_attention_mask: (optional) Set to False to avoid returning attention mask (default: set to model specifics)
# """
# # Load from model defaults
# if return_attention_mask is None:
# return_attention_mask = "attention_mask" in self.model_input_names
# required_input = encoded_inputs[self.model_input_names[0]]
# if padding_strategy == PaddingStrategy.LONGEST:
# max_length = len(required_input)
# if (
# max_length is not None
# and pad_to_multiple_of is not None
# and (max_length % pad_to_multiple_of != 0)
# ):
# max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of
# needs_to_be_padded = (
# padding_strategy != PaddingStrategy.DO_NOT_PAD
# and len(required_input) != max_length
# )
# if needs_to_be_padded:
# difference = max_length - len(required_input)
# if self.padding_side == "right":
# if return_attention_mask:
# encoded_inputs["attention_mask"] = [1] * len(required_input) + [
# 0
# ] * difference
# if "token_type_ids" in encoded_inputs:
# encoded_inputs["token_type_ids"] = (
# encoded_inputs["token_type_ids"]
# + [self.pad_token_type_id] * difference
# )
# if "special_tokens_mask" in encoded_inputs:
# encoded_inputs["special_tokens_mask"] = (
# encoded_inputs["special_tokens_mask"] + [1] * difference
# )
# encoded_inputs[self.model_input_names[0]] = (
# required_input + [self.pad_token_id] * difference
# )
# elif self.padding_side == "left":
# if return_attention_mask:
# encoded_inputs["attention_mask"] = [0] * difference + [1] * len(
# required_input
# )
# if "token_type_ids" in encoded_inputs:
# encoded_inputs["token_type_ids"] = [
# self.pad_token_type_id
# ] * difference + encoded_inputs["token_type_ids"]
# if "special_tokens_mask" in encoded_inputs:
# encoded_inputs["special_tokens_mask"] = [
# 1
# ] * difference + encoded_inputs["special_tokens_mask"]
# encoded_inputs[self.model_input_names[0]] = [
# self.pad_token_id
# ] * difference + required_input
# else:
# raise ValueError("Invalid padding strategy:" + str(self.padding_side))
# elif return_attention_mask and "attention_mask" not in encoded_inputs:
# encoded_inputs["attention_mask"] = [1] * len(required_input)
# return encoded_inputs
def _collate_batch(examples, tokenizer, pad_to_multiple_of: Optional[int] = None):
"""Collate `examples` into a batch, using the information in `tokenizer` for padding if necessary."""
# Tensorize if necessary.
if isinstance(examples[0], (list, tuple)):
examples = [torch.tensor(e, dtype=torch.long) for e in examples]
# Check if padding is necessary.
length_of_first = examples[0].size(0)
are_tensors_same_length = all(x.size(0) == length_of_first for x in examples)
if are_tensors_same_length and (
pad_to_multiple_of is None or length_of_first % pad_to_multiple_of == 0
):
return torch.stack(examples, dim=0)
# If yes, check if we have a `pad_token`.
if tokenizer._pad_token is None:
raise ValueError(
"You are attempting to pad samples but the tokenizer you are using"
f" ({tokenizer.__class__.__name__}) does not have a pad token."
)
# Creating the full tensor and filling it with our data.
max_length = max(x.size(0) for x in examples)
if pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0):
max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of
result = examples[0].new_full([len(examples), max_length], tokenizer.pad_token_id)
for i, example in enumerate(examples):
if tokenizer.padding_side == "right":
result[i, : example.shape[0]] = example
else:
result[i, -example.shape[0] :] = example
return result
def to_py_obj(obj):
if isinstance(obj, torch.Tensor):
return obj.detach().cpu().tolist()
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return obj