
Spring Boot with Cassandra: using list<frozen<UDT>> columns and handling the resulting exceptions

Published: 2024-01-30 22:07:01

In a recent project I used Cassandra's UDT (user-defined type) and ran into several tricky problems. This article lists the exceptions I hit and how I approached them.

The table structure is defined as follows:

CREATE TYPE cass_stdy.address (
    addr text,
    mail text
);

CREATE TABLE cass_stdy.user (
    id uuid PRIMARY KEY,
    addr list<frozen<address>>,
    age int,
    birthday timestamp,
    d_addr frozen<address>,
    name text,
    score map<text, text>
);

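In the cass_stdy.user table, the addr column, of type list<frozen<address>>, is the focus of this article.

In Cassandra, a UDT nested inside a collection must be declared frozen. A frozen value is serialized and stored as a single blob-like value, so its fields cannot be updated individually.

Trying to create the column with a non-frozen UDT fails; the server reports that non-frozen UDTs are not allowed inside collections: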

InvalidRequest: Error from server: code=2200 [Invalid query] message="Non-frozen UDTs are not allowed inside collections: list<address>"

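The Spring Boot configuration is as follows:

spring.data.cassandra.keyspace-name=cass_stdy
spring.data.cassandra.contact-points=127.0.0.1
spring.data.cassandra.port=9042
spring.data.cassandra.local-datacenter=datacenter1

import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;

import com.datastax.oss.driver.api.core.CqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.cassandra.CassandraProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.cassandra.repository.config.EnableCassandraRepositories;

@Configuration
@EnableConfigurationProperties(CassandraProperties.class)
@EnableCassandraRepositories
public class SmvcCassandraConfig {

    @Autowired
    private CassandraProperties properties;

    @Bean
    public CqlSession cqlSession() {
        // Build one socket address per configured contact point.
        List<InetSocketAddress> contactPoints = properties.getContactPoints().stream()
                .map(address -> new InetSocketAddress(address, properties.getPort()))
                .collect(Collectors.toList());
        return CqlSession.builder()
                .withAuthCredentials(properties.getUsername(), properties.getPassword())
                .withKeyspace(properties.getKeyspaceName())
                .addContactPoints(contactPoints)
                .withLocalDatacenter(properties.getLocalDatacenter())
                .build();
    }
}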

The User and Address classes are defined as follows:

@Table("user")
public class User {

    @PrimaryKeyColumn(name = "id", type = PrimaryKeyType.PARTITIONED)
    private UUID id;

    @Column("name")
    private String name;

    @Column("addr")
    private List<Address> addresses;

    @Column("age")
    private Integer age;

    @Column("birthday")
    private Timestamp birthday;

    // getter ... setter ...
}

@UserDefinedType("address")
public class Address {

    @Column("addr")
    private String addr;

    @Column("mail")
    private String mail;

    // getter ... setter ...
}

The repository code is as follows:

@Repository
public interface IUserRepository extends CassandraRepository<User, UUID> {}

Now execute:

    @Autowired
    private IUserRepository userRepository;

    public User save(User user) {
        return userRepository.save(user);
    }

Querying the table shows that the user record was saved correctly.

cqlsh:cass_stdy> SELECT * FROM user;

 id                                   | addr                                                                   | age  | birthday | d_addr | name     | score
--------------------------------------+------------------------------------------------------------------------+------+----------+--------+----------+-------
 f7702276-694a-47e1-92cf-312db4101353 | [{addr: 'SZ GD CN', mail: 'sz@gd.cn'}, {addr: 'GZ GD CN', mail: null}] | null |     null |   null | name0000 |  null

(1 rows)

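However, the reason for choosing list<frozen<address>> is to use the list's append capability. For example, when a user adds a new address, save overwrites the entire addr value. To keep the existing addresses, you would have to read the record first, call list.add in Java, and then save to overwrite the whole value. So an update method is added to IUserRepository: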
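The difference between the two write strategies can be sketched with a small in-memory model. This is plain Java for illustration only, not Cassandra code: the class AppendVsOverwriteSketch, its table map and both method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// In-memory stand-in for the user table: id -> addr list (hypothetical model).
class AppendVsOverwriteSketch {
    static final Map<UUID, List<String>> table = new HashMap<>();

    // Models save(): the whole addr column is replaced by the new value.
    static void save(UUID id, List<String> addrs) {
        table.put(id, new ArrayList<>(addrs));
    }

    // Models "SET addr = addr + ?": elements are appended without the caller reading first.
    static void append(UUID id, List<String> addrs) {
        table.computeIfAbsent(id, k -> new ArrayList<>()).addAll(addrs);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        save(id, Arrays.asList("SZ GD CN"));
        save(id, Arrays.asList("GZ GD CN"));   // overwrites: the first address is gone
        System.out.println(table.get(id));
        append(id, Arrays.asList("BJ CN"));    // appends: the previous value is kept
        System.out.println(table.get(id));
    }
}
```

With save, keeping old addresses costs the caller an extra read and a full rewrite; with an append, only the new elements are sent.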

@Repository
public interface IUserRepository extends CassandraRepository<User, UUID> {

    @Query("UPDATE user SET addr = addr + :addrs WHERE id = :id")
    void updateAddress(List<Address> addrs, UUID id);
}

This appends data directly without reading the original record first. Calling updateAddress, however, throws an exception:

com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [null <-> com.*******.sbstdy.entity.Address]

com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [null <-> com.*******.sbstdy.entity.Address]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:639) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:558) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:317) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:622) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:558) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:287) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.Conversions.encode(Conversions.java:304) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.Conversions.encode(Conversions.java:263) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.Conversions.toMessage(Conversions.java:168) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.<init>(CqlRequestHandler.java:173) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestAsyncProcessor.process(CqlRequestAsyncProcessor.java:44) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:54) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:53) ~[java-driver-core-4.6.1.jar:na]
	at org.springframework.data.cassandra.core.cql.CqlTemplate.query(CqlTemplate.java:293) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.cassandra.core.cql.CqlTemplate.query(CqlTemplate.java:315) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.cassandra.core.CassandraTemplate.select(CassandraTemplate.java:340) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]

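The message says that no codec was found. A codec is an encoder/decoder, so the conversion between the Java object and the CQL value failed because no mapping could be found. What is odd is that save runs without error, while the hand-written CQL does not.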
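To make the idea of a missing codec concrete, here is a toy registry in plain Java. It is a sketch under simplifying assumptions, not the driver's implementation: Codec, codecs and codecFor are hypothetical names, and lookup is keyed only by the value's runtime class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy model of a codec registry (all names hypothetical, not the driver's classes).
class CodecRegistrySketch {
    // A codec converts one Java type to and from its serialized bytes.
    interface Codec<T> {
        ByteBuffer encode(T value);
        T decode(ByteBuffer bytes);
    }

    static final Map<Class<?>, Codec<?>> codecs = new HashMap<>();

    static {
        // Only String has a built-in codec in this model.
        codecs.put(String.class, new Codec<String>() {
            public ByteBuffer encode(String v) {
                return ByteBuffer.wrap(v.getBytes(StandardCharsets.UTF_8));
            }
            public String decode(ByteBuffer b) {
                return StandardCharsets.UTF_8.decode(b.duplicate()).toString();
            }
        });
    }

    // Looks up a codec by the value's runtime class; fails for unregistered classes.
    @SuppressWarnings("unchecked")
    static <T> Codec<T> codecFor(T value) {
        Codec<?> codec = codecs.get(value.getClass());
        if (codec == null) {
            throw new IllegalStateException(
                "Codec not found for " + value.getClass().getSimpleName());
        }
        return (Codec<T>) codec;
    }

    // Plain POJO with no registered codec, standing in for the entity class.
    static class Address { String addr; String mail; }

    public static void main(String[] args) {
        Codec<String> text = codecFor("hello");
        System.out.println(text.decode(text.encode("hello")));
        try {
            codecFor(new Address());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model a String round-trips through its codec, while an Address fails at lookup time because nothing was ever registered for that class.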

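There are two approaches worth considering:

First: add a codec manually.

Second: analyze the source code to find out why save can encode and decode while the update method cannot, and look for a breakthrough there.

Start with the first approach. The Cassandra configuration allows extra codecs to be registered, at the position shown in the code below: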

@Configuration
@EnableConfigurationProperties(CassandraProperties.class)
@EnableCassandraRepositories
public class SmvcCassandraConfig {

    @Autowired
    private CassandraProperties properties;

    @Bean
    public CqlSession cqlSession() {
        List<InetSocketAddress> contactPoints = properties.getContactPoints().stream()
                .map(address -> new InetSocketAddress(address, properties.getPort()))
                .collect(Collectors.toList());
        return CqlSession.builder()
                .withAuthCredentials(properties.getUsername(), properties.getPassword())
                .withKeyspace(properties.getKeyspaceName())
                .addContactPoints(contactPoints)
                .withLocalDatacenter(properties.getLocalDatacenter())
                .addTypeCodecs(...)        // register additional codecs here
                .build();
    }
}

Stepping into addTypeCodecs shows that it takes a varargs parameter of type TypeCodec, and TypeCodec is a generic interface parameterized by the Java type:

public interface TypeCodec<JavaTypeT> {...

An anonymous inner class can implement it here. According to the IDE, the methods to implement are:

    @Bean
    public CqlSession cqlSession() {
        List<InetSocketAddress> contactPoints = properties.getContactPoints().stream()
                .map(address -> new InetSocketAddress(address, properties.getPort()))
                .collect(Collectors.toList());
        return CqlSession.builder()
                .withAuthCredentials(properties.getUsername(), properties.getPassword())
                .withKeyspace(properties.getKeyspaceName())
                .addContactPoints(contactPoints)
                .withLocalDatacenter(properties.getLocalDatacenter())
                .addTypeCodecs(new TypeCodec<Address>() {
                    @NonNull
                    @Override
                    public GenericType<Address> getJavaType() { return null; }

                    @NonNull
                    @Override
                    public DataType getCqlType() { return null; }

                    @Nullable
                    @Override
                    public ByteBuffer encode(@Nullable Address value, @NonNull ProtocolVersion protocolVersion) { return null; }

                    @Nullable
                    @Override
                    public Address decode(@Nullable ByteBuffer bytes, @NonNull ProtocolVersion protocolVersion) { return null; }

                    @NonNull
                    @Override
                    public String format(@Nullable Address value) { return null; }

                    @Nullable
                    @Override
                    public Address parse(@Nullable String value) { return null; }
                })
                .build();
    }

The parameters and return types make it fairly clear what each method should do; if not, look at how the other TypeCodec implementations do it. The completed methods are as follows:

    @Bean
    public CqlSession cqlSession() {
        List<InetSocketAddress> contactPoints = properties.getContactPoints().stream()
                .map(address -> new InetSocketAddress(address, properties.getPort()))
                .collect(Collectors.toList());
        return CqlSession.builder()
                .withAuthCredentials(properties.getUsername(), properties.getPassword())
                .withKeyspace(properties.getKeyspaceName())
                .addContactPoints(contactPoints)
                .withLocalDatacenter(properties.getLocalDatacenter())
                .addTypeCodecs(new TypeCodec<Address>() {
                    @NonNull
                    @Override
                    public GenericType<Address> getJavaType() {
                        return GenericType.of(Address.class);
                    }

                    @NonNull
                    @Override
                    public DataType getCqlType() {
                        return DataTypes.BLOB;
                    }

                    // Serializes the object as JSON text bytes.
                    @Nullable
                    @Override
                    public ByteBuffer encode(@Nullable Address value, @NonNull ProtocolVersion protocolVersion) {
                        return ByteBuffer.wrap(new Gson().toJson(value).getBytes());
                    }

                    // Parses the bytes back as JSON text.
                    @Nullable
                    @Override
                    public Address decode(@Nullable ByteBuffer bytes, @NonNull ProtocolVersion protocolVersion) {
                        return new Gson().fromJson(new String(bytes.array()), Address.class);
                    }

                    @NonNull
                    @Override
                    public String format(@Nullable Address value) {
                        return new Gson().toJson(value);
                    }

                    @Nullable
                    @Override
                    public Address parse(@Nullable String value) {
                        return new Gson().fromJson(value, Address.class);
                    }
                })
                .build();
    }

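In practice there is a problem. A TypeCodec maps a Java type to a CQL type. For the DataType returned by getCqlType, the utility class DataTypes provides most CQL types but has nothing for UDTs, so whichever of its types getCqlType returns, the same exception is thrown: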

com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Not enough bytes to read 0th field addr

com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Not enough bytes to read 0th field addr
	at com.datastax.oss.driver.api.core.servererrors.InvalidQueryException.copy(InvalidQueryException.java:48) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:53) ~[java-driver-core-4.6.1.jar:na]
	at org.springframework.data.cassandra.core.cql.CqlTemplate.query(CqlTemplate.java:293) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.cassandra.core.cql.CqlTemplate.query(CqlTemplate.java:315) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.cassandra.core.CassandraTemplate.select(CassandraTemplate.java:340) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.cassandra.repository.query.CassandraQueryExecution$SingleEntityExecution.execute(CassandraQueryExecution.java:149) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.cassandra.repository.query.CassandraQueryExecution$ResultProcessingExecution.execute(CassandraQueryExecution.java:239) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.cassandra.repository.query.AbstractCassandraQuery.execute(AbstractCassandraQuery.java:105) ~[spring-data-cassandra-3.0.0.RELEASE.jar:3.0.0.RELEASE]
	at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor$QueryMethodInvoker.invoke(QueryExecutorMethodInterceptor.java:195) ~[spring-data-commons-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:152) ~[spring-data-commons-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at com.sun.proxy.$Proxy112.updateAddress(Unknown Source) ~[na:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at com.sun.proxy.$Proxy112.updateAddress(Unknown Source) ~[na:na]

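The message says there are not enough bytes to read the first field, addr. In other words, the field values have to be read back out of the ByteBuffer, and the exception means there was nothing valid to read at the expected position. At first I assumed the ByteBuffer handling in encode and decode was wrong; my conclusion at this point was that the cqlType did not match, so values were being read from the wrong place. And since the DataTypes utility class offers no UDT cqlType, this looked like a dead end.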
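One way to see how such an error can arise is to compare JSON bytes with the length-prefixed layout that the CQL native protocol uses for UDT values, where each field is written as a 4-byte big-endian length followed by that many bytes, and a negative length means null. The sketch below is plain Java written for this article: UdtWireFormatSketch, encodeUdt and readFirstField are hypothetical names, and readFirstField only approximates what a server-side parser might do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the [int length][bytes] framing of UDT fields (text fields only).
class UdtWireFormatSketch {

    // Encodes text fields in declaration order; a null field is written as length -1.
    static ByteBuffer encodeUdt(String... fields) {
        int size = 0;
        for (String f : fields) {
            size += 4 + (f == null ? 0 : f.getBytes(StandardCharsets.UTF_8).length);
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        for (String f : fields) {
            if (f == null) {
                out.putInt(-1);
            } else {
                byte[] b = f.getBytes(StandardCharsets.UTF_8);
                out.putInt(b.length).put(b);
            }
        }
        out.flip();
        return out;
    }

    // Reads field 0 as a length-prefixed parser would; returns null for a null field.
    static String readFirstField(ByteBuffer in) {
        ByteBuffer buf = in.duplicate();
        if (buf.remaining() < 4) {
            throw new IllegalArgumentException("Not enough bytes to read 0th field");
        }
        int len = buf.getInt();
        if (len < 0) {
            return null;
        }
        if (buf.remaining() < len) {
            throw new IllegalArgumentException("Not enough bytes to read 0th field");
        }
        byte[] b = new byte[len];
        buf.get(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(readFirstField(encodeUdt("SZ GD CN", "sz@gd.cn")));
        // JSON text starts with '{' and '"', which read as a huge length prefix.
        ByteBuffer json = ByteBuffer.wrap(
            "{\"addr\":\"SZ GD CN\",\"mail\":\"sz@gd.cn\"}".getBytes(StandardCharsets.UTF_8));
        try {
            readFirstField(json);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, the first four bytes of a JSON document are interpreted as a field length far larger than the remaining buffer, which would explain a failure on the very first field regardless of which CQL type the codec declares.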

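Looking back at the implementations of the DataType interface, however, there is in fact a UserDefinedType, although it is also an interface.

UserDefinedType in turn has the implementations DefaultUserDefinedType and ShallowUserDefinedType. The remaining one is an inner class that cannot be accessed from outside, so it is skipped here.

First, DefaultUserDefinedType: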

  public DefaultUserDefinedType(
      @NonNull CqlIdentifier keyspace,
      @NonNull CqlIdentifier name,
      boolean frozen,
      List<CqlIdentifier> fieldNames,
      @NonNull List<DataType> fieldTypes,
      @NonNull AttachmentPoint attachmentPoint) {
    Preconditions.checkNotNull(keyspace);
    Preconditions.checkNotNull(name);
    Preconditions.checkNotNull(fieldNames);
    Preconditions.checkNotNull(fieldTypes);
    Preconditions.checkArgument(fieldNames.size() > 0, "Field names list can't be null or empty");
    Preconditions.checkArgument(
        fieldTypes.size() == fieldNames.size(),
        "There should be the same number of field names and types");
    this.keyspace = keyspace;
    this.name = name;
    this.frozen = frozen;
    this.fieldNames = ImmutableList.copyOf(fieldNames);
    this.fieldTypes = ImmutableList.copyOf(fieldTypes);
    this.index = new IdentifierIndex(this.fieldNames);
    this.attachmentPoint = attachmentPoint;
  }

This constructor is somewhat complex, with six parameters. Now look at ShallowUserDefinedType:

  public ShallowUserDefinedType(CqlIdentifier keyspace, CqlIdentifier name, boolean frozen) {
    this.keyspace = keyspace;
    this.name = name;
    this.frozen = frozen;
  }

This constructor takes only three parameters, which looks much simpler, so try ShallowUserDefinedType first. CqlIdentifier provides two static factory methods for constructing itself, so getCqlType ends up as follows:

    @NonNull
    @Override
    public DataType getCqlType() {
        return new ShallowUserDefinedType(
                CqlIdentifier.fromInternal("cass_stdy"),
                CqlIdentifier.fromInternal("address"),
                true);
    }

Try again. It still throws com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Not enough bytes to read 0th field addr. Switch to DefaultUserDefinedType:

    @NonNull
    @Override
    public DataType getCqlType() {
        // return new ShallowUserDefinedType(CqlIdentifier.fromInternal("cass_stdy"), CqlIdentifier.fromInternal("address"), true);
        return new DefaultUserDefinedType(
                CqlIdentifier.fromInternal("cass_stdy"),
                CqlIdentifier.fromInternal("address"),
                true,
                Lists.newArrayList(CqlIdentifier.fromInternal("addr"), CqlIdentifier.fromInternal("mail")),
                Lists.newArrayList(DataTypes.TEXT, DataTypes.TEXT));
    }

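Try again. The same com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Not enough bytes to read 0th field addr is thrown. Since the error persists even when the declared CQL type describes the UDT, the declared type alone is evidently not the cause; the JSON bytes produced by encode most likely do not match the binary layout the server expects for a UDT value.

This route appears to be a dead end, so move on to the second approach.

Analyze the source code to find out why save can encode and decode while the update method cannot, and look for a breakthrough there.

First restore the configuration by removing addTypeCodecs and reproduce the situation where save works and update does not. Then analyze the stack trace: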

com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [null <-> com.*******.sbstdy.entity.Address]
(the remaining frames are identical to the CodecNotFoundException stack trace shown earlier)

Start with the source code behind the second frame from the top of the stack trace:

com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:558) ~[java-driver-core-4.6.1.jar:na]

@NonNull
protected TypeCodec<?> createCodec(
    @Nullable DataType cqlType, @Nullable GenericType<?> javaType, boolean isJavaCovariant) {
  LOG.trace("[{}] Cache miss, creating codec", logPrefix);
  // Either type can be null, but not both.
  if (javaType == null) {
    assert cqlType != null;
    return createCodec(cqlType);
  } else if (cqlType == null) {
    return createCodec(javaType, isJavaCovariant);
  } else { // Both non-null
    TypeToken<?> token = javaType.__getToken();
    if (cqlType instanceof ListType && List.class.isAssignableFrom(token.getRawType())) {
      DataType elementCqlType = ((ListType) cqlType).getElementType();
      TypeCodec<Object> elementCodec;
      if (token.getType() instanceof ParameterizedType) {
        Type[] typeArguments = ((ParameterizedType) token.getType()).getActualTypeArguments();
        GenericType<?> elementJavaType = GenericType.of(typeArguments[0]);
        elementCodec = uncheckedCast(codecFor(elementCqlType, elementJavaType, isJavaCovariant));
      } else {
        elementCodec = codecFor(elementCqlType);
      }
      return TypeCodecs.listOf(elementCodec);
    } else if (cqlType instanceof SetType && Set.class.isAssignableFrom(token.getRawType())) {
      DataType elementCqlType = ((SetType) cqlType).getElementType();
      TypeCodec<Object> elementCodec;
      if (token.getType() instanceof ParameterizedType) {
        Type[] typeArguments = ((ParameterizedType) token.getType()).getActualTypeArguments();
        GenericType<?> elementJavaType = GenericType.of(typeArguments[0]);
        elementCodec = uncheckedCast(codecFor(elementCqlType, elementJavaType, isJavaCovariant));
      } else {
        elementCodec = codecFor(elementCqlType);
      }
      return TypeCodecs.setOf(elementCodec);
    } else if (cqlType instanceof MapType && Map.class.isAssignableFrom(token.getRawType())) {
      DataType keyCqlType = ((MapType) cqlType).getKeyType();
      DataType valueCqlType = ((MapType) cqlType).getValueType();
      TypeCodec<Object> keyCodec;
      TypeCodec<Object> valueCodec;
      if (token.getType() instanceof ParameterizedType) {
        Type[] typeArguments = ((ParameterizedType) token.getType()).getActualTypeArguments();
        GenericType<?> keyJavaType = GenericType.of(typeArguments[0]);
        GenericType<?> valueJavaType = GenericType.of(typeArguments[1]);
        keyCodec = uncheckedCast(codecFor(keyCqlType, keyJavaType, isJavaCovariant));
        valueCodec = uncheckedCast(codecFor(valueCqlType, valueJavaType, isJavaCovariant));
      } else {
        keyCodec = codecFor(keyCqlType);
        valueCodec = codecFor(valueCqlType);
      }
      return TypeCodecs.mapOf(keyCodec, valueCodec);
    } else if (cqlType instanceof TupleType
        && TupleValue.class.isAssignableFrom(token.getRawType())) {
      return TypeCodecs.tupleOf((TupleType) cqlType);
    } else if (cqlType instanceof UserDefinedType
        && UdtValue.class.isAssignableFrom(token.getRawType())) {
      return TypeCodecs.udtOf((UserDefinedType) cqlType);
    } else if (cqlType instanceof CustomType
        && ByteBuffer.class.isAssignableFrom(token.getRawType())) {
      return TypeCodecs.custom(cqlType);
    }
    throw new CodecNotFoundException(cqlType, javaType);
  }
}

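This method takes three parameters; the first two, cqlType and javaType, are the important ones. As mentioned, a TypeCodec is about converting between the two representations. The exception is thrown from the else if (cqlType == null) branch, which means the cqlType argument is null: Address needs a matching cqlType to map to. Adding a codec in the configuration did not seem to work, so a different route is needed. Reading further down, the branch we want the code to reach is this one:

      else if (cqlType instanceof UserDefinedType
          && UdtValue.class.isAssignableFrom(token.getRawType())) {
        return TypeCodecs.udtOf((UserDefinedType) cqlType);
      }

So the problem to solve is the null cqlType. Keep following the stack trace to see where the lookup of the matching cqlType begins.

Tracing shows that the key is here: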

com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [null <-> com.*******.sbstdy.entity.Address]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:639) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:558) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:317) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:622) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:558) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963) ~[java-driver-shaded-guava-25.1-jre.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117) ~[java-driver-core-4.6.1.jar:na]
	at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:287) ~[java-driver-core-4.6.1.jar:na]

The last frame of the stack trace (CachingCodecRegistry.codecFor) corresponds to this code:

  @NonNull
  @Override
  public <JavaTypeT> TypeCodec<JavaTypeT> codecFor(@NonNull JavaTypeT value) {
    // Unrelated code omitted here
    DataType cqlType = inferCqlTypeFromValue(value);
    GenericType<?> javaType = inspectType(value, cqlType);
    LOG.trace(
        "[{}] Continuing based on inferred CQL type {} and Java type {}",
        logPrefix,
        cqlType,
        javaType);
    return uncheckedCast(getCachedCodec(cqlType, javaType, true));
  }

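Here cqlType is produced by the call DataType cqlType = inferCqlTypeFromValue(value); and stepping through with a debugger shows that value itself is not null, even though the exception message reports the CQL type as null.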

So let's step into that method:

  @Nullable
  protected DataType inferCqlTypeFromValue(@NonNull Object value) {
    if (value instanceof List) {
      List<?> list = (List<?>) value;
      if (list.isEmpty()) {
        return CQL_TYPE_FOR_EMPTY_LISTS;
      }
      Object firstElement = list.get(0);
      if (firstElement == null) {
        throw new IllegalArgumentException(
            "Can't infer list codec because the first element is null "
                + "(note that CQL does not allow null values in collections)");
      }
      DataType elementType = inferCqlTypeFromValue(firstElement);
      if (elementType == null) {
        return null;
      }
      return DataTypes.listOf(elementType);
    } else if (value instanceof Set) {
      Set<?> set = (Set<?>) value;
      if (set.isEmpty()) {
        return CQL_TYPE_FOR_EMPTY_SETS;
      }
      Object firstElement = set.iterator().next();
      if (firstElement == null) {
        throw new IllegalArgumentException(
            "Can't infer set codec because the first element is null "
                + "(note that CQL does not allow null values in collections)");
      }
      DataType elementType = inferCqlTypeFromValue(firstElement);
      if (elementType == null) {
        return null;
      }
      return DataTypes.setOf(elementType);
    } else if (value instanceof Map) {
      Map<?, ?> map = (Map<?, ?>) value;
      if (map.isEmpty()) {
        return CQL_TYPE_FOR_EMPTY_MAPS;
      }
      Entry<?, ?> firstEntry = map.entrySet().iterator().next();
      Object firstKey = firstEntry.getKey();
      Object firstValue = firstEntry.getValue();
      if (firstKey == null || firstValue == null) {
        throw new IllegalArgumentException(
            "Can't infer map codec because the first key and/or value is null "
                + "(note that CQL does not allow null values in collections)");
      }
      DataType keyType = inferCqlTypeFromValue(firstKey);
      DataType valueType = inferCqlTypeFromValue(firstValue);
      if (keyType == null || valueType == null) {
        return null;
      }
      return DataTypes.mapOf(keyType, valueType);
    }
    Class<?> javaClass = value.getClass();
    if (ByteBuffer.class.isAssignableFrom(javaClass)) {
      return DataTypes.BLOB;
    } else if (String.class.equals(javaClass)) {
      return DataTypes.TEXT;
    } else if (Long.class.equals(javaClass)) {
      return DataTypes.BIGINT;
    } else if (Boolean.class.equals(javaClass)) {
      return DataTypes.BOOLEAN;
    } else if (BigDecimal.class.equals(javaClass)) {
      return DataTypes.DECIMAL;
    } else if (Double.class.equals(javaClass)) {
      return DataTypes.DOUBLE;
    } else if (Float.class.equals(javaClass)) {
      return DataTypes.FLOAT;
    } else if (Integer.class.equals(javaClass)) {
      return DataTypes.INT;
    } else if (Instant.class.equals(javaClass)) {
      return DataTypes.TIMESTAMP;
    } else if (UUID.class.equals(javaClass)) {
      return DataTypes.UUID;
    } else if (BigInteger.class.equals(javaClass)) {
      return DataTypes.VARINT;
    } else if (InetAddress.class.isAssignableFrom(javaClass)) {
      return DataTypes.INET;
    } else if (LocalDate.class.equals(javaClass)) {
      return DataTypes.DATE;
    } else if (LocalTime.class.equals(javaClass)) {
      return DataTypes.TIME;
    } else if (Short.class.equals(javaClass)) {
      return DataTypes.SMALLINT;
    } else if (Byte.class.equals(javaClass)) {
      return DataTypes.TINYINT;
    } else if (CqlDuration.class.equals(javaClass)) {
      return DataTypes.DURATION;
    } else if (UdtValue.class.isAssignableFrom(javaClass)) {
      return ((UdtValue) value).getType();
    } else if (TupleValue.class.isAssignableFrom(javaClass)) {
      return ((TupleValue) value).getType();
    }
    // This might mean that the java type is a custom type with a custom codec,
    // so don't throw CodecNotFoundException just yet.
    return null;
  }

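The method is long, but its intent is clear: one long if-else chain that maps the runtime Java class of the value to the corresponding CQL DataType, returning null when no branch matches. The key is the second-to-last branch: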

    else if (UdtValue.class.isAssignableFrom(javaClass)) {
      return ((UdtValue) value).getType();
    }

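This is the branch we want the code to take, but the Java object we defined does not satisfy the condition: our Address class does not explicitly extend any parent class or implement any interface, so the method falls through and returns null. The fix tried here is to make Address implement the UdtValue interface or extend an implementation of it. In the driver version used here, UdtValue has only one implementation class, DefaultUdtValue.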
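The reasoning above can be illustrated with a small standalone model. This is only a sketch using the JDK: UdtLike, PlainAddress, TypedAddress, inferCqlType and describeLookup are hypothetical stand-ins, not driver classes, and the inference is reduced to a few branches. It shows why a plain POJO yields a null CQL type, which is what appears as [null <-> ...Address] in the exception message.

```java
import java.util.Collections;
import java.util.List;

public class InferenceSketch {

    /** Hypothetical stand-in for the driver's UdtValue: a value that knows its CQL type. */
    interface UdtLike {
        String cqlTypeName();
    }

    /** Plain POJO, like the original Address: implements no driver interface. */
    static class PlainAddress {
        String addr;
        String mail;
    }

    /** Value that carries its own CQL type, like a real UdtValue does. */
    static class TypedAddress implements UdtLike {
        @Override
        public String cqlTypeName() {
            return "frozen<address>";
        }
    }

    /** Simplified inference: returns a CQL type name, or null if the class is unknown. */
    static String inferCqlType(Object value) {
        if (value instanceof List) {
            List<?> list = (List<?>) value;
            if (list.isEmpty()) {
                return "list<unknown>"; // placeholder; the driver returns a constant here
            }
            String elementType = inferCqlType(list.get(0));
            return elementType == null ? null : "list<" + elementType + ">";
        }
        if (value instanceof String) {
            return "text";
        }
        if (value instanceof Integer) {
            return "int";
        }
        if (value instanceof UdtLike) {
            return ((UdtLike) value).cqlTypeName();
        }
        return null; // no branch matched: the caller has no CQL type to look up a codec with
    }

    /** Formats the lookup key in the same shape as the exception message. */
    static String describeLookup(Object value) {
        return "[" + inferCqlType(value) + " <-> " + value.getClass().getSimpleName() + "]";
    }

    public static void main(String[] args) {
        // prints [null <-> PlainAddress]
        System.out.println(describeLookup(new PlainAddress()));
        // prints [frozen<address> <-> TypedAddress]
        System.out.println(describeLookup(new TypedAddress()));
        // prints list<frozen<address>>
        System.out.println(inferCqlType(Collections.singletonList(new TypedAddress())));
    }
}
```

In this model, a list of PlainAddress also infers to null because the element type cannot be determined, which matches the nested createCodec frames in the stack trace.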

So Address is modified as follows:

@UserDefinedType("address")
public class Address extends DefaultUdtValue {

    @Column("addr")
    private String addr;

    @Column("mail")
    private String mail;

    public Address() {
        super(new DefaultUserDefinedType(
                CqlIdentifier.fromInternal("cass_stdy"),
                CqlIdentifier.fromInternal("address"),
                true,
                Lists.newArrayList(CqlIdentifier.fromInternal("addr"), CqlIdentifier.fromInternal("mail")),
                Lists.newArrayList(DataTypes.TEXT, DataTypes.TEXT)));
    }

    // getter ... setter ...
}

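DefaultUdtValue has no no-argument constructor; its constructor requires a UserDefinedType. So the code that previously built the UserDefinedType in the configuration class, where it was used to register a TypeCodec, is moved into the Address constructor.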

With that in place, restart the project and run save and update again. Both succeed without any error. Now query the database:

cqlsh:cass_stdy> SELECT * FROM user;

 id                                   | addr                                                                                                     | age  | birthday | d_addr | name     | score
--------------------------------------+----------------------------------------------------------------------------------------------------------+------+----------+--------+----------+-------
 dbde2592-9275-4494-b525-ee84a8263b44 |                                                     [{addr: null, mail: null}, {addr: null, mail: null}] | null |     null |   null | name0000 |  null
 f7702276-694a-47e1-92cf-312db4101353 | [{addr: null, mail: null}, {addr: null, mail: null}, {addr: null, mail: null}, {addr: null, mail: null}] | null |     null |   null | name0000 |  null

(2 rows)

Something is clearly off: why are all the values inside addr null?
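One plausible reading, which the text above does not confirm, is that the addr and mail fields declared in the Address subclass are ordinary Java fields, while a UdtValue keeps the values it serializes in its own internal storage, so the bean setters never reach what is actually written. The sketch below models that with plain JDK classes: UdtStorage is a hypothetical stand-in for DefaultUdtValue, and render() is a hypothetical stand-in for what a codec would read.

```java
import java.util.Arrays;

public class ShadowedFieldsSketch {

    /** Hypothetical stand-in for DefaultUdtValue: field values live in an internal array. */
    static class UdtStorage {
        private final String[] names;
        private final String[] values;

        UdtStorage(String... names) {
            this.names = names;
            this.values = new String[names.length]; // every slot is null until set by name
        }

        /** Writes into the internal storage, similar in spirit to setting a UDT field by name. */
        UdtStorage setString(String name, String value) {
            values[Arrays.asList(names).indexOf(name)] = value;
            return this;
        }

        /** What a serializer would see: only the internal storage, never subclass fields. */
        String render() {
            StringBuilder sb = new StringBuilder("{");
            for (int i = 0; i < names.length; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(names[i]).append(": ").append(values[i]);
            }
            return sb.append("}").toString();
        }
    }

    /** Mirrors the modified Address: extra Java fields that the storage never sees. */
    static class Address extends UdtStorage {
        private String addr;
        private String mail;

        Address() {
            super("addr", "mail");
        }

        void setAddr(String addr) {
            this.addr = addr; // updates only the Java field
        }

        void setMail(String mail) {
            this.mail = mail; // updates only the Java field
        }
    }

    public static void main(String[] args) {
        Address a = new Address();
        a.setAddr("Beijing");
        a.setMail("a@example.com");
        // prints {addr: null, mail: null}
        System.out.println(a.render());

        a.setString("addr", "Beijing").setString("mail", "a@example.com");
        // prints {addr: Beijing, mail: a@example.com}
        System.out.println(a.render());
    }
}
```

If this reading is right, the values would have to be written through the UDT's own by-name setters rather than through bean fields on a subclass.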

 
