nats-nsc module

User token creation for NATS

Account

Bases: Auth

Account class.

Source code in src/nats_nsc/__init__.py
class Account(Auth):
    '''Account backed by its JWT payload and an optional private key (seed).'''
    priv_key: ty.Optional[str] = None

    def __init__(self, jwt_token: str, priv_key: ty.Optional[str] = None) -> None:
        """Build an Account from its JWT token.

        Args:
            jwt_token (str): Account's JWT token.
            priv_key (ty.Optional[str], optional): Account's private key (seed). Defaults to None.
        """
        super().__init__(jwt_token)
        self.priv_key = priv_key

    @classmethod
    def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
        # Accept only payloads whose NATS section is typed 'account'.
        nats_section = payload['nats']
        return nats_section['type'] == 'account'

    def _default_permissions(self, direction: str) -> Permissions:
        '''Build Permissions from the default_permissions entry named `direction`.'''
        entry = self._jwt_payload['nats']['default_permissions'][direction]
        # Either list may be absent from the entry; fall back to an empty one.
        return Permissions(allow=entry.get('allow', []),
                           deny=entry.get('deny', []))

    @property
    def limits(self) -> AccountLimits:
        '''Limits stored in the account JWT.'''
        raw_limits = self._jwt_payload['nats']['limits']
        return AccountLimits(**raw_limits)

    @property
    def sub_permissions(self) -> Permissions:
        '''Default subscribe permissions stored in the account JWT.'''
        return self._default_permissions('sub')

    @property
    def pub_permissions(self) -> Permissions:
        '''Default publish permissions stored in the account JWT.'''
        return self._default_permissions('pub')

    @property
    def has_key(self) -> bool:
        '''True when a private key was supplied for this account.'''
        return self.priv_key is not None

has_key: bool property

Check if account has a private key.

limits: AccountLimits property

Default limits of an account.

pub_permissions: Permissions property

Default pub permissions of an account.

sub_permissions: Permissions property

Default sub permissions of an account.

__init__(jwt_token, priv_key=None)

Create Account object.

Parameters:

Name Type Description Default
jwt_token str

Account's JWT token.

required
priv_key ty.Optional[str]

Account's private key (seed). Defaults to None.

None
Source code in src/nats_nsc/__init__.py
def __init__(self, jwt_token: str, priv_key: ty.Optional[str] = None) -> None:
    """Create Account object.

    The token is handed to the base-class initializer; the private key is
    stored on the instance as ``priv_key``.

    Args:
        jwt_token (str): Account's JWT token.
        priv_key (ty.Optional[str], optional): Account's private key (seed). Defaults to None.
    """
    super().__init__(jwt_token)
    self.priv_key = priv_key

AccountLimits dataclass

Bases: _BaseInitForgivingExtras

Account limits.

Source code in src/nats_nsc/__init__.py
@dataclass(init=False)
class AccountLimits(_BaseInitForgivingExtras):
    '''Account limits.

    ``init=False`` stops ``dataclass`` from generating ``__init__``.
    NOTE(review): the constructor presumably comes from
    ``_BaseInitForgivingExtras`` -- confirm there how unknown keys are handled.
    '''
    # NOTE(review): field meanings below are inferred from the names only;
    # verify against the NATS account JWT documentation.
    subs: int  # presumably the subscription limit
    data: int  # presumably the data limit
    payload: int  # presumably the message payload size limit
    imports: int  # presumably the number of imports allowed
    exports: int  # presumably the number of exports allowed
    wildcards: bool  # presumably whether wildcard exports are allowed
    conn: int  # presumably the client connection limit
    leaf: int  # presumably the leaf-node connection limit

Auth

Base class for authentication objects

Source code in src/nats_nsc/__init__.py
class Auth():
    '''Common base for objects described by a decoded JWT payload.'''
    _jwt_payload: ty.Dict[str, ty.Any]

    def __init__(self, jwt_token: str) -> None:
        """Decode the token and keep its payload if the subclass accepts it.

        Args:
            jwt_token (str): JWT token of the object.

        Raises:
            ValueError: The decoded payload was rejected by `_verify_payload`.
        """
        decoded = _decode_jwt_payload(jwt_token)
        if self._verify_payload(decoded):
            self._jwt_payload = decoded
            return
        raise ValueError("Invalid JWT type")

    @classmethod
    def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
        # Subclasses decide which payloads they accept.
        raise NotImplementedError

    def _claim(self, key: str) -> ty.Any:
        '''Return the top-level payload entry stored under `key`.'''
        return self._jwt_payload[key]

    @property
    def jwt_id(self) -> str:
        '''Value of the 'jti' claim.'''
        return self._claim('jti')

    @property
    def name(self) -> str:
        '''Value of the 'name' claim.'''
        return self._claim('name')

    @property
    def pub_key(self) -> str:
        '''Public nkey, read from the 'sub' claim.'''
        return self._claim('sub')

    @property
    def subject(self) -> str:
        '''JWT subject; same value as `pub_key`.'''
        return self.pub_key

    @property
    def nats_props(self) -> ty.Dict[str, ty.Any]:
        '''The 'nats' section of the payload.'''
        return self._claim('nats')

    @property
    def issuer(self) -> str:
        '''Issuer nkey, read from the 'iss' claim.'''
        return self._claim('iss')

    @property
    def issued_at(self) -> datetime:
        '''The 'iat' claim converted with `datetime.utcfromtimestamp`.'''
        issued_ts = self._claim('iat')
        return datetime.utcfromtimestamp(issued_ts)

issued_at: datetime property

Issued at.

issuer: str property

Issuer nkey.

jwt_id: str property

JWT ID.

name: str property

Name of an object

nats_props: ty.Dict[str, ty.Any] property

NATS properties.

pub_key: str property

Public nkey of object

subject: str property

JWT's subject. Pub key by convention.

__init__(jwt_token)

Initialize Auth object.

Parameters:

Name Type Description Default
jwt_token str

JWT token of the object.

required

Raises:

Type Description
ValueError

Bad JWT token.

Source code in src/nats_nsc/__init__.py
def __init__(self, jwt_token: str) -> None:
    """Initialize Auth object.

    The token is decoded with ``_decode_jwt_payload`` and the resulting
    payload is stored only if ``_verify_payload`` accepts it.

    Args:
        jwt_token (str): JWT token of the object.

    Raises:
        ValueError: ``_verify_payload`` rejected the decoded payload.
    """
    jwt_payload = _decode_jwt_payload(jwt_token)
    if not self._verify_payload(jwt_payload):
        raise ValueError("Invalid JWT type")
    self._jwt_payload = jwt_payload

Credential

Credential class.

Source code in src/nats_nsc/__init__.py
class Credential():
    '''Wrapper around the text of a credential holding a user JWT and an nkey seed.'''

    def __init__(self, payload: str) -> None:
        '''Store the credential text.'''
        self._payload = payload

    def _section(self, begin_marker: str, end_marker: str) -> str:
        '''Return the lines strictly between the two marker lines, joined by newlines.

        Raises ValueError (from `list.index`) if either marker line is absent.
        '''
        lines = self._payload.split('\n')
        first = lines.index(begin_marker) + 1
        last = lines.index(end_marker)
        return '\n'.join(lines[first:last])

    @property
    def payload(self) -> str:
        '''The credential text exactly as given to the constructor.'''
        return self._payload

    @property
    def jwt(self) -> dict:
        '''Decoded payload of the JWT section of the credential.'''
        encoded = self._section('-----BEGIN NATS USER JWT-----',
                                '------END NATS USER JWT------')
        return _decode_jwt_payload(encoded)

    @property
    def nkey(self) -> str:
        '''Text of the nkey seed section of the credential.'''
        return self._section('-----BEGIN USER NKEY SEED-----',
                             '------END USER NKEY SEED------')

jwt: dict property

JWT section of credential

nkey: str property

Nkey section of credential

payload: str property

Payload of object.

__init__(payload)

Create Credential object.

Source code in src/nats_nsc/__init__.py
def __init__(self, payload: str) -> None:
    '''Create Credential object.

    Args:
        payload (str): Credential text; stored unchanged on the instance.
    '''
    self._payload = payload

KeyType

Bases: Enum

Key type enum.

Source code in src/nats_nsc/__init__.py
class KeyType(Enum):
    '''Key type enum.

    Each member's value is the lowercase form of its name.
    '''
    USER = 'user'
    ACCOUNT = 'account'
    OPERATOR = 'operator'

Operator

Bases: Auth

Operator class.

Source code in src/nats_nsc/__init__.py
class Operator(Auth):
    '''Auth object for an operator JWT.'''
    @classmethod
    def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
        # Accept only payloads whose NATS section is typed 'operator'.
        nats_section = payload['nats']
        return nats_section['type'] == 'operator'

Permissions dataclass

Bases: _BaseInitForgivingExtras

Default pub/sub permissions.

Source code in src/nats_nsc/__init__.py
@dataclass(init=False)
class Permissions(_BaseInitForgivingExtras):
    '''Allow/deny subject lists for publishing or subscribing.'''
    allow: ty.List[str]
    deny: ty.List[str]

    def as_dict(self) -> ty.Dict[str, ty.List[str]]:
        '''Return the two lists in a dict keyed 'allow' and 'deny'.

        The lists are not copied; the dict refers to the same list objects.
        '''
        return dict(allow=self.allow, deny=self.deny)

User

Bases: Auth

User class.

Source code in src/nats_nsc/__init__.py
class User(Auth):
    '''User class.'''

    def __init__(self, jwt_token: str) -> None:
        """Create User object.

        Args:
            jwt_token (str): User's JWT token. The full token is kept so it
                can be returned by ``jwt_token``.
        """
        self._full_jwt = jwt_token
        super().__init__(jwt_token)

    @property
    def jwt_token(self) -> str:
        '''JWT token of a user.'''
        return self._full_jwt

    @classmethod
    def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
        return payload['nats']['type'] == 'user'

    @property
    def limits(self) -> UserLimits:
        '''Limits of a user.'''
        nats_dict = self._jwt_payload['nats']
        # The subscription limit lives under 'subs'; 'sub' holds the
        # subscribe permissions (see sub_permissions).
        return UserLimits(subs=nats_dict['subs'], data=nats_dict['data'], payload=nats_dict['payload'])

    @property
    def sub_permissions(self) -> Permissions:
        '''Sub permissions of a user.'''
        return Permissions(**self._jwt_payload['nats']['sub'])

    @property
    def pub_permissions(self) -> Permissions:
        '''Pub permissions of a user.'''
        return Permissions(**self._jwt_payload['nats']['pub'])

    @property
    def src_networks(self) -> ty.Optional[ty.List[str]]:
        '''Allowed source networks of a user, or None when the JWT has no 'src' entry.'''
        return self._jwt_payload['nats'].get('src')

    @property
    def bearer(self) -> bool:
        '''Whether user can use bearer authentication.

        False when the JWT's 'nats' section has no 'bearer_token' entry.
        '''
        # The flag is stored inside the 'nats' section, not at the top level.
        return self._jwt_payload['nats'].get('bearer_token', False)

    @property
    def resp_ttl(self) -> timedelta:
        '''Response TTL of a user.

        One second when the JWT has no 'resp' section.
        '''
        nats_dict = self._jwt_payload['nats']
        # The stored ttl is scaled by TTL_SCALE; divide to get seconds.
        seconds = nats_dict['resp']['ttl'] / TTL_SCALE if 'resp' in nats_dict else 1.0
        return timedelta(seconds=seconds)

    @property
    def max_resp(self) -> int:
        '''Max response count of a user.

        1 when the JWT has no 'resp' section.
        '''
        nats_dict = self._jwt_payload['nats']
        return nats_dict['resp']['max'] if 'resp' in nats_dict else 1

bearer: bool property

Whether user can use bearer authentication.

jwt_token: str property

JWT token of a user.

limits: UserLimits property

Limits of a user.

max_resp: int property

Maximum response count of a user.

pub_permissions: Permissions property

Pub permissions of a user.

resp_ttl: timedelta property

Response TTL of a user.

src_networks: ty.Optional[ty.List[str]] property

Allowed source networks of a user.

sub_permissions: Permissions property

Sub permissions of a user.

UserLimits dataclass

User limits.

Source code in src/nats_nsc/__init__.py
@dataclass
class UserLimits():
    '''User limits.'''
    # NOTE(review): field meanings are inferred from the names only; verify
    # against the NATS user JWT documentation.
    subs: int  # presumably the subscription limit
    data: int  # presumably the data limit
    payload: int  # presumably the message payload size limit

create_user

Home of the create_user function.

create_user(user_name, account, pub_key, *, jwt_id=None, allow_pub=None, allow_pub_response=None, allow_pubsub=None, allow_sub=None, bearer=False, deny_pub=None, deny_pubsub=None, deny_sub=None, expiry=None, response_ttl=None, source_networks=None, start=None, tag=None)

Create user.

Parameters:

Name Type Description Default
user_name str

Name of the user.

required
account Account

Account to create the user for.

required
pub_key str

Public key of the user.

required
jwt_id ty.Optional[str]

JWT identifier. If not provided, a random UUID is generated.

None
allow_pub ty.Optional[ty.List[str]]

List of allowed publication subjects. Defaults to None.

None
allow_pub_response ty.Optional[int]

Number of responses allowed for each publication. Defaults to None; if only response_ttl is given, 0 is used.

None
allow_pubsub ty.Optional[ty.List[str]]

List of allowed publication and subscription subjects. Defaults to None.

None
allow_sub ty.Optional[ty.List[str]]

List of allowed subscription subjects. Defaults to None.

None
bearer bool

Whether the user's JWT is marked as a bearer token. Defaults to False.

False
deny_pub ty.Optional[ty.List[str]]

List of denied publication subjects. Defaults to None.

None
deny_pubsub ty.Optional[ty.List[str]]

List of denied publication and subscription subjects. Defaults to None.

None
deny_sub ty.Optional[ty.List[str]]

List of denied subscription subjects. Defaults to None.

None
expiry ty.Optional[timedelta]

Expiry of the user token. Defaults to None (does not expire).

None
response_ttl ty.Optional[timedelta]

Response TTL. Defaults to None.

None
source_networks ty.Optional[ty.List[str]]

Allowed source networks. Defaults to None (all allowed).

None
start ty.Union[timedelta, datetime, None]

Datetime, or timedelta from now, when the token is valid from. Defaults to None (now).

None
tag ty.Optional[ty.List[str]]

List of tags. Defaults to None.

None

Raises:

Type Description
ValueError

Invalid parameters.

Returns:

Name Type Description
User User

User object.

Source code in src/nats_nsc/create_user.py
def _override_subjects(defaults: ty.List[str],
                       specific: ty.Optional[ty.List[str]],
                       shared: ty.Optional[ty.List[str]]) -> ty.List[str]:
    '''Return `defaults` unless the caller supplied `specific` and/or `shared` subjects.

    When at least one of them is not None, the result is a new list holding
    the `specific` subjects followed by the `shared` ones.
    '''
    if specific is None and shared is None:
        return defaults
    return [*(specific or []), *(shared or [])]


def create_user(user_name: str, account: Account,
                pub_key: str, *, jwt_id: ty.Optional[str] = None,
                allow_pub: ty.Optional[ty.List[str]] = None,
                allow_pub_response: ty.Optional[int] = None,
                allow_pubsub: ty.Optional[ty.List[str]] = None,
                allow_sub: ty.Optional[ty.List[str]] = None,
                bearer: bool = False,
                deny_pub: ty.Optional[ty.List[str]] = None,
                deny_pubsub: ty.Optional[ty.List[str]] = None,
                deny_sub: ty.Optional[ty.List[str]] = None,
                expiry: ty.Optional[timedelta] = None,
                response_ttl: ty.Optional[timedelta] = None,
                source_networks: ty.Optional[ty.List[str]] = None,
                start: ty.Union[timedelta, datetime, None] = None,
                tag: ty.Optional[ty.List[str]] = None) -> User:
    """Create user.

    Builds the user JWT payload, signs it with the account's private key and
    returns the resulting token wrapped in a User object.

    Each allow/deny list starts from the account's default permissions. It is
    replaced (not extended) when the matching argument or its ``*_pubsub``
    counterpart is given.

    Args:
        user_name (str): Name of the user.
        account (Account): Account to create the user for. Must have a private key.
        pub_key (str): Public key of the user.
        jwt_id (ty.Optional[str], optional): JWT identifier. If not provided, a random UUID is generated.
        allow_pub (ty.Optional[ty.List[str]], optional): List of allowed publication subjects. Defaults to None.
        allow_pub_response (ty.Optional[int], optional): Number of responses allowed for each publication.
            Defaults to None; if only response_ttl is given, 0 is used.
        allow_pubsub (ty.Optional[ty.List[str]], optional): List of allowed publication and subscription subjects. Defaults to None.
        allow_sub (ty.Optional[ty.List[str]], optional): List of allowed subscription subjects. Defaults to None.
        bearer (bool, optional): Whether to mark the JWT as a bearer token. Defaults to False.
        deny_pub (ty.Optional[ty.List[str]], optional): List of denied publication subjects. Defaults to None.
        deny_pubsub (ty.Optional[ty.List[str]], optional): List of denied publication and subscription subjects. Defaults to None.
        deny_sub (ty.Optional[ty.List[str]], optional): List of denied subscription subjects. Defaults to None.
        expiry (ty.Optional[timedelta], optional): Expiry of the user token, counted from the issue time. Defaults to None (does not expire).
        response_ttl (ty.Optional[timedelta], optional): Response TTL. Defaults to None;
            if only allow_pub_response is given, zero seconds is used.
        source_networks (ty.Optional[ty.List[str]], optional): Allowed source networks. Defaults to None (all allowed).
        start (ty.Union[timedelta, datetime, None], optional): Datetime, or timedelta from now, when the token is valid from.
            A naive datetime is interpreted as local time by ``datetime.timestamp``. Defaults to None (now).
        tag (ty.Optional[ty.List[str]], optional): List of tags. Defaults to None.

    Raises:
        ValueError: The account has no private key.

    Returns:
        User: User object.
    """  # noqa: 501
    if not account.has_key:
        raise ValueError('Account has no key')

    # Imported here so this function does not depend on the module's import list.
    from datetime import timezone

    if isinstance(start, datetime):
        issued_at = start
    else:
        # Use an aware UTC "now": calling .timestamp() on a naive utcnow()
        # value would treat it as local time and skew 'iat'/'exp' by the
        # host's UTC offset.
        issued_at = datetime.now(timezone.utc)
        if isinstance(start, timedelta):
            issued_at += start

    pub = account.pub_permissions.as_dict()
    pub['allow'] = _override_subjects(pub['allow'], allow_pub, allow_pubsub)
    pub['deny'] = _override_subjects(pub['deny'], deny_pub, deny_pubsub)

    sub = account.sub_permissions.as_dict()
    sub['allow'] = _override_subjects(sub['allow'], allow_sub, allow_pubsub)
    sub['deny'] = _override_subjects(sub['deny'], deny_sub, deny_pubsub)

    resp = None
    if allow_pub_response is not None or response_ttl is not None:
        if allow_pub_response is None:
            allow_pub_response = 0  # Yea, I don't know why either, but that's how nsc works
        if response_ttl is None:
            response_ttl = timedelta(seconds=0)
        resp = {
            'max': allow_pub_response,
            # NOTE(review): total_seconds() is a float, so this value is
            # serialized as a float -- confirm consumers accept that.
            'ttl': response_ttl.total_seconds() * TTL_SCALE
        }

    payload = {
        'jti': uuid.uuid4().hex if jwt_id is None else jwt_id,
        'iat': int(issued_at.timestamp()),
        'iss': account.pub_key,
        'name': user_name,
        'sub': pub_key,
        'nats': {
            'sub': sub,
            'pub': pub,
            "subs": account.limits.subs,
            "data": account.limits.data,
            "payload": account.limits.payload,
            "type": "user",
            "version": 2
        }
    }

    # Optional claims are written only when requested.
    if expiry is not None:
        payload['exp'] = int((issued_at + expiry).timestamp())
    if resp is not None:
        payload['nats']['resp'] = resp
    if source_networks:
        payload['nats']['src'] = source_networks
    if bearer:
        payload['nats']['bearer_token'] = True
    if tag:
        payload['nats']['tags'] = tag

    # JWT signing input: base64url(header) + '.' + base64url(payload), unpadded.
    to_sign = base64.urlsafe_b64encode(json.dumps(HEADER).encode()).strip(b'=') + b'.' +\
        base64.urlsafe_b64encode(json.dumps(payload).encode()).strip(b'=')

    signer = nkeys.from_seed(account.priv_key.encode())  # type: ignore
    try:
        sig = signer.sign(to_sign)
    finally:
        # Wipe the key pair even if signing fails.
        signer.wipe()
    jwt = to_sign + b'.' + base64.urlsafe_b64encode(sig).strip(b'=')
    return User(jwt_token=jwt.decode())