this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); }
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); }
mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; }
[root@paas-m-k8s-master-1 ~]# kc -n td-redis exec -it redis-jerry-0-0 -c redis-jerry-0 -- redis-cli -a 88c185e86f684251 -c Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe. 127.0.0.1:6379> set a 100 -> Redirected to slot [15495] located at 100.111.149.228:6379 OK 100.111.149.228:6379> get a "100" 100.111.149.228:6379>
1:S 18 Jul 2023 06:48:30.029 # Currently unable to failover: Disconnected from master for longer than allowed. Please check the 'cluster-replica-validity-factor' configuration option.
helm search repo redis -l NAME CHART VERSION APP VERSION DESCRIPTION bitnami/redis 16.13.2 6.2.7 Redis(R) is an open source, advanced key-value ... bitnami/redis-cluster 7.6.4 6.2.7 Redis(R) is an open source, scalable, distribut...
NAME: redis-cluster LAST DEPLOYED: Tue Jun 20 10:50:29 2023 NAMESPACE: demo STATUS: deployed REVISION: 1 TEST SUITE: None NOTES: CHART NAME: redis-cluster CHART VERSION: 7.6.4 APP VERSION: 6.2.7 ** Please be patient while the chart is being deployed **
To get your password run: export REDIS_PASSWORD=$(kubectl get secret --namespace "demo" redis-cluster -o jsonpath="{.data.redis-password}" | base64 -d)
You have deployed a Redis® Cluster accessible only from within you Kubernetes Cluster. INFO: The Job to create the cluster will be created. To connect to your Redis® cluster:
Run a Redis® pod that you can use as a client: kubectl run --namespace demo redis-cluster-client --rm --tty -i --restart='Never' --env REDIS_PASSWORD=$REDIS_PASSWORD --image docker.io/bitnami/redis-cluster:6.2.7-debian-11-r9 -- bash
Connect using the Redis® CLI:
redis-cli -c -h redis-cluster -a $REDIS_PASSWORD
4,查看
可以使用helm list命令查看
1 2 3 4
[root@paas-m-k8s-master-1 7.6.4]# helm list -n demo NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION redis-cluster demo 1 2023-06-20 11:21:01.374707946 +0800 CST deployed redis-cluster-7.6.4 6.2.7