Class RedisClusterClient
- java.lang.Object
-
- io.lettuce.core.AbstractRedisClient
-
- io.lettuce.core.cluster.RedisClusterClient
-
- All Implemented Interfaces:
AutoCloseable
public class RedisClusterClient extends AbstractRedisClient
A scalable and thread-safe Redis cluster client supporting synchronous, asynchronous and reactive execution models. Multiple threads may share one connection. The cluster client handles command routing based on the first key of the command and maintains a view of the cluster that is available when calling thegetPartitions()
method.Connections to the cluster members are opened on the first access to the cluster node and managed by the
StatefulRedisClusterConnection
. You should not use transactional commands on cluster connections sinceMULTI
,EXEC
andDISCARD
have no key and cannot be assigned to a particular node. A cluster connection uses a default connection to run non-keyed commands.The Redis cluster client provides a
sync
,async
andreactive
API.Connections to particular nodes can be obtained by
StatefulRedisClusterConnection.getConnection(String)
providing the node id orStatefulRedisClusterConnection.getConnection(String, int)
by host and port.Multiple keys operations have to operate on a key that hashes to the same slot. Following commands do not need to follow that rule since they are pipelined according to its hash value to multiple nodes in parallel on the sync, async and, reactive API:
DEL
UNLINK
MGET
RedisStringAsyncCommands.mget(KeyValueStreamingChannel, Object[])
) MGET with streaming}MSET
MSETNX
Following commands on the Cluster sync, async and, reactive API are implemented with a Cluster-flavor:
RedisAdvancedClusterAsyncCommands.clientSetname(Object)
ExecutesCLIENT SET
on all connections and initializes new connections with theclientName
.RedisAdvancedClusterAsyncCommands.flushall()
RunFLUSHALL
on all upstream nodes.RedisAdvancedClusterAsyncCommands.flushdb()
ExecutesFLUSHDB
on all upstream nodes.RedisAdvancedClusterAsyncCommands.keys(Object)
ExecutesKEYS
on all.RedisAdvancedClusterAsyncCommands.randomkey()
Returns a random key from a random upstream node.RedisAdvancedClusterAsyncCommands.scriptFlush()
ExecutesSCRIPT FLUSH
on all nodes.RedisAdvancedClusterAsyncCommands.scriptKill()
ExecutesSCRIPT KILL
on all nodes.RedisAdvancedClusterAsyncCommands.shutdown(boolean)
ExecutesSHUTDOWN
on all nodes.RedisAdvancedClusterAsyncCommands.scan()
Executes aSCAN
on all nodes according toReadFrom
. The resulting cursor must be reused across theSCAN
to scan iteratively across the whole cluster.
Cluster commands can be issued to multiple hosts in parallel by using the
NodeSelectionSupport
API. A set of nodes is selected using aPredicate
and commands can be issued to the node selectionAsyncExecutions<String> ping = commands.upstream().commands().ping(); Collection<RedisClusterNode> nodes = ping.nodes(); nodes.stream().forEach(redisClusterNode -> ping.get(redisClusterNode));
Connection timeouts are initialized from the first provided
RedisURI
.RedisClusterClient
is an expensive resource. Reuse this instance or share externalClientResources
as much as possible.- Since:
- 3.0
- Author:
- Mark Paluch
- See Also:
RedisURI
,StatefulRedisClusterConnection
,RedisCodec
,ClusterClientOptions
,ClientResources
-
-
Field Summary
-
Fields inherited from class io.lettuce.core.AbstractRedisClient
channels, closeableResources, connectionEvents
-
-
Constructor Summary
Constructors Modifier Constructor Description protected
RedisClusterClient()
Non-private constructor to makeRedisClusterClient
proxyable.protected
RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs)
Initialize the client with a list of cluster URI's.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Deprecated Methods Modifier and Type Method Description StatefulRedisClusterConnection<String,String>
connect()
Connect to a Redis Cluster and treat keys and values as UTF-8 strings.<K,V>
StatefulRedisClusterConnection<K,V>connect(RedisCodec<K,V> codec)
Connect to a Redis Cluster.<K,V>
CompletableFuture<StatefulRedisClusterConnection<K,V>>connectAsync(RedisCodec<K,V> codec)
Connect asynchronously to a Redis Cluster.StatefulRedisClusterPubSubConnection<String,String>
connectPubSub()
Connect to a Redis Cluster using pub/sub connections and treat keys and values as UTF-8 strings.<K,V>
StatefulRedisClusterPubSubConnection<K,V>connectPubSub(RedisCodec<K,V> codec)
Connect to a Redis Cluster using pub/sub connections.<K,V>
CompletableFuture<StatefulRedisClusterPubSubConnection<K,V>>connectPubSubAsync(RedisCodec<K,V> codec)
Connect asynchronously to a Redis Cluster using pub/sub connections.static RedisClusterClient
create(RedisURI redisURI)
Create a new client that connects to the supplieduri
with defaultClientResources
.static RedisClusterClient
create(ClientResources clientResources, RedisURI redisURI)
Create a new client that connects to the supplieduri
with sharedClientResources
.static RedisClusterClient
create(ClientResources clientResources, Iterable<RedisURI> redisURIs)
Create a new client that connects to the supplieduri
with sharedClientResources
.static RedisClusterClient
create(ClientResources clientResources, String uri)
Create a new client that connects to the supplied uri with sharedClientResources
.You need to shut down theClientResources
upon shutting down your application.static RedisClusterClient
create(Iterable<RedisURI> redisURIs)
Create a new client that connects to the supplieduri
with defaultClientResources
.static RedisClusterClient
create(String uri)
Create a new client that connects to the supplied uri with defaultClientResources
.protected ClusterTopologyRefresh
createTopologyRefresh()
Template method to createClusterTopologyRefresh
.protected Partitions
determinePartitions(Partitions current, Map<RedisURI,Partitions> topologyViews)
Determines atopology view
based on the current and the obtain topology views.protected <T extends Closeable>
voidforEachCloseable(Predicate<? super Closeable> selector, Consumer<T> function)
protected void
forEachClusterConnection(Consumer<StatefulRedisClusterConnectionImpl<?,?>> function)
Apply aConsumer
ofStatefulRedisClusterConnectionImpl
to all active connections.protected void
forEachClusterPubSubConnection(Consumer<io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl<?,?>> function)
Apply aConsumer
ofStatefulRedisClusterPubSubConnectionImpl
to all active connections.protected RedisURI
getFirstUri()
Returns the firstRedisURI
configured with thisRedisClusterClient
instance.protected Iterable<RedisURI>
getInitialUris()
Partitions
getPartitions()
Retrieve the cluster view.protected Mono<SocketAddress>
getSocketAddressSupplier(Supplier<Partitions> partitionsSupplier, Function<Partitions,Collection<RedisClusterNode>> sortFunction)
Returns aSupplier
forconnection points
.protected Iterable<RedisURI>
getTopologyRefreshSource()
Returns the seedRedisURI
for the topology refreshing.protected CompletableFuture<Partitions>
initializePartitions()
boolean
isTopologyRefreshInProgress()
Return whether a scheduled or adaptive topology refresh is in progress.protected Partitions
loadPartitions()
Retrieve partitions.protected CompletableFuture<Partitions>
loadPartitionsAsync()
Retrieve partitions.protected <V,K>
StatefulRedisClusterConnectionImpl<K,V>newStatefulRedisClusterConnection(RedisChannelWriter channelWriter, ClusterPushHandler pushHandler, RedisCodec<K,V> codec, Duration timeout)
Create a new instance ofStatefulRedisClusterConnectionImpl
or a subclass.protected <K,V>
StatefulRedisConnectionImpl<K,V>newStatefulRedisConnection(RedisChannelWriter channelWriter, PushHandler pushHandler, RedisCodec<K,V> codec, Duration timeout)
Create a new instance ofStatefulRedisConnectionImpl
or a subclass.protected RedisCodec<String,String>
newStringStringCodec()
void
refreshPartitions()
Refresh partitions and re-initialize the routing table.CompletionStage<Void>
refreshPartitionsAsync()
Asynchronously reload partitions and re-initialize the distribution table.void
reloadPartitions()
Deprecated.since 6.0.void
setOptions(ClusterClientOptions clientOptions)
Set theClusterClientOptions
for the client.void
setPartitions(Partitions partitions)
Sets the new cluster topology.CompletableFuture<Void>
shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit)
Shutdown this client and close all open connections asynchronously.void
suspendTopologyRefresh()
Suspend periodic topology refresh if it was activated previously.protected static <T> CompletableFuture<T>
transformAsyncConnectionException(CompletionStage<T> future, Iterable<RedisURI> target)
protected void
updatePartitionsInConnections()
protected boolean
useDynamicRefreshSources()
Returnstrue
ifdynamic refresh sources
are enabled.-
Methods inherited from class io.lettuce.core.AbstractRedisClient
addListener, addListener, channelType, close, connectionBuilder, connectionBuilder, createHandshake, getChannelCount, getCommandListeners, getConnection, getConnection, getDefaultTimeout, getOptions, getResourceCount, getResources, initializeChannelAsync, removeListener, removeListener, setDefaultTimeout, setDefaultTimeout, setOptions, shutdown, shutdown, shutdown, shutdownAsync
-
-
-
-
Constructor Detail
-
RedisClusterClient
protected RedisClusterClient()
Non-private constructor to makeRedisClusterClient
proxyable.
-
RedisClusterClient
protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs)
Initialize the client with a list of cluster URI's. All uris are tried in sequence for connecting initially to the cluster. If any uri is successful for connection, the others are not tried anymore. The initial uri is needed to discover the cluster structure for distributing the requests.- Parameters:
clientResources
- the client resources. Ifnull
, the client will create a new dedicated instance of client resources and keep track of them.redisURIs
- iterable of initialcluster URIs
. Must not benull
and not empty.
-
-
Method Detail
-
create
public static RedisClusterClient create(RedisURI redisURI)
Create a new client that connects to the supplieduri
with defaultClientResources
. You can connect to different Redis servers but you must supply aRedisURI
on connecting.- Parameters:
redisURI
- the Redis URI, must not benull
- Returns:
- a new instance of
RedisClusterClient
-
create
public static RedisClusterClient create(Iterable<RedisURI> redisURIs)
Create a new client that connects to the supplieduri
with defaultClientResources
. You can connect to different Redis servers but you must supply aRedisURI
on connecting.- Parameters:
redisURIs
- one or more Redis URI, must not benull
and not empty.- Returns:
- a new instance of
RedisClusterClient
-
create
public static RedisClusterClient create(String uri)
Create a new client that connects to the supplied uri with defaultClientResources
. You can connect to different Redis servers but you must supply aRedisURI
on connecting.- Parameters:
uri
- the Redis URI, must not be empty ornull
.- Returns:
- a new instance of
RedisClusterClient
-
create
public static RedisClusterClient create(ClientResources clientResources, RedisURI redisURI)
Create a new client that connects to the supplieduri
with sharedClientResources
. You need to shut down theClientResources
upon shutting down your application.You can connect to different Redis servers but you must supply aRedisURI
on connecting.- Parameters:
clientResources
- the client resources, must not benull
redisURI
- the Redis URI, must not benull
- Returns:
- a new instance of
RedisClusterClient
-
create
public static RedisClusterClient create(ClientResources clientResources, String uri)
Create a new client that connects to the supplied uri with sharedClientResources
.You need to shut down theClientResources
upon shutting down your application. You can connect to different Redis servers but you must supply aRedisURI
on connecting.- Parameters:
clientResources
- the client resources, must not benull
uri
- the Redis URI, must not be empty ornull
.- Returns:
- a new instance of
RedisClusterClient
-
create
public static RedisClusterClient create(ClientResources clientResources, Iterable<RedisURI> redisURIs)
Create a new client that connects to the supplieduri
with sharedClientResources
. You need to shut down theClientResources
upon shutting down your application.You can connect to different Redis servers but you must supply aRedisURI
on connecting.- Parameters:
clientResources
- the client resources, must not benull
redisURIs
- one or more Redis URI, must not benull
and not empty- Returns:
- a new instance of
RedisClusterClient
-
setOptions
public void setOptions(ClusterClientOptions clientOptions)
Set theClusterClientOptions
for the client.- Parameters:
clientOptions
- client options for the client and connections that are created after setting the options
-
getPartitions
public Partitions getPartitions()
Retrieve the cluster view. Partitions are shared amongst all connections opened by this client instance.- Returns:
- the partitions.
-
getTopologyRefreshSource
protected Iterable<RedisURI> getTopologyRefreshSource()
Returns the seedRedisURI
for the topology refreshing. This method is called before each topology refresh to provide anIterable
ofRedisURI
that is used to perform the next topology refresh.Subclasses of
RedisClusterClient
may override that method.
-
connect
public StatefulRedisClusterConnection<String,String> connect()
Connect to a Redis Cluster and treat keys and values as UTF-8 strings.What to expect from this connection:
- A default connection is created to the node with the lowest latency
- Keyless commands are send to the default connection
- Single-key keyspace commands are routed to the appropriate node
- Multi-key keyspace commands require the same slot-hash and are routed to the appropriate node
- Pub/sub commands are sent to the node that handles the slot derived from the pub/sub channel
- Returns:
- A new stateful Redis Cluster connection
-
connect
public <K,V> StatefulRedisClusterConnection<K,V> connect(RedisCodec<K,V> codec)
Connect to a Redis Cluster. Use the suppliedcodec
to encode/decode keys and values.What to expect from this connection:
- A default connection is created to the node with the lowest latency
- Keyless commands are send to the default connection
- Single-key keyspace commands are routed to the appropriate node
- Multi-key keyspace commands require the same slot-hash and are routed to the appropriate node
- Pub/sub commands are sent to the node that handles the slot derived from the pub/sub channel
- Type Parameters:
K
- Key typeV
- Value type- Parameters:
codec
- Use this codec to encode/decode keys and values, must not benull
- Returns:
- A new stateful Redis Cluster connection
-
connectAsync
public <K,V> CompletableFuture<StatefulRedisClusterConnection<K,V>> connectAsync(RedisCodec<K,V> codec)
Connect asynchronously to a Redis Cluster. Use the suppliedcodec
to encode/decode keys and values. Connecting asynchronously requires an initialized topology. CallgetPartitions()
first, otherwise the connect will fail with aIllegalStateException
.What to expect from this connection:
- A default connection is created to the node with the lowest latency
- Keyless commands are send to the default connection
- Single-key keyspace commands are routed to the appropriate node
- Multi-key keyspace commands require the same slot-hash and are routed to the appropriate node
- Pub/sub commands are sent to the node that handles the slot derived from the pub/sub channel
- Type Parameters:
K
- Key typeV
- Value type- Parameters:
codec
- Use this codec to encode/decode keys and values, must not benull
- Returns:
- a
CompletableFuture
that is notified with the connection progress. - Since:
- 5.1
-
connectPubSub
public StatefulRedisClusterPubSubConnection<String,String> connectPubSub()
Connect to a Redis Cluster using pub/sub connections and treat keys and values as UTF-8 strings.What to expect from this connection:
- A default connection is created to the node with the least number of clients
- Pub/sub commands are sent to the node with the least number of clients
- Keyless commands are send to the default connection
- Single-key keyspace commands are routed to the appropriate node
- Multi-key keyspace commands require the same slot-hash and are routed to the appropriate node
- Returns:
- A new stateful Redis Cluster connection
-
connectPubSub
public <K,V> StatefulRedisClusterPubSubConnection<K,V> connectPubSub(RedisCodec<K,V> codec)
Connect to a Redis Cluster using pub/sub connections. Use the suppliedcodec
to encode/decode keys and values.What to expect from this connection:
- A default connection is created to the node with the least number of clients
- Pub/sub commands are sent to the node with the least number of clients
- Keyless commands are send to the default connection
- Single-key keyspace commands are routed to the appropriate node
- Multi-key keyspace commands require the same slot-hash and are routed to the appropriate node
- Type Parameters:
K
- Key typeV
- Value type- Parameters:
codec
- Use this codec to encode/decode keys and values, must not benull
- Returns:
- A new stateful Redis Cluster connection
-
connectPubSubAsync
public <K,V> CompletableFuture<StatefulRedisClusterPubSubConnection<K,V>> connectPubSubAsync(RedisCodec<K,V> codec)
Connect asynchronously to a Redis Cluster using pub/sub connections. Use the suppliedcodec
to encode/decode keys and values. Connecting asynchronously requires an initialized topology. CallgetPartitions()
first, otherwise the connect will fail with aIllegalStateException
.What to expect from this connection:
- A default connection is created to the node with the least number of clients
- Pub/sub commands are sent to the node with the least number of clients
- Keyless commands are send to the default connection
- Single-key keyspace commands are routed to the appropriate node
- Multi-key keyspace commands require the same slot-hash and are routed to the appropriate node
- Type Parameters:
K
- Key typeV
- Value type- Parameters:
codec
- Use this codec to encode/decode keys and values, must not benull
- Returns:
- a
CompletableFuture
that is notified with the connection progress. - Since:
- 5.1
-
newStatefulRedisConnection
protected <K,V> StatefulRedisConnectionImpl<K,V> newStatefulRedisConnection(RedisChannelWriter channelWriter, PushHandler pushHandler, RedisCodec<K,V> codec, Duration timeout)
Create a new instance ofStatefulRedisConnectionImpl
or a subclass.Subclasses of
RedisClusterClient
may override that method.- Type Parameters:
K
- Key-TypeV
- Value Type- Parameters:
channelWriter
- the channel writerpushHandler
- the handler for push notificationscodec
- codectimeout
- default timeout- Returns:
- new instance of StatefulRedisConnectionImpl
-
newStatefulRedisClusterConnection
protected <V,K> StatefulRedisClusterConnectionImpl<K,V> newStatefulRedisClusterConnection(RedisChannelWriter channelWriter, ClusterPushHandler pushHandler, RedisCodec<K,V> codec, Duration timeout)
Create a new instance ofStatefulRedisClusterConnectionImpl
or a subclass.Subclasses of
RedisClusterClient
may override that method.- Type Parameters:
K
- Key-TypeV
- Value Type- Parameters:
channelWriter
- the channel writerpushHandler
- the handler for push notificationscodec
- codectimeout
- default timeout- Returns:
- new instance of StatefulRedisClusterConnectionImpl
-
reloadPartitions
@Deprecated public void reloadPartitions()
Deprecated.since 6.0. Renamed torefreshPartitions()
.Refresh partitions and re-initialize the routing table.
-
refreshPartitions
public void refreshPartitions()
Refresh partitions and re-initialize the routing table.- Since:
- 6.0
-
refreshPartitionsAsync
public CompletionStage<Void> refreshPartitionsAsync()
Asynchronously reload partitions and re-initialize the distribution table.- Returns:
- a
CompletionStage
that signals completion. - Since:
- 6.0
-
suspendTopologyRefresh
public void suspendTopologyRefresh()
Suspend periodic topology refresh if it was activated previously. Suspending cancels the periodic schedule without interrupting any running topology refresh. Suspension is in place until obtaining a newconnection
.- Since:
- 6.3
-
isTopologyRefreshInProgress
public boolean isTopologyRefreshInProgress()
Return whether a scheduled or adaptive topology refresh is in progress.- Returns:
true
if a topology refresh is in progress.- Since:
- 6.3
-
updatePartitionsInConnections
protected void updatePartitionsInConnections()
-
initializePartitions
protected CompletableFuture<Partitions> initializePartitions()
-
loadPartitions
protected Partitions loadPartitions()
Retrieve partitions. Nodes withinPartitions
are ordered by latency. Lower latency nodes come first.- Returns:
- Partitions
-
loadPartitionsAsync
protected CompletableFuture<Partitions> loadPartitionsAsync()
Retrieve partitions. Nodes withinPartitions
are ordered by latency. Lower latency nodes come first.- Returns:
- future that emits
Partitions
upon a successful topology lookup. - Since:
- 6.0
-
determinePartitions
protected Partitions determinePartitions(Partitions current, Map<RedisURI,Partitions> topologyViews)
Determines atopology view
based on the current and the obtain topology views.- Parameters:
current
- the current topology view. May benull
ifRedisClusterClient
has no topology view yet.topologyViews
- the obtain topology views- Returns:
- the
topology view
to use.
-
setPartitions
public void setPartitions(Partitions partitions)
Sets the new cluster topology. The partitions are not applied to existing connections.- Parameters:
partitions
- partitions object
-
shutdownAsync
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit)
Shutdown this client and close all open connections asynchronously. The client should be discarded after calling shutdown.- Overrides:
shutdownAsync
in classAbstractRedisClient
- Parameters:
quietPeriod
- the quiet period as described in the documentationtimeout
- the maximum amount of time to wait until the executor is shutdown regardless if a task was submitted during the quiet periodtimeUnit
- the unit ofquietPeriod
andtimeout
- Since:
- 4.4
- See Also:
EventExecutorGroup.shutdownGracefully(long, long, TimeUnit)
-
getFirstUri
protected RedisURI getFirstUri()
Returns the firstRedisURI
configured with thisRedisClusterClient
instance.- Returns:
- the first
RedisURI
.
-
getSocketAddressSupplier
protected Mono<SocketAddress> getSocketAddressSupplier(Supplier<Partitions> partitionsSupplier, Function<Partitions,Collection<RedisClusterNode>> sortFunction)
Returns aSupplier
forconnection points
.- Parameters:
sortFunction
- Sort function to enforce a specific order. The sort function must not change the order or the input parameter but create a new collection with the desired order, must not benull
.- Returns:
Supplier
forconnection points
.
-
forEachClusterConnection
protected void forEachClusterConnection(Consumer<StatefulRedisClusterConnectionImpl<?,?>> function)
Apply aConsumer
ofStatefulRedisClusterConnectionImpl
to all active connections.- Parameters:
function
- theConsumer
.
-
forEachClusterPubSubConnection
protected void forEachClusterPubSubConnection(Consumer<io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl<?,?>> function)
Apply aConsumer
ofStatefulRedisClusterPubSubConnectionImpl
to all active connections.- Parameters:
function
- theConsumer
.
-
forEachCloseable
protected <T extends Closeable> void forEachCloseable(Predicate<? super Closeable> selector, Consumer<T> function)
- Type Parameters:
T
-- Parameters:
function
- theConsumer
.
-
createTopologyRefresh
protected ClusterTopologyRefresh createTopologyRefresh()
Template method to createClusterTopologyRefresh
. Can be overriden by subclasses.- Returns:
- Since:
- 6.0.3
-
useDynamicRefreshSources
protected boolean useDynamicRefreshSources()
Returnstrue
ifdynamic refresh sources
are enabled.Subclasses of
RedisClusterClient
may override that method.- Returns:
true
if dynamic refresh sources are used.- See Also:
ClusterTopologyRefreshOptions.useDynamicRefreshSources()
-
newStringStringCodec
protected RedisCodec<String,String> newStringStringCodec()
- Returns:
- a
String
codec
. - See Also:
StringCodec.UTF8
-
transformAsyncConnectionException
protected static <T> CompletableFuture<T> transformAsyncConnectionException(CompletionStage<T> future, Iterable<RedisURI> target)
-
-