nats-nsc module
User token creation for NATS
Account
Bases: Auth
Account class.
Source code in src/nats_nsc/__init__.py
class Account(Auth):
'''Account class.'''
priv_key: ty.Optional[str] = None
def __init__(self, jwt_token: str, priv_key: ty.Optional[str] = None) -> None:
"""Create Account object.
Args:
jwt_token (str): Account's JWT token.
priv_key (ty.Optional[str], optional): Account's private key (seed). Defaults to None.
"""
super().__init__(jwt_token)
self.priv_key = priv_key
@classmethod
def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
return payload['nats']['type'] == 'account'
@property
def limits(self) -> AccountLimits:
'''Default limits of an account.'''
return AccountLimits(**self._jwt_payload['nats']['limits'])
@property
def sub_permissions(self) -> Permissions:
'''Default sub permissions of an account.'''
ps_dct = self._jwt_payload['nats']['default_permissions']['sub']
return Permissions(allow=ps_dct['allow'] if 'allow' in ps_dct else [],
deny=ps_dct['deny'] if 'deny' in ps_dct else [])
@property
def pub_permissions(self) -> Permissions:
'''Default pub permissions of an account.'''
ps_dct = self._jwt_payload['nats']['default_permissions']['pub']
return Permissions(allow=ps_dct['allow'] if 'allow' in ps_dct else [],
deny=ps_dct['deny'] if 'deny' in ps_dct else [])
@property
def has_key(self) -> bool:
'''Check if account has a private key.'''
return self.priv_key is not None
has_key: bool
property
Check if account has a private key.
limits: AccountLimits
property
Default limits of an account.
pub_permissions: Permissions
property
Default pub permissions of an account.
sub_permissions: Permissions
property
Default sub permissions of an account.
__init__(jwt_token, priv_key=None)
Create Account object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
jwt_token |
str
|
Account's JWT token. |
required |
priv_key |
ty.Optional[str]
|
Account's private key (seed). Defaults to None. |
None
|
Source code in src/nats_nsc/__init__.py
def __init__(self, jwt_token: str, priv_key: ty.Optional[str] = None) -> None:
"""Create Account object.
Args:
jwt_token (str): Account's JWT token.
priv_key (ty.Optional[str], optional): Account's private key (seed). Defaults to None.
"""
super().__init__(jwt_token)
self.priv_key = priv_key
AccountLimits
dataclass
Bases: _BaseInitForgivingExtras
Account limits.
Source code in src/nats_nsc/__init__.py
@dataclass(init=False)
class AccountLimits(_BaseInitForgivingExtras):
'''Account limits.'''
subs: int
data: int
payload: int
imports: int
exports: int
wildcards: bool
conn: int
leaf: int
Auth
Base class for authentication objects
Source code in src/nats_nsc/__init__.py
class Auth():
'''Base class for authentication objects'''
_jwt_payload: ty.Dict[str, ty.Any]
def __init__(self, jwt_token: str) -> None:
"""Initialize Auth object.
Args:
jwt_token (str): JWT token of the object.
Raises:
ValueError: Bad JWT token.
"""
jwt_payload = _decode_jwt_payload(jwt_token)
if not self._verify_payload(jwt_payload):
raise ValueError("Invalid JWT type")
self._jwt_payload = jwt_payload
@classmethod
def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
raise NotImplementedError
@property
def jwt_id(self) -> str:
'''JWT ID.'''
return self._jwt_payload['jti']
@property
def name(self) -> str:
'''Name of an object'''
return self._jwt_payload['name']
@property
def pub_key(self) -> str:
'''Public nkey of object'''
return self._jwt_payload['sub']
@property
def subject(self) -> str:
'''JWT's subject. Pub key by convention.'''
return self.pub_key
@property
def nats_props(self) -> ty.Dict[str, ty.Any]:
'''NATS properties.'''
return self._jwt_payload['nats']
@property
def issuer(self) -> str:
'''Issuer nkey.'''
return self._jwt_payload['iss']
@property
def issued_at(self) -> datetime:
'''Issued at.'''
return datetime.utcfromtimestamp(self._jwt_payload['iat'])
issued_at: datetime
property
Issued at.
issuer: str
property
Issuer nkey.
jwt_id: str
property
JWT ID.
name: str
property
Name of an object
nats_props: ty.Dict[str, ty.Any]
property
NATS properties.
pub_key: str
property
Public nkey of object
subject: str
property
JWT's subject. Pub key by convention.
__init__(jwt_token)
Initialize Auth object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
jwt_token |
str
|
JWT token of the object. |
required |
Raises:
Type | Description |
---|---|
ValueError
|
Bad JWT token. |
Source code in src/nats_nsc/__init__.py
def __init__(self, jwt_token: str) -> None:
"""Initialize Auth object.
Args:
jwt_token (str): JWT token of the object.
Raises:
ValueError: Bad JWT token.
"""
jwt_payload = _decode_jwt_payload(jwt_token)
if not self._verify_payload(jwt_payload):
raise ValueError("Invalid JWT type")
self._jwt_payload = jwt_payload
Credential
Credential class.
Source code in src/nats_nsc/__init__.py
class Credential():
'''Credential class.'''
def __init__(self, payload: str) -> None:
'''Create Credential object.'''
self._payload = payload
@property
def payload(self) -> str:
'''Payload of object.'''
return self._payload
@property
def jwt(self) -> dict:
'''JWT section of credential'''
payload_splitted = self._payload.split('\n')
jwt_line_start = payload_splitted.index('-----BEGIN NATS USER JWT-----')
jwt_line_end = payload_splitted.index('------END NATS USER JWT------')
return _decode_jwt_payload('\n'.join(payload_splitted[jwt_line_start+1:jwt_line_end]))
@property
def nkey(self) -> str:
'''Nkey section of credential'''
payload_splitted = self._payload.split('\n')
nkey_line_start = payload_splitted.index('-----BEGIN USER NKEY SEED-----')
nkey_line_end = payload_splitted.index('------END USER NKEY SEED------')
return '\n'.join(payload_splitted[nkey_line_start+1:nkey_line_end])
jwt: dict
property
JWT section of credential
nkey: str
property
Nkey section of credential
payload: str
property
Payload of object.
__init__(payload)
Create Credential object.
Source code in src/nats_nsc/__init__.py
def __init__(self, payload: str) -> None:
'''Create Credential object.'''
self._payload = payload
KeyType
Bases: Enum
Key type enum.
Source code in src/nats_nsc/__init__.py
class KeyType(Enum):
'''Key type enum.'''
USER = 'user'
ACCOUNT = 'account'
OPERATOR = 'operator'
Operator
Bases: Auth
Operator class.
Source code in src/nats_nsc/__init__.py
class Operator(Auth):
'''Operator class.'''
@classmethod
def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
return payload['nats']['type'] == 'operator'
Permissions
dataclass
Bases: _BaseInitForgivingExtras
Default pub/sub permissions.
Source code in src/nats_nsc/__init__.py
@dataclass(init=False)
class Permissions(_BaseInitForgivingExtras):
'''Default pub/sub permissions.'''
allow: ty.List[str]
deny: ty.List[str]
def as_dict(self) -> ty.Dict[str, ty.List[str]]:
return {
'allow': self.allow,
'deny': self.deny
}
User
Bases: Auth
User class.
Source code in src/nats_nsc/__init__.py
class User(Auth):
'''User class.'''
def __init__(self, jwt_token: str) -> None:
self._full_jwt = jwt_token
super().__init__(jwt_token)
@property
def jwt_token(self) -> str:
'''JWT token of an user.'''
return self._full_jwt
@classmethod
def _verify_payload(cls, payload: ty.Dict[str, ty.Any]) -> bool:
return payload['nats']['type'] == 'user'
@property
def limits(self) -> UserLimits:
'''Limits of an user.'''
nats_dict = self._jwt_payload['nats']
return UserLimits(subs=nats_dict['sub'], data=nats_dict['data'], payload=nats_dict['payload'])
@property
def sub_permissions(self) -> Permissions:
'''Sub permissions of an user.'''
return Permissions(**self._jwt_payload['nats']['sub'])
@property
def pub_permissions(self) -> Permissions:
'''Pub permissions of an user.'''
return Permissions(**self._jwt_payload['nats']['pub'])
@property
def src_networks(self) -> ty.Optional[ty.List[str]]:
'''Allowed source networks of an user.'''
return self._jwt_payload['nats']['src'] if 'src' in self._jwt_payload['nats'] else None
@property
def bearer(self) -> bool:
'''Whether user can use bearer authentication.'''
return self._jwt_payload['nats']['bearer_token'] if 'bearer_token' in self._jwt_payload['bearer_token'] else False
@property
def resp_ttl(self) -> timedelta:
'''Response TTL of an user.'''
seconds = self._jwt_payload['nats']['resp']['ttl'] / TTL_SCALE if 'resp' in self._jwt_payload['nats'] else 1.0
return timedelta(seconds=seconds)
@property
def max_resp(self) -> int:
'''Max response Count of an user.'''
return self._jwt_payload['nats']['resp']['max'] if 'resp' in self._jwt_payload['nats'] else 1
bearer: bool
property
Whether user can use bearer authentication.
jwt_token: str
property
JWT token of an user.
limits: UserLimits
property
Limits of an user.
max_resp: int
property
Max response Count of an user.
pub_permissions: Permissions
property
Pub permissions of an user.
resp_ttl: timedelta
property
Response TTL of an user.
src_networks: ty.Optional[ty.List[str]]
property
Allowed source networks of an user.
sub_permissions: Permissions
property
Sub permissions of an user.
UserLimits
dataclass
User limits.
Source code in src/nats_nsc/__init__.py
@dataclass
class UserLimits():
'''User limits.'''
subs: int
data: int
payload: int
create_user
Home of the create_user function.
create_user(user_name, account, pub_key, *, jwt_id=None, allow_pub=None, allow_pub_response=None, allow_pubsub=None, allow_sub=None, bearer=False, deny_pub=None, deny_pubsub=None, deny_sub=None, expiry=None, response_ttl=None, source_networks=None, start=None, tag=None)
Create user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name |
str
|
Name of the user. |
required |
account |
Account
|
Account to create the user for. |
required |
pub_key |
str
|
Public key of the user. |
required |
jwt_id |
ty.Optional[str]
|
JWT identifier. If not provided, a random UUID is generated. |
None
|
allow_pub |
ty.Optional[ty.List[str]]
|
List of allowed publication subjects. Defaults to None. |
None
|
allow_pub_response |
ty.Optional[int]
|
Number of responses allowed for each publication. Defaults to 1. |
None
|
allow_pubsub |
ty.Optional[ty.List[str]]
|
List of allowed publication and subscription subjects. Defaults to None. |
None
|
allow_sub |
ty.Optional[ty.List[str]]
|
List of allowed subscription subjects. Defaults to None. |
None
|
bearer |
bool
|
Whether the user is a bearer token. Defaults to False. |
False
|
deny_pub |
ty.Optional[ty.List[str]]
|
List of denied publication subjects. Defaults to None. |
None
|
deny_pubsub |
ty.Optional[ty.List[str]]
|
List of denied publication and subscription subjects. Defaults to None. |
None
|
deny_sub |
ty.Optional[ty.List[str]]
|
List of denied subscription subjects. Defaults to None. |
None
|
expiry |
ty.Optional[timedelta]
|
Expiry of the user token. Defaults to None (does not expire). |
None
|
response_ttl |
ty.Optional[timedelta]
|
Response TTL. Defaults to None. |
None
|
source_networks |
ty.Optional[ty.List[str]]
|
Allowed source networks. Defaults to None (all allowed). |
None
|
start |
ty.Union[timedelta, datetime, None]
|
Datetime, or timedelta from now, when the token is valid from. Defaults to None (now). |
None
|
tag |
ty.Optional[ty.List[str]]
|
List of tags. Defaults to None. |
None
|
Raises:
Type | Description |
---|---|
ValueError
|
Invalid parameters. |
Returns:
Name | Type | Description |
---|---|---|
User |
User
|
User object. |
Source code in src/nats_nsc/create_user.py
def create_user(user_name: str, account: Account,
pub_key: str, *, jwt_id: ty.Optional[str] = None,
allow_pub: ty.Optional[ty.List[str]] = None,
allow_pub_response: ty.Optional[int] = None,
allow_pubsub: ty.Optional[ty.List[str]] = None,
allow_sub: ty.Optional[ty.List[str]] = None,
bearer: bool = False,
deny_pub: ty.Optional[ty.List[str]] = None,
deny_pubsub: ty.Optional[ty.List[str]] = None,
deny_sub: ty.Optional[ty.List[str]] = None,
expiry: ty.Optional[timedelta] = None,
response_ttl: ty.Optional[timedelta] = None,
source_networks: ty.Optional[ty.List[str]] = None,
start: ty.Union[timedelta, datetime, None] = None,
tag: ty.Optional[ty.List[str]] = None) -> User:
"""Create user.
Args:
user_name (str): Name of the user.
account (Account): Account to create the user for.
pub_key (str): Public key of the user.
jwt_id (ty.Optional[str], optional): JWT identifier. If not provided, a random UUID is generated.
allow_pub (ty.Optional[ty.List[str]], optional): List of allowed publication subjects. Defaults to None.
allow_pub_response (ty.Optional[int], optional): Number of responses allowed for each publication. Defaults to 1.
allow_pubsub (ty.Optional[ty.List[str]], optional): List of allowed publication and subscription subjects. Defaults to None.
allow_sub (ty.Optional[ty.List[str]], optional): List of allowed subscription subjects. Defaults to None.
bearer (bool, optional): Whether the user is a bearer token. Defaults to False.
deny_pub (ty.Optional[ty.List[str]], optional): List of denied publication subjects. Defaults to None.
deny_pubsub (ty.Optional[ty.List[str]], optional): List of denied publication and subscription subjects. Defaults to None.
deny_sub (ty.Optional[ty.List[str]], optional): List of denied subscription subjects. Defaults to None.
expiry (ty.Optional[timedelta], optional): Expiry of the user token. Defaults to None (does not expire).
response_ttl (ty.Optional[timedelta], optional): Response TTL. Defaults to None.
source_networks (ty.Optional[ty.List[str]], optional): Allowed source networks. Defaults to None (all allowed).
start (ty.Union[timedelta, datetime, None], optional): Datetime, or timedelta from now, when the token is valid from. Defaults to None (now).
tag (ty.Optional[ty.List[str]], optional): List of tags. Defaults to None.
Raises:
ValueError: Invalid parameters.
Returns:
User: User object.
""" # noqa: 501
if not account.has_key:
raise ValueError('Account has no key')
issued_at = start if isinstance(start, datetime) else datetime.utcnow()
if isinstance(start, timedelta):
issued_at += start
pub = account.pub_permissions.as_dict()
if allow_pub is None or allow_pubsub is None:
pub['allow'] = []
if allow_pub is not None:
pub['allow'] += allow_pub
if allow_pubsub is not None:
pub['allow'] += allow_pubsub
if deny_pub is not None or deny_pubsub is not None:
pub['deny'] = []
if deny_pub is not None:
pub['deny'] += deny_pub
if deny_pubsub is not None:
pub['deny'] += deny_pubsub
sub = account.sub_permissions.as_dict()
if allow_sub is not None or allow_pubsub is not None:
sub['allow'] = []
if allow_sub is not None:
sub['allow'] += allow_sub
if allow_pubsub is not None:
sub['allow'] += allow_pubsub
if deny_sub is not None or deny_pubsub is not None:
sub['deny'] = []
if deny_sub is not None:
sub['deny'] += deny_sub
if deny_pubsub is not None:
sub['deny'] += deny_pubsub
resp = None
if allow_pub_response is not None or response_ttl is not None:
if allow_pub_response is None:
allow_pub_response = 0 # Yea, I don't know why either, but that's how nsc works
if response_ttl is None:
response_ttl = timedelta(seconds=0)
resp = {
'max': allow_pub_response,
'ttl': response_ttl.total_seconds() * TTL_SCALE
}
payload = {
'jti': uuid.uuid4().hex if jwt_id is None else jwt_id,
'iat': int(issued_at.timestamp()),
'iss': account.pub_key,
'name': user_name,
'sub': pub_key,
'nats': {
'sub': sub,
'pub': pub,
"subs": account.limits.subs,
"data": account.limits.data,
"payload": account.limits.payload,
"type": "user",
"version": 2
}
}
if expiry is not None:
payload['exp'] = int((issued_at + expiry).timestamp())
if resp is not None:
payload['nats']['resp'] = resp
if source_networks:
payload['nats']['src'] = source_networks
if bearer:
payload['nats']['bearer_token'] = True
if tag:
payload['nats']['tags'] = tag
to_sign = base64.urlsafe_b64encode(json.dumps(HEADER).encode()).strip(b'=') + b'.' +\
base64.urlsafe_b64encode(json.dumps(payload).encode()).strip(b'=')
user = nkeys.from_seed(account.priv_key.encode()) # type: ignore
sig = user.sign(to_sign)
user.wipe()
jwt = to_sign + b'.' + base64.urlsafe_b64encode(sig).strip(b'=')
return User(jwt_token=jwt.decode())