[file system] Configurable token delegation process to enable other authentication methods in S3 credential provider chain#1245
Conversation
f3981d3 to
92ba1e2
Compare
| @@ -59,13 +63,8 @@ public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoo | |||
| LOG.debug("Provider already exists"); | |||
| } | |||
|
|
|||
| // then, set addition info | |||
| if (additionInfos == null) { | |||
There was a problem hiding this comment.
How do we actually ensure here that there is no race condition? i.e., that additionInfos is set before this method is called?
There was a problem hiding this comment.
What do you mean race condition? additionInfos may be set before or after this method is called.
There was a problem hiding this comment.
sorry, not the entire code block i want to show is visible in the comment.
this code block is from com.alibaba.fluss.fs.s3.token.S3DelegationTokenReceiver#updateHadoopConfig and is only called in file system creation. it expects additionInfos to be set. if not, an exception is thrown.
if (additionInfos == null) {
// if addition info is null, it also means we have not received any token,
// we throw InvalidCredentialsException
throw new NoAwsCredentialsException(DynamicTemporaryAWSCredentialsProvider.COMPONENT);
} else {
for (Map.Entry<String, String> entry : additionInfos.entrySet()) {
hadoopConfig.set(entry.getKey(), entry.getValue());
}
}
however, additionInfos is only written to in com.alibaba.fluss.fs.s3.token.S3DelegationTokenReceiver#onNewTokensObtained() which is only called by SecurityTokenReceiverRepository.
- how do we ensure that
com.alibaba.fluss.fs.s3.token.S3DelegationTokenReceiver#onNewTokensObtained()was called, so we do not end up throwing an exception during file system creation? - couldn't we just move this code block
for (Map.Entry<String, String> entry : additionInfos.entrySet()) {
hadoopConfig.set(entry.getKey(), entry.getValue());
into com.alibaba.fluss.fs.s3.token.S3DelegationTokenReceiver#onNewTokensObtained() and remove it in com.alibaba.fluss.fs.s3.token.S3DelegationTokenReceiver#updateHadoopConfig?
There was a problem hiding this comment.
1: Yes, you are right. it may happen that it may be end up throwing an exception. But I think it happens rarely. If it happens, it can always retry, as a flink job. In the firstly version, I block it to wait until the token is received. But Jark think it's hard to debug if it hangs whiling obtaining token. So, in here, I just throw the exception
2: I'm not sure how it can solve the problem. The Filesystem will call updateHadoopConfig to init a filesytem, even though we move the code block into onNewTokensObtained, but it may still happen that the token is not avaiable, and we still can't init the filsystem.
| PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX) | ||
| .forEach(flussConfig::setString); | ||
| } catch (NoSuchMethodError e) { | ||
| // Flink 1.18 does not have the toMap() method yet, see |
There was a problem hiding this comment.
I added a note in the documentation for this.
92ba1e2 to
0e29681
Compare
|
|
||
| byte[] tokenBytes = token.getToken(); | ||
| if (tokenBytes.length != 0) { | ||
| credentials = CredentialsJsonSerde.fromJson(tokenBytes); |
There was a problem hiding this comment.
shouldn't we lock on credentials and additionalInfos to be safe? both fields are accessed from separate threads (one reads, the other one writes) and assignment may not be a single machine level instruction.
cb001e7 to
2c7731b
Compare
d88c76c to
434a4f4
Compare
|
Do you have a planned timeline for the release date of this feature? In 0.8, S3 can only be used with baked-in permanent credentials, which, I assume, prevents most AWS customers using Fluss in production. |
a589ea8 to
2e4c770
Compare
|
In general, this PR is ready for review/testing. We need to figure out 2 things.
The idea of the PR is to deactivate token delegation to be able to use other credential providers. Without token delegation, users should be forced to set the credential provider to avoid misconfiguration or use unsafe credential providers that use long-term credentials. A more detailed description is in the update docs of the PR. To force users to set the credential provider when token delegation is deactivated, I set the credential provider config options to blank, see here. If you check out the PR and follow the instructions under Now the anomaly: Shut down the Docker Compose stack. Remove |
|
@michaelkoepf @luoyuxia I tested it out, and it works. But then I tried to enable both lake tiering. Can someone test if you can make it work with both? |
I might be missing something but the documented behaviour of unsetting Were
Additional question that might help my understanding, what do we expect the user experience to look like here when they disable token delegation and forgot to set credential provider? |
|
There might be more, but for me it boils down to two things... Currently when configuring Fluss tiering to work with minio there is this exception: So my understanding is that when you run against MinIO, you typically use static S3 credentials and a custom endpoint. That works fine with Hadoop S3A if you set the right configs. Fluss’s So I think @michaelkoepf solution should work.. If someone has time to try out different scenarios and see the behavior with some stronger understanding than me, it would be amazing. The second thing to test and figure out is this: #1911 (comment) |
|
@leekeiabstraction very strong catch.
yes, that was the manual test scenario: "but leave the credentials (access key, secret) there". verification check, kind of. but i did miss out on the
we would have treated this as a configuration error, which causes s3 access to fail. this mechanism was introduced as a safety net. users should prefer short-term credentials, as we discourage embedding long-term credentials in the client. if they want to use long-term credentials in the client, they should be explicit about it by setting the respective credential provider. any ideas how we can achieve this despite the behavior of |
I may need to sync to understand the user experience that we want in terms of happy cases as well. But if you intend to support this specific scenario ie user should run into error when they disable token delegation without specifying 's3.aws.credentials.provider', my suggestion is that we should check the configurations and fail-fast by throwing an exception at the configuration stage, rather than passing it on and relying on dependency's behaviour. Also, modifying credentials provider behind the scene might catch user by surprise. |
i think this was considered in the beginning. the problem there was that deactivating the token delegation ( bottom line: i think the easiest way is to just leave default providers in place, if possible. this would need to be further investigated, because config options are also used to figure out whether we are on the server or client side (see here). but actually, i think checking if so i suggest, if it is possible to leave the credential providers in place, we put a warning in the docs that short-term credentials should be preferred if possible. wdyt based on your experience with hadoop aws in other projects? @leekeiabstraction |
|
Hey @michaelkoepf
I've not used TD in the past so taking my time to understand the scenarios here. Warning in logs and also documentation seems sensible to me but before recommending that, I'd like to understand the fuller picture as there are a few components that I do not have context on: MinIO and when/where configurations are defined. Can you elaborate the following? Happy to sync on huddle S3
MinIO
|
- Extract *and* remove client filesystem prefix - Pass through 'fluss.' options from Flink config - Adapt S3 file system implementation for new config option
2e4c770 to
cf8ad90
Compare
01cc836 to
1423313
Compare
| // set credential provider | ||
| setCredentialProvider(hadoopConfig); | ||
| final boolean isClient = isClient(flussConfig); | ||
| final boolean useTokenDelegation; |
There was a problem hiding this comment.
Rename from useTokenDelegation to shouldServeToken or shouldDelegateToken as technically the client side still uses token delegation if server is configured to perform token delegation.
|
|
||
| // pass through all fluss options from flink config | ||
| try { | ||
| PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX) |
There was a problem hiding this comment.
Line 243 to 248 already forward table configs with prefix client to fluss config.
This will remove any table specific config if the name clashes. In such cases, should table specific config take precedence over flink config?
| // pass through all fluss options from flink config | ||
| try { | ||
| PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX) | ||
| .forEach(flussConfig::setString); |
There was a problem hiding this comment.
Should we enforce/prependclient. prefix here? Technically, all config in FlinkTableFactory is Fluss client side config. As is, we are leaking config non client config through if user missed out the client part.. I believe that leads to the best effort handling logic that we have within isClient(configuration)
| } | ||
|
|
||
| String flussKeyClientPrefix = CLIENT_PREFIX + flussPrefix; | ||
| if (flussKey.startsWith(flussKeyClientPrefix)) { |
There was a problem hiding this comment.
Suggest the following to avoid unnecessary alloc and comparison.
else if (flussKey.startsWith(CLIENT_PREFIX + flussPrefix)) {| Collections.reverse(credentialProviderPrependOrder); | ||
|
|
||
| for (String credentialProviderName : credentialProviderPrependOrder) { | ||
| if (!providers.contains(credentialProviderName)) { |
There was a problem hiding this comment.
We should use exact match instead of contains.
There was a problem hiding this comment.
Also, if I'm reading correctly, we currently only add a single credential provider Collections.singletonList(DynamicTemporaryAWSCredentialsProvider.NAME) from S3FileSystemPlugin.java:102. Do we need to handle updating and adding a list of credentialProviders anywhere? If not, it might be worth simplifying the code and remove the updating/adding of list of credentialProviders
Unless you force the containers to recreate, they will keep using the old configuration. That's why you can still write to MinIO. |
|
subsumed by #2662 |
Purpose
Linked issue: close #1246
Brief change log
Tests
Unit Tests
Added
fluss-filesystems/fluss-fs-s3/src/test/java/com/alibaba/fluss/fs/s3/S3FileSystemPluginTest.javafluss-filesystems/fluss-fs-s3/src/test/java/com/alibaba/fluss/fs/s3/token/S3DelegationTokenReceiverTest.javaExample usage with MinIO
Checkout PR and build using
./mvnw clean package -DskipTests.dockerdirectory: Build fluss image with namefluss:local-snapshot. Execute theprepare.shfor the quickstart image, copy the localfluss-fs-s3from this PR into thelibfolder and build the image with namefluss-quickstart-flink:local-snapshot.docker-compose.ymlStart with
docker compose up -d.Notes:
Since we set
fs.s3.enable-token-delegation: false, the token delegation process is deactivated, and there will be no exception thrown during obtaining the session token via STS.There is no need to set
region,endpointorpath-style-accesson the client side, because the information will be distributed from the server to the client even when token delegation is deactivated.docker compose exec jobmanager ./sql-clientBased on quickstart guide, create catalog and table, and insert data.
Go to
http://localhost:9001. Log in with userflussand password12345678.Wait until
fluss/remote-datacontains thelogfolder, i.e., logs are tiered so we can be sure that we read from MinIO.SELECT * FROM fluss_order;should successfully read from MinIO.API and Format
n/a
Documentation
fluss-docs/docs/maintenance/filesystems/s3