1
    2import pytest
    3 
    4import auth_utils
    5 
    6 
    7@pytest.mark.pgsql('auth', files=['test_data.sql'])
    8async def test_authenticate_base(service_client):
    9    response = await service_client.get('/v1/hello')
   10    assert response.status == 401
   11 
   12    authentication_header = response.headers['WWW-Authenticate']
   13    auth_directives = auth_utils.parse_directives(authentication_header)
   14 
   15    auth_utils.auth_directives_assert(auth_directives)
   16 
   17    challenge = auth_utils.construct_challenge(auth_directives)
   18    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
   19 
   20    response = await service_client.get(
   21        '/v1/hello', headers={'Authorization': auth_header},
   22    )
   23    assert response.status == 200
   24    assert 'Authentication-Info' in response.headers
   25 
   26 
   27
   28 
   29 
   30@pytest.mark.pgsql('auth', files=['test_data.sql'])
   31async def test_authenticate_base_unregistered_user(service_client):
   32    response = await service_client.get('/v1/hello')
   33    assert response.status == 401
   34 
   35    authentication_header = response.headers['WWW-Authenticate']
   36    auth_directives = auth_utils.parse_directives(authentication_header)
   37 
   38    auth_utils.auth_directives_assert(auth_directives)
   39 
   40    challenge = auth_utils.construct_challenge(auth_directives)
   41    auth_header = auth_utils.construct_header(
   42        'unregistered_username', 'pswd', challenge,
   43    )
   44 
   45    response = await service_client.get(
   46        '/v1/hello', headers={'Authorization': auth_header},
   47    )
   48    assert response.status == 403
   49 
   50 
   51@pytest.mark.pgsql('auth', files=['test_data.sql'])
   52async def test_postgres_wrong_data(service_client):
   53    response = await service_client.get('/v1/hello')
   54    assert response.status == 401
   55 
   56    authentication_header = response.headers['WWW-Authenticate']
   57    auth_directives = auth_utils.parse_directives(authentication_header)
   58 
   59    auth_utils.auth_directives_assert(auth_directives)
   60 
   61    challenge = auth_utils.construct_challenge(auth_directives)
   62    auth_header = auth_utils.construct_header(
   63        'username', 'wrong-password', challenge,
   64    )
   65 
   66    response = await service_client.get(
   67        '/v1/hello', headers={'Authorization': auth_header},
   68    )
   69    assert response.status == 401
   70    assert 'WWW-Authenticate' in response.headers
   71 
   72 
   73@pytest.mark.pgsql('auth', files=['test_data.sql'])
   74async def test_repeated_auth(service_client):
   75    response = await service_client.get('/v1/hello')
   76    assert response.status == 401
   77 
   78    authentication_header = response.headers['WWW-Authenticate']
   79    auth_directives = auth_utils.parse_directives(authentication_header)
   80 
   81    auth_utils.auth_directives_assert(auth_directives)
   82 
   83    challenge = auth_utils.construct_challenge(auth_directives)
   84    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
   85 
   86    response = await service_client.get(
   87        '/v1/hello', headers={'Authorization': auth_header},
   88    )
   89    assert response.status == 200
   90 
   91    auth_info_header = response.headers['Authentication-Info']
   92    auth_directives_info = auth_utils.parse_directives(auth_info_header)
   93 
   94    challenge = auth_utils.construct_challenge(
   95        auth_directives, auth_directives_info['nextnonce'],
   96    )
   97    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
   98 
   99    response = await service_client.get(
  100        '/v1/hello', headers={'Authorization': auth_header},
  101    )
  102    assert response.status == 200
  103    assert 'Authentication-Info' in response.headers
  104 
  105 
  106@pytest.mark.pgsql('auth', files=['test_data.sql'])
  107async def test_same_nonce_repeated_use(service_client):
  108    response = await service_client.get('/v1/hello')
  109    assert response.status == 401
  110 
  111    authentication_header = response.headers['WWW-Authenticate']
  112    auth_directives = auth_utils.parse_directives(authentication_header)
  113 
  114    auth_utils.auth_directives_assert(auth_directives)
  115 
  116    challenge = auth_utils.construct_challenge(auth_directives)
  117    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  118 
  119    response = await service_client.get(
  120        '/v1/hello', headers={'Authorization': auth_header},
  121    )
  122    assert response.status == 200
  123 
  124    auth_info_header = response.headers['Authentication-Info']
  125    auth_directives_info = auth_utils.parse_directives(auth_info_header)
  126 
  127    challenge = auth_utils.construct_challenge(
  128        auth_directives, auth_directives_info['nextnonce'],
  129    )
  130    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  131 
  132    response = await service_client.get(
  133        '/v1/hello', headers={'Authorization': auth_header},
  134    )
  135    assert response.status == 200
  136 
  137    response = await service_client.get(
  138        '/v1/hello', headers={'Authorization': auth_header},
  139    )
  140    assert response.status == 401
  141    assert 'WWW-Authenticate' in response.headers
  142 
  143 
  144@pytest.mark.pgsql('auth', files=['test_data.sql'])
  145async def test_expiring_nonce(service_client, mocked_time):
  146    response = await service_client.get('/v1/hello')
  147    assert response.status == 401
  148 
  149    authentication_header = response.headers['WWW-Authenticate']
  150    auth_directives = auth_utils.parse_directives(authentication_header)
  151 
  152    auth_utils.auth_directives_assert(auth_directives)
  153 
  154    challenge = auth_utils.construct_challenge(auth_directives)
  155    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  156 
  157    response = await service_client.get(
  158        '/v1/hello', headers={'Authorization': auth_header},
  159    )
  160    assert response.status == 200
  161 
  162    very_long_waiting_ms = 1500
  163    mocked_time.sleep(very_long_waiting_ms)
  164 
  165    auth_info_header = response.headers['Authentication-Info']
  166    auth_directives_info = auth_utils.parse_directives(auth_info_header)
  167 
  168    challenge = auth_utils.construct_challenge(
  169        auth_directives, auth_directives_info['nextnonce'],
  170    )
  171    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  172 
  173    response = await service_client.get(
  174        '/v1/hello', headers={'Authorization': auth_header},
  175    )
  176    assert response.status == 401
  177 
  178    authentication_header = response.headers['WWW-Authenticate']
  179    auth_directives = auth_utils.parse_directives(authentication_header)
  180 
  181    auth_utils.auth_directives_assert(auth_directives)
  182 
  183    challenge = auth_utils.construct_challenge(auth_directives)
  184    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  185 
  186    response = await service_client.get(
  187        '/v1/hello', headers={'Authorization': auth_header},
  188    )
  189    assert response.status == 200
  190    assert 'Authentication-Info' in response.headers
  191 
  192 
  193@pytest.mark.pgsql('auth', files=['test_data.sql'])
  194async def test_aliving_nonce_after_half_ttl(service_client, mocked_time):
  195    response = await service_client.get('/v1/hello')
  196    assert response.status == 401
  197 
  198    authentication_header = response.headers['WWW-Authenticate']
  199    auth_directives = auth_utils.parse_directives(authentication_header)
  200 
  201    auth_utils.auth_directives_assert(auth_directives)
  202 
  203    challenge = auth_utils.construct_challenge(auth_directives)
  204    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  205 
  206    response = await service_client.get(
  207        '/v1/hello', headers={'Authorization': auth_header},
  208    )
  209    assert response.status == 200
  210 
  211    short_waiting_ms = 500
  212    mocked_time.sleep(short_waiting_ms)
  213 
  214    auth_info_header = response.headers['Authentication-Info']
  215    auth_directives_info = auth_utils.parse_directives(auth_info_header)
  216 
  217    challenge = auth_utils.construct_challenge(
  218        auth_directives, auth_directives_info['nextnonce'],
  219    )
  220    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  221 
  222    response = await service_client.get(
  223        '/v1/hello', headers={'Authorization': auth_header},
  224    )
  225    assert response.status == 200
  226    assert 'Authentication-Info' in response.headers
  227 
  228 
  229@pytest.mark.pgsql('auth', files=['test_data.sql'])
  230async def test_repeated_auth_ignore_nextnonce(service_client):
  231    response = await service_client.get('/v1/hello')
  232    assert response.status == 401
  233 
  234    authentication_header = response.headers['WWW-Authenticate']
  235    auth_directives = auth_utils.parse_directives(authentication_header)
  236 
  237    auth_utils.auth_directives_assert(auth_directives)
  238 
  239    challenge = auth_utils.construct_challenge(auth_directives)
  240    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  241 
  242    response = await service_client.get(
  243        '/v1/hello', headers={'Authorization': auth_header},
  244    )
  245    assert response.status == 200
  246 
  247    response = await service_client.get('/v1/hello')
  248    assert response.status == 401
  249 
  250    authentication_header = response.headers['WWW-Authenticate']
  251    auth_directives = auth_utils.parse_directives(authentication_header)
  252 
  253    challenge = auth_utils.construct_challenge(auth_directives)
  254    auth_header = auth_utils.construct_header('username', 'pswd', challenge)
  255 
  256    response = await service_client.get(
  257        '/v1/hello', headers={'Authorization': auth_header},
  258    )
  259    assert response.status == 200