from functools import partial from typing import Any, Callable, List, Optional import torch from torch import nn, Tensor from torchvision.transforms._presets import ImageClassification from torchvision.utils import _log_api_usage_once from torchvision.models._api import Weights, WeightsEnum from torchvision.models._meta import _IMAGENET_CATEGORIES from torchvision.models._utils import _make_divisible, _ovewrite_named_param, handle_legacy_interface import warnings from typing import Callable, List, Optional, Sequence, Tuple, Union, TypeVar import collections from itertools import repeat M = TypeVar("M", bound=nn.Module) BUILTIN_MODELS = {} def register_model(name: Optional[str] = None) -> Callable[[Callable[..., M]], Callable[..., M]]: def wrapper(fn: Callable[..., M]) -> Callable[..., M]: key = name if name is not None else fn.__name__ if key in BUILTIN_MODELS: raise ValueError(f"An entry is already registered under the name '{key}'.") BUILTIN_MODELS[key] = fn return fn return wrapper def _make_ntuple(x: Any, n: int) -> Tuple[Any, ...]: """ Make n-tuple from input x. If x is an iterable, then we just convert it to tuple. Otherwise, we will make a tuple of length n, all with value of x. reference: https://github.com/pytorch/pytorch/blob/master/torch/nn/modules/utils.py#L8 Args: x (Any): input value n (int): length of the resulting tuple """ if isinstance(x, collections.abc.Iterable): return tuple(x) return tuple(repeat(x, n)) class ConvNormActivation(torch.nn.Sequential): def __init__( self, in_channels: int, out_channels: int, kernel_size: Union[int, Tuple[int, ...]] = 3, stride: Union[int, Tuple[int, ...]] = 1, padding: Optional[Union[int, Tuple[int, ...], str]] = None, groups: int = 1, norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d, activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU, dilation: Union[int, Tuple[int, ...]] = 1, inplace: Optional[bool] = True, bias: Optional[bool] = None, conv_layer: Callable[..., torch.nn.Module] = torch.nn.Conv2d, ) -> None: if padding is None: if isinstance(kernel_size, int) and isinstance(dilation, int): padding = (kernel_size - 1) // 2 * dilation else: _conv_dim = len(kernel_size) if isinstance(kernel_size, Sequence) else len(dilation) kernel_size = _make_ntuple(kernel_size, _conv_dim) dilation = _make_ntuple(dilation, _conv_dim) padding = tuple((kernel_size[i] - 1) // 2 * dilation[i] for i in range(_conv_dim)) if bias is None: bias = norm_layer is None layers = [ conv_layer( in_channels, out_channels, kernel_size, stride, padding, dilation=dilation, groups=groups, bias=bias, ) ] if norm_layer is not None: layers.append(norm_layer(out_channels)) if activation_layer is not None: params = {} if inplace is None else {"inplace": inplace} layers.append(activation_layer(**params)) super().__init__(*layers) _log_api_usage_once(self) self.out_channels = out_channels if self.__class__ == ConvNormActivation: warnings.warn( "Don't use ConvNormActivation directly, please use Conv2dNormActivation and Conv3dNormActivation instead." ) class Conv2dNormActivation(ConvNormActivation): """ Configurable block used for Convolution2d-Normalization-Activation blocks. Args: in_channels (int): Number of channels in the input image out_channels (int): Number of channels produced by the Convolution-Normalization-Activation block kernel_size: (int, optional): Size of the convolving kernel. Default: 3 stride (int, optional): Stride of the convolution. Default: 1 padding (int, tuple or str, optional): Padding added to all four sides of the input. Default: None, in which case it will be calculated as ``padding = (kernel_size - 1) // 2 * dilation`` groups (int, optional): Number of blocked connections from input channels to output channels. Default: 1 norm_layer (Callable[..., torch.nn.Module], optional): Norm layer that will be stacked on top of the convolution layer. If ``None`` this layer won't be used. Default: ``torch.nn.BatchNorm2d`` activation_layer (Callable[..., torch.nn.Module], optional): Activation function which will be stacked on top of the normalization layer (if not None), otherwise on top of the conv layer. If ``None`` this layer won't be used. Default: ``torch.nn.ReLU`` dilation (int): Spacing between kernel elements. Default: 1 inplace (bool): Parameter for the activation layer, which can optionally do the operation in-place. Default ``True`` bias (bool, optional): Whether to use bias in the convolution layer. By default, biases are included if ``norm_layer is None``. """ def __init__( self, in_channels: int, out_channels: int, kernel_size: Union[int, Tuple[int, int]] = 3, stride: Union[int, Tuple[int, int]] = 1, padding: Optional[Union[int, Tuple[int, int], str]] = None, groups: int = 1, norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d, activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU, dilation: Union[int, Tuple[int, int]] = 1, inplace: Optional[bool] = True, bias: Optional[bool] = None, ) -> None: super().__init__( in_channels, out_channels, kernel_size, stride, padding, groups, norm_layer, activation_layer, dilation, inplace, bias, torch.nn.Conv2d, ) __all__ = ["MobileNetV2", "MobileNet_V2_Weights", "mobilenet_v2"] # necessary for backwards compatibility class InvertedResidual(nn.Module): def __init__( self, inp: int, oup: int, stride: int, expand_ratio: int, norm_layer: Optional[Callable[..., nn.Module]] = None ) -> None: super().__init__() self.stride = stride if stride not in [1, 2]: raise ValueError(f"stride should be 1 or 2 instead of {stride}") if norm_layer is None: norm_layer = nn.BatchNorm2d hidden_dim = int(round(inp * expand_ratio)) self.use_res_connect = self.stride == 1 and inp == oup layers: List[nn.Module] = [] if expand_ratio != 1: # pw layers.append( Conv2dNormActivation(inp, hidden_dim, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.ReLU6) ) layers.extend( [ # dw Conv2dNormActivation( hidden_dim, hidden_dim, stride=stride, groups=hidden_dim, norm_layer=norm_layer, activation_layer=nn.ReLU6, ), # pw-linear nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False), norm_layer(oup), ] ) self.conv = nn.Sequential(*layers) self.out_channels = oup self._is_cn = stride > 1 def forward(self, x: Tensor) -> Tensor: if self.use_res_connect: return x + self.conv(x) else: return self.conv(x) class MobileNetV2(nn.Module): def __init__( self, num_classes: int = 1000, width_mult: float = 1.0, inverted_residual_setting: Optional[List[List[int]]] = None, round_nearest: int = 8, block: Optional[Callable[..., nn.Module]] = None, norm_layer: Optional[Callable[..., nn.Module]] = None, dropout: float = 0.2, ) -> None: """ MobileNet V2 main class Args: num_classes (int): Number of classes width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount inverted_residual_setting: Network structure round_nearest (int): Round the number of channels in each layer to be a multiple of this number Set to 1 to turn off rounding block: Module specifying inverted residual building block for mobilenet norm_layer: Module specifying the normalization layer to use dropout (float): The droupout probability """ super().__init__() _log_api_usage_once(self) if block is None: block = InvertedResidual if norm_layer is None: norm_layer = nn.BatchNorm2d input_channel = 32 last_channel = 1280 if inverted_residual_setting is None: inverted_residual_setting = [ # t, c, n, s [1, 16, 1, 1], [6, 24, 2, 1], [6, 32, 3, 1], [6, 64, 4, 2], [6, 96, 3, 1], [6, 160, 3, 2], [6, 320, 1, 1], ] # only check the first element, assuming user knows t,c,n,s are required if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4: raise ValueError( f"inverted_residual_setting should be non-empty or a 4-element list, got {inverted_residual_setting}" ) # building first layer input_channel = _make_divisible(input_channel * width_mult, round_nearest) self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest) features: List[nn.Module] = [ Conv2dNormActivation(3, input_channel, stride=2, norm_layer=norm_layer, activation_layer=nn.ReLU6) ] # building inverted residual blocks for t, c, n, s in inverted_residual_setting: output_channel = _make_divisible(c * width_mult, round_nearest) for i in range(n): stride = s if i == 0 else 1 features.append(block(input_channel, output_channel, stride, expand_ratio=t, norm_layer=norm_layer)) input_channel = output_channel # building last several layers features.append( Conv2dNormActivation( input_channel, self.last_channel, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.ReLU6 ) ) # make it nn.Sequential self.features = nn.Sequential(*features) # self.layer1 = nn.Sequential(*features[:]) # self.layer2 = features[57:120] # self.layer3 = features[120:] # building classifier self.classifier = nn.Sequential( nn.Dropout(p=dropout), nn.Linear(self.last_channel, num_classes), ) # weight initialization for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode="fan_out") if m.bias is not None: nn.init.zeros_(m.bias) elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): nn.init.ones_(m.weight) nn.init.zeros_(m.bias) elif isinstance(m, nn.Linear): nn.init.normal_(m.weight, 0, 0.01) nn.init.zeros_(m.bias) def _forward_impl(self, x: Tensor) -> Tensor: # This exists since TorchScript doesn't support inheritance, so the superclass method # (this one) needs to have a name other than `forward` that can be accessed in a subclass out_layers = [] for layer in self.features.named_modules(): for i, layer1 in enumerate(layer[1]): # print(layer1) x = layer1(x) # print("第{}层,输出大小{}".format(i, x.shape)) if i in [0, 10, 18]: out_layers.append(x) break # x = self.features(x) # Cannot use "squeeze" as batch-size can be 1 # x = nn.functional.adaptive_avg_pool2d(x, (1, 1)) # x = torch.flatten(x, 1) # x = self.classifier(x) return out_layers def forward(self, x: Tensor) -> Tensor: return self._forward_impl(x) _COMMON_META = { "num_params": 3504872, "min_size": (1, 1), "categories": _IMAGENET_CATEGORIES, } class MobileNet_V2_Weights(WeightsEnum): IMAGENET1K_V1 = Weights( url="https://download.pytorch.org/models/mobilenet_v2-b0353104.pth", transforms=partial(ImageClassification, crop_size=224), meta={ **_COMMON_META, "recipe": "https://github.com/pytorch/vision/tree/main/references/classification#mobilenetv2", "_metrics": { "ImageNet-1K": { "acc@1": 71.878, "acc@5": 90.286, } }, "_ops": 0.301, "_file_size": 13.555, "_docs": """These weights reproduce closely the results of the paper using a simple training recipe.""", }, ) IMAGENET1K_V2 = Weights( url="https://download.pytorch.org/models/mobilenet_v2-7ebf99e0.pth", transforms=partial(ImageClassification, crop_size=224, resize_size=232), meta={ **_COMMON_META, "recipe": "https://github.com/pytorch/vision/issues/3995#new-recipe-with-reg-tuning", "_metrics": { "ImageNet-1K": { "acc@1": 72.154, "acc@5": 90.822, } }, "_ops": 0.301, "_file_size": 13.598, "_docs": """ These weights improve upon the results of the original paper by using a modified version of TorchVision's `new training recipe `_. """, }, ) DEFAULT = IMAGENET1K_V2 # @register_model() # @handle_legacy_interface(weights=("pretrained", MobileNet_V2_Weights.IMAGENET1K_V1)) def mobilenet_v2( *, weights: Optional[MobileNet_V2_Weights] = MobileNet_V2_Weights.IMAGENET1K_V1, progress: bool = True, **kwargs: Any ) -> MobileNetV2: """MobileNetV2 architecture from the `MobileNetV2: Inverted Residuals and Linear Bottlenecks `_ paper. Args: weights (:class:`~torchvision.models.MobileNet_V2_Weights`, optional): The pretrained weights to use. See :class:`~torchvision.models.MobileNet_V2_Weights` below for more details, and possible values. By default, no pre-trained weights are used. progress (bool, optional): If True, displays a progress bar of the download to stderr. Default is True. **kwargs: parameters passed to the ``torchvision.models.mobilenetv2.MobileNetV2`` base class. Please refer to the `source code `_ for more details about this class. .. autoclass:: torchvision.models.MobileNet_V2_Weights :members: """ weights = MobileNet_V2_Weights.verify(weights) if weights is not None: _ovewrite_named_param(kwargs, "num_classes", len(weights.meta["categories"])) model = MobileNetV2(**kwargs) if weights is not None: model.load_state_dict(weights.get_state_dict(progress=progress)) return model def conv1x1(in_planes, out_planes, stride=1): """1x1 convolution""" return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) class MobileNetv2Wrapper(nn.Module): def __init__(self): super(MobileNetv2Wrapper, self).__init__() weights = MobileNet_V2_Weights.verify(MobileNet_V2_Weights.IMAGENET1K_V1) self.model = MobileNetV2() if weights is not None: self.model.load_state_dict(weights.get_state_dict(progress=True)) self.out3 = conv1x1(1280, 128) def forward(self, x): # print(x.shape) out_layers = self.model(x) # print(x.shape) # out_layers[0] = self.out1(out_layers[0]) # out_layers[1] = self.out2(out_layers[1]) out_layers[2] = self.out3(out_layers[2]) # print(x.shape) return out_layers