3
3
import requests
4
4
from time import sleep
5
5
6
- from tests .utils import get_frontend_url , wait_for_healthcheck_util
6
+ from tests .utils import get_frontend_url , wait_for_healthcheck_util , get_admin_credentials
7
+ from datahub .cli .ingest_cli import get_session_and_host
7
8
8
9
# Disable telemetry
9
10
os .putenv ("DATAHUB_TELEMETRY_ENABLED" , "false" )
10
11
12
+ (admin_user , admin_pass ) = get_admin_credentials ()
11
13
12
14
@pytest .fixture (scope = "session" )
13
15
def wait_for_healthchecks ():
@@ -23,7 +25,7 @@ def test_healthchecks(wait_for_healthchecks):
23
25
@pytest .fixture (scope = 'class' , autouse = True )
24
26
def custom_user_setup ():
25
27
"""Fixture to execute setup before and tear down after all tests are run"""
26
- admin_session = loginAs ("datahub" , "datahub" )
28
+ admin_session = loginAs (admin_user , admin_pass )
27
29
28
30
res_data = removeUser (admin_session , "urn:li:corpuser:user" )
29
31
assert res_data
@@ -66,7 +68,7 @@ def custom_user_setup():
66
68
67
69
# signUp will override the session cookie to the new user to be signed up.
68
70
admin_session .cookies .clear ()
69
- admin_session = loginAs ("datahub" , "datahub" )
71
+ admin_session = loginAs (admin_user , admin_pass )
70
72
71
73
# Make user created user is there.
72
74
res_data = listUsers (admin_session )
@@ -94,7 +96,7 @@ def custom_user_setup():
94
96
@pytest .fixture (autouse = True )
95
97
def access_token_setup ():
96
98
"""Fixture to execute asserts before and after a test is run"""
97
- admin_session = loginAs ("datahub" , "datahub" )
99
+ admin_session = loginAs (admin_user , admin_pass )
98
100
99
101
res_data = listAccessTokens (admin_session )
100
102
assert res_data
@@ -114,7 +116,7 @@ def access_token_setup():
114
116
115
117
@pytest .mark .dependency (depends = ["test_healthchecks" ])
116
118
def test_admin_can_create_list_and_revoke_tokens (wait_for_healthchecks ):
117
- admin_session = loginAs ("datahub" , "datahub" )
119
+ admin_session = loginAs (admin_user , admin_pass )
118
120
119
121
# Using a super account, there should be no tokens
120
122
res_data = listAccessTokens (admin_session )
@@ -124,12 +126,12 @@ def test_admin_can_create_list_and_revoke_tokens(wait_for_healthchecks):
124
126
assert len (res_data ["data" ]["listAccessTokens" ]["tokens" ]) == 0
125
127
126
128
# Using a super account, generate a token for itself.
127
- res_data = generateAccessToken_v2 (admin_session , "urn:li:corpuser:datahub " )
129
+ res_data = generateAccessToken_v2 (admin_session , f "urn:li:corpuser:{ admin_user } " )
128
130
assert res_data
129
131
assert res_data ["data" ]
130
132
assert res_data ["data" ]["createAccessToken" ]
131
133
assert res_data ["data" ]["createAccessToken" ]["accessToken" ]
132
- assert res_data ["data" ]["createAccessToken" ]["metadata" ]["actorUrn" ] == "urn:li:corpuser:datahub "
134
+ assert res_data ["data" ]["createAccessToken" ]["metadata" ]["actorUrn" ] == f "urn:li:corpuser:{ admin_user } "
133
135
admin_tokenId = res_data ["data" ]["createAccessToken" ]["metadata" ]["id" ]
134
136
# Sleep for eventual consistency
135
137
sleep (3 )
@@ -140,8 +142,8 @@ def test_admin_can_create_list_and_revoke_tokens(wait_for_healthchecks):
140
142
assert res_data ["data" ]
141
143
assert res_data ["data" ]["listAccessTokens" ]["total" ] is not None
142
144
assert len (res_data ["data" ]["listAccessTokens" ]["tokens" ]) == 1
143
- assert res_data ["data" ]["listAccessTokens" ]["tokens" ][0 ]["actorUrn" ] == "urn:li:corpuser:datahub "
144
- assert res_data ["data" ]["listAccessTokens" ]["tokens" ][0 ]["ownerUrn" ] == "urn:li:corpuser:datahub "
145
+ assert res_data ["data" ]["listAccessTokens" ]["tokens" ][0 ]["actorUrn" ] == f "urn:li:corpuser:{ admin_user } "
146
+ assert res_data ["data" ]["listAccessTokens" ]["tokens" ][0 ]["ownerUrn" ] == f "urn:li:corpuser:{ admin_user } "
145
147
146
148
# Check that the super account can revoke tokens that it created
147
149
res_data = revokeAccessToken (admin_session , admin_tokenId )
@@ -161,7 +163,7 @@ def test_admin_can_create_list_and_revoke_tokens(wait_for_healthchecks):
161
163
162
164
@pytest .mark .dependency (depends = ["test_healthchecks" ])
163
165
def test_admin_can_create_and_revoke_tokens_for_other_user (wait_for_healthchecks ):
164
- admin_session = loginAs ("datahub" , "datahub" )
166
+ admin_session = loginAs (admin_user , admin_pass )
165
167
166
168
# Using a super account, there should be no tokens
167
169
res_data = listAccessTokens (admin_session )
@@ -188,7 +190,7 @@ def test_admin_can_create_and_revoke_tokens_for_other_user(wait_for_healthchecks
188
190
assert res_data ["data" ]["listAccessTokens" ]["total" ] is not None
189
191
assert len (res_data ["data" ]["listAccessTokens" ]["tokens" ]) == 1
190
192
assert res_data ["data" ]["listAccessTokens" ]["tokens" ][0 ]["actorUrn" ] == "urn:li:corpuser:user"
191
- assert res_data ["data" ]["listAccessTokens" ]["tokens" ][0 ]["ownerUrn" ] == "urn:li:corpuser:datahub "
193
+ assert res_data ["data" ]["listAccessTokens" ]["tokens" ][0 ]["ownerUrn" ] == f "urn:li:corpuser:{ admin_user } "
192
194
193
195
# Check that the super account can revoke tokens that it created for another user
194
196
res_data = revokeAccessToken (admin_session , user_tokenId )
@@ -249,7 +251,7 @@ def test_non_admin_can_create_list_revoke_tokens(wait_for_healthchecks):
249
251
250
252
@pytest .mark .dependency (depends = ["test_healthchecks" ])
251
253
def test_admin_can_manage_tokens_generated_by_other_user (wait_for_healthchecks ):
252
- admin_session = loginAs ("datahub" , "datahub" )
254
+ admin_session = loginAs (admin_user , admin_pass )
253
255
254
256
# Using a super account, there should be no tokens
255
257
res_data = listAccessTokens (admin_session )
@@ -273,7 +275,7 @@ def test_admin_can_manage_tokens_generated_by_other_user(wait_for_healthchecks):
273
275
274
276
# Admin should be able to list other tokens
275
277
user_session .cookies .clear ()
276
- admin_session = loginAs ("datahub" , "datahub" )
278
+ admin_session = loginAs (admin_user , admin_pass )
277
279
res_data = listAccessTokens (admin_session , [{"field" : "ownerUrn" ,"value" : "urn:li:corpuser:user" }])
278
280
assert res_data
279
281
assert res_data ["data" ]
@@ -285,7 +287,7 @@ def test_admin_can_manage_tokens_generated_by_other_user(wait_for_healthchecks):
285
287
286
288
# Admin can delete token created by someone else.
287
289
admin_session .cookies .clear ()
288
- admin_session = loginAs ("datahub" , "datahub" )
290
+ admin_session = loginAs (admin_user , admin_pass )
289
291
res_data = revokeAccessToken (admin_session , user_tokenId )
290
292
assert res_data
291
293
assert res_data ["data" ]
@@ -304,7 +306,7 @@ def test_admin_can_manage_tokens_generated_by_other_user(wait_for_healthchecks):
304
306
assert len (res_data ["data" ]["listAccessTokens" ]["tokens" ]) == 0
305
307
306
308
# Using the super account, check that all tokens where removed.
307
- admin_session = loginAs ("datahub" , "datahub" )
309
+ admin_session = loginAs (admin_user , admin_pass )
308
310
res_data = listAccessTokens (admin_session , [{"field" : "ownerUrn" ,"value" : "urn:li:corpuser:user" }])
309
311
assert res_data
310
312
assert res_data ["data" ]
@@ -315,7 +317,7 @@ def test_admin_can_manage_tokens_generated_by_other_user(wait_for_healthchecks):
315
317
def test_non_admin_can_not_generate_tokens_for_others (wait_for_healthchecks ):
316
318
user_session = loginAs ("user" , "user" )
317
319
# Normal user should not be able to generate token for another user
318
- res_data = generateAccessToken_v2 (user_session , "urn:li:corpuser:datahub " )
320
+ res_data = generateAccessToken_v2 (user_session , f "urn:li:corpuser:{ admin_user } " )
319
321
assert res_data
320
322
assert res_data ["errors" ]
321
323
assert res_data ["errors" ][0 ]["message" ] == "Unauthorized to perform this action. Please contact your DataHub administrator."
0 commit comments