前言

在上文中,我成功从将SpringBoot转为go。接下来,我将尝试把redis锁改为etcd锁。

Redis锁和Etcd锁是两种常见的分布式锁实现方式,它们在设计和特性上存在一些区别。

  1. 存储系统:Redis是一个基于内存的键值数据库,而Etcd是一个高可用的分布式键值存储系统。

  2. 一致性模型:Redis使用主从复制或哨兵模式来提供高可用性,并且在某些情况下可能会存在数据不一致的风险。Etcd则采用了Raft一致性算法,确保了强一致性。

  3. 锁的实现方式:在Redis中,可以使用单个Redis实例或Redis集群作为锁服务。通常使用SETNX(SET if Not eXists)指令尝试在特定键上设置值,以获取锁,并使用DEL指令来释放锁。在Etcd中,可以通过创建一个有序的临时键(ephemeral key)来实现分布式锁。多个客户端尝试创建相同的键,只有最小的键持有者获得锁。

  4. 锁的可重入性:Redis锁不支持可重入性,即同一个线程/进程不能重复获取同一个锁。而Etcd可以通过维护客户端的标识符和计数器来实现可重入性。

  5. 锁的自动过期:Redis支持设置锁的过期时间,可以使用EXPIRE命令来设置锁的自动释放时间。Etcd则需要客户端主动续约,否则在租约到期后将自动释放锁。

  6. 锁的竞争解决策略:Redis的锁通常是通过轮询来竞争获取,即多个客户端不断尝试获取锁直到成功。Etcd则可以使用分布式锁算法,如基于排队、选举或令牌等方式来解决竞争问题。

总体而言,Redis锁适用于较简单的场景,它的实现相对简单且易于部署。而Etcd锁则适用于更复杂的分布式系统,具备更强的一致性和可重入性,并支持更灵活的竞争解决策略。由于我们在部署k3s时使用的endpoint是etcd,所以我们直接连接这个etcd即可。


测试联通性

通过etcdctl命令来测试联通性,下载地址:Releases · etcd-io/etcd (github.com)

解压后,我们将etcdctl注入环境变量中即可,并执行如下命令:

1
2
3
4
5
6
7
8
9
10
# Linux
export ETCDCTL_API=3
etcdctl --endpoints=https://localhost:2379 \
--cacert=/path/to/ca.crt --cert=/path/to/etcd.crt --key=/path/to/etcd.key endpoint health

# Windows
etcdctl --endpoints=https://localhost:2379 ^
--cacert=D:\app\etcd\ca\server-ca.crt ^
--cert=D:\app\etcd\ca\client.crt ^
--key=D:\app\etcd\ca\client.key endpoint health

其中,endpoints 填入你的etcd集群其中一个的地址;--cacert填入服务端证书 server-ca.crt--cert 填入客户端证书 client.crt--key 填入客户端私钥 client.key,最终你会得到如下结果则表示连接成功:

1
https://localhost:2379 is healthy: successfully committed proposal: took = 20.5117ms

leopold-etcd-starter

接下来,我们封装一个starter,用于Spring Etcd的连接。pom文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.7.5</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.1-jre</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.70</version>
</dependency>

接下来我们要实现以下操作:

  1. 注入证书
  2. 实现加锁
  3. 实现解锁

EtcdLock.java :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.leopold.etcd;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lock.LockResponse;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Slf4j
public class EtcdLock {
private Client client;
private Lock lockClient;
private Lease leaseClient;

private LockState lockState;

class LockState {
private String lockKey;
private String lockPath;
private String errorMsg;
private long leaseTTL;
private long leaseId;
private boolean lockSuccess;

public LockState(String lockKey, long leaseTTL) {
this.lockKey = lockKey;
this.leaseTTL = leaseTTL;
}

public String getLockKey() {
return lockKey;
}

public void setLockKey(String lockKey) {
this.lockKey = lockKey;
}

public String getLockPath() {
return lockPath;
}

public void setLockPath(String lockPath) {
this.lockPath = lockPath;
}

public String getErrorMsg() {
return errorMsg;
}

public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}

public long getLeaseId() {
return leaseId;
}

public void setLeaseId(long leaseId) {
this.leaseId = leaseId;
}

public boolean isLockSuccess() {
return lockSuccess;
}

public void setLockSuccess(boolean lockSuccess) {
this.lockSuccess = lockSuccess;
}

public long getLeaseTTL() {
return leaseTTL;
}

public void setLeaseTTL(long leaseTTL) {
this.leaseTTL = leaseTTL;
}
}

public EtcdLock(Client client, String lockKey, Long leaseTTL, TimeUnit unit) {
this.client = client;
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
this.lockState = new LockState(lockKey, unit.toSeconds(leaseTTL));
}


public boolean lock() {
try {
//2.创建租约,不自动续约
createLease();

//3.执行加锁,并为锁对应的Key绑定租约
createLock();
} catch (InterruptedException | ExecutionException e) {
log.error("etcd lock failed -> {}", e.getMessage(), e);
}
return lockState.isLockSuccess();
}

public void unlock() {
String lockPath = this.lockState.getLockPath();
try {
//正常释放锁
if (this.lockState.getLockPath() != null) {
lockClient.unlock(ByteSequence.from(lockState.getLockPath().getBytes())).get();
}

//删除租约
if (lockState.getLeaseId() != 0L) {
leaseClient.revoke(lockState.getLeaseId());
}
} catch (InterruptedException | ExecutionException e) {
log.error("etcd unLock error -> {}", lockPath);
}
log.info("etcd unLock -> {}", lockPath);
}

// 创建一个租约
private void createLease() throws ExecutionException, InterruptedException {
log.debug("[etcd-lock]: start to createLease." + this.lockState.getLockKey() + Thread.currentThread().getName());
try {
long leaseId = leaseClient.grant(this.lockState.getLeaseTTL()).get().getID();
lockState.setLeaseId(leaseId);

// 自动续约
// StreamObserver<LeaseKeepAliveResponse> observer = new StreamObserver<LeaseKeepAliveResponse>() {
// @Override
// public void onNext(LeaseKeepAliveResponse value) {
// log.trace("cluster node lease remaining ttl: {}, lease id: {}", value.getTTL(), value.getID());
// }
//
// @Override
// public void onError(Throwable t) {
// log.error("cluster node lease keep alive failed. exception info: {%s}", t);
// }
//
// @Override
// public void onCompleted() {
// log.trace("cluster node lease completed");
// }
// };
// leaseClient.keepAlive(leaseId, observer);
} catch (InterruptedException | ExecutionException e) {
log.error("[etcd-lock] Create lease failed:" + e);
lockState.setErrorMsg("Create lease failed:" + e);
throw e;
}
}

private void createLock() throws ExecutionException, InterruptedException {
String lockKey = this.lockState.getLockKey();
log.debug("[etcd-lock]: start to createLock." + lockKey + Thread.currentThread().getName());
try {
LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), lockState.getLeaseId()).get();
if (lockResponse != null) {
String lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
this.lockState.setLockPath(lockPath);
log.info("etcd lock -> {}", lockPath);
this.lockState.setLockSuccess(true);
}
} catch (InterruptedException | ExecutionException e) {
log.error("[etcd-lock] lock failed:" + e);
lockState.setErrorMsg("[etcd-lock] lock failed:" + e);
leaseClient.revoke(this.lockState.getLeaseId());
throw e;
}
}
}

可以看到,加锁的过程中,是先创建租约,并为key绑定租约,这里我把自动续约的代码注释掉了,如果你有这类需求可以自行实现。

接下来我们注入证书 EtcdService.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.leopold.etcd;

import io.etcd.jetcd.Client;
import io.grpc.netty.GrpcSslContexts;
import io.netty.handler.ssl.SslContext;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.File;
import java.util.concurrent.TimeUnit;

@Component
@Data
@Slf4j
@ConfigurationProperties(prefix = "leopold.etcd")
public class EtcdService {

private Client client;

private String endpoint;

private String cert;

private String keyCert;

private String keyFile;

@PostConstruct
public void init() throws Exception {
File cert = new File(this.cert);
File keyCertChainFile = new File(keyCert);
File keyFile = new File(this.keyFile);
SslContext context = GrpcSslContexts.forClient()
.trustManager(cert)
.keyManager(keyCertChainFile, keyFile)
.build();
this.client = Client.builder()
.endpoints(endpoint.split(","))
.sslContext(context)
.build();
this.client.getClusterClient().listMember().get().getMembers().forEach(member -> {
log.info("etcd member: {}", member);
});
}

public EtcdLock createLock(String key, long ttl) {
return new EtcdLock(client, key, ttl, TimeUnit.SECONDS);
}
}

application.yaml:

1
2
3
4
5
6
leopold:
etcd:
endpoint: https://m1:2379,https://m2:2379,https://m3:2379
cert: D:\app\etcd\ca\server-ca.crt
keyCert: D:\app\etcd\ca\client.crt
keyFile: D:\app\etcd\ca\client.key

接下来我们测试锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.leopold.service;

import com.leopold.etcd.EtcdLock;
import com.leopold.etcd.EtcdService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@ActiveProfiles("wireguard")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class EtcdTest {

@Resource
private EtcdService etcdService;

@Test
public void test() throws InterruptedException {
new Thread(() -> {
log.info("开始锁1");
EtcdLock etcdLock = etcdService.createLock("test", 30L);
if (etcdLock.lock()) {
try {
log.info("执行内容1");
} finally {
log.info("不解锁1");
}
}
}).start();
EtcdLock etcdLock = etcdService.createLock("test", 30L);
new Thread(() -> {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("开始锁2");
if (etcdLock.lock()) {
try {
log.info("执行内容2");
} finally {
etcdLock.unlock();
log.info("解锁2");
}
}
}).start();
Thread.sleep(1000 * 60 * 60);
}
}

我们启动两个线程,用于模拟业务执行,任务2先休眠5秒,模拟延迟执行。任务1没有释放锁,模拟30秒内一直持有锁时,任务2是否会阻塞等待。最后的休眠让这个方法一直等待,防止方法立刻执行完。

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
2023-07-03 15:48:55.495  INFO 5816 --- [       Thread-5] com.leopold.service.EtcdTest             : 开始锁1
2023-07-03 15:48:55.516 DEBUG 5816 --- [ Thread-5] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLease.testThread-5
2023-07-03 15:48:55.609 DEBUG 5816 --- [ Thread-5] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLock.testThread-5
2023-07-03 15:48:55.663 INFO 5816 --- [ Thread-5] com.leopold.etcd.EtcdLock : etcd lock -> test/175588b84202c823
2023-07-03 15:48:55.663 INFO 5816 --- [ Thread-5] com.leopold.service.EtcdTest : 执行内容1
2023-07-03 15:48:55.663 INFO 5816 --- [ Thread-5] com.leopold.service.EtcdTest : 不解锁1
2023-07-03 15:49:00.524 INFO 5816 --- [ Thread-6] com.leopold.service.EtcdTest : 开始锁2
2023-07-03 15:49:00.524 DEBUG 5816 --- [ Thread-6] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLease.testThread-6
2023-07-03 15:49:00.558 DEBUG 5816 --- [ Thread-6] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLock.testThread-6
2023-07-03 15:49:25.953 INFO 5816 --- [ Thread-6] com.leopold.etcd.EtcdLock : etcd lock -> test/175588b84202c869
2023-07-03 15:49:25.954 INFO 5816 --- [ Thread-6] com.leopold.service.EtcdTest : 执行内容2
2023-07-03 15:49:26.000 INFO 5816 --- [ Thread-6] com.leopold.etcd.EtcdLock : etcd unLock -> test/175588b84202c869
2023-07-03 15:49:26.000 INFO 5816 --- [ Thread-6] com.leopold.service.EtcdTest : 解锁2

可以看到,30秒后,任务2才执行业务,符合我们锁的预期。

接下来我们模拟锁的竞争,让任务1和任务2都执行并释放锁,其中任务1线程执行后,休眠1秒,再执行任务2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.leopold.service;

import com.leopold.etcd.EtcdLock;
import com.leopold.etcd.EtcdService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@ActiveProfiles("wireguard")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class EtcdTest {

@Resource
private EtcdService etcdService;

@Test
public void test() throws InterruptedException {
new Thread(() -> {
log.info("开始锁1");
EtcdLock etcdLock = etcdService.createLock("test", 30L);
if (etcdLock.lock()) {
try {
log.info("执行内容1");
} finally {
etcdLock.unlock();
log.info("解锁1");
}
}
}).start();
Thread.sleep(1000);
EtcdLock etcdLock = etcdService.createLock("test", 30L);
new Thread(() -> {
log.info("开始锁2");
if (etcdLock.lock()) {
try {
log.info("执行内容2");
} finally {
etcdLock.unlock();
log.info("解锁2");
}
}
}).start();
Thread.sleep(1000 * 60 * 60);
}
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2023-07-03 16:10:50.462  INFO 19752 --- [       Thread-5] com.leopold.service.EtcdTest             : 开始锁1
2023-07-03 16:10:50.483 DEBUG 19752 --- [ Thread-5] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLease.testThread-5
2023-07-03 16:10:50.590 DEBUG 19752 --- [ Thread-5] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLock.testThread-5
2023-07-03 16:10:50.686 INFO 19752 --- [ Thread-5] com.leopold.etcd.EtcdLock : etcd lock -> test/175588b84203167c
2023-07-03 16:10:50.687 INFO 19752 --- [ Thread-5] com.leopold.service.EtcdTest : 执行内容1
2023-07-03 16:10:50.790 INFO 19752 --- [ Thread-5] com.leopold.etcd.EtcdLock : etcd unLock -> test/175588b84203167c
2023-07-03 16:10:50.790 INFO 19752 --- [ Thread-5] com.leopold.service.EtcdTest : 解锁1
2023-07-03 16:10:51.465 INFO 19752 --- [ Thread-6] com.leopold.service.EtcdTest : 开始锁2
2023-07-03 16:10:51.465 DEBUG 19752 --- [ Thread-6] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLease.testThread-6
2023-07-03 16:10:51.498 DEBUG 19752 --- [ Thread-6] com.leopold.etcd.EtcdLock : [etcd-lock]: start to createLock.testThread-6
2023-07-03 16:10:51.541 INFO 19752 --- [ Thread-6] com.leopold.etcd.EtcdLock : etcd lock -> test/175588b84203168b
2023-07-03 16:10:51.541 INFO 19752 --- [ Thread-6] com.leopold.service.EtcdTest : 执行内容2
2023-07-03 16:10:51.573 INFO 19752 --- [ Thread-6] com.leopold.etcd.EtcdLock : etcd unLock -> test/175588b84203168b
2023-07-03 16:10:51.573 INFO 19752 --- [ Thread-6] com.leopold.service.EtcdTest : 解锁2

那么现在,我们就可以把以前所有save的方法加锁,把所有save的操作封装统一,分别加上独特的key,并加上etcd锁即可,这里贴一下我们以前封装的leopold-jpa-starter 和 leopold-mongo-starter 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public List<T> smartSave(SmartSaveDTO smartSaveDTO, Class<T> clazz) {
List<String> entityList = smartSaveDTO.getEntityList();
if (!ObjectUtils.isEmpty(entityList)) {
// 让实体名当作key
EtcdLock etcdLock = etcdService.createLock(clazz.getName(), 1L);
try {
// 锁住保存方法
if (etcdLock.lock()) {
return this.repository.saveAll(collectList);
}
} finally {
etcdLock.unlock();
}
}
throw new CustomException("保存对象为空,禁止保存");
}

leopold-go-etcd

接下来我们尝试在go项目中,引入etcd分布式锁

请注意,网上大多数流传的方式已经过时,这里建议按照官方最新的引入方式:etcd/client/v3 at main · etcd-io/etcd · GitHub,采用 go get go.etcd.io/etcd/client/v3 的代码才可以

请注意你的代理环境,由于涉及到Http协议,请注意编译器代理以及你的代理客户端规则,我由于本地网络复杂,在SSL证书上绕了不少弯路

1
go get go.etcd.io/etcd/client/v3

client.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package etcd

import (
"crypto/tls"
"crypto/x509"
"src/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"os"
"time"
)

var config Config

type Config struct {
Endpoints []string
CertFile string
KeyFile string
CAFile string
}

func Init(c Config) {
config = c

// 请更换为你自己的日志框架
logLevel := os.Getenv("LOG_LEVEL")
util.BuildLogger(logLevel)
util.Log().Info("etcd 日志级别 -> %s", logLevel)
}

// GetConn 创建etcd连接
func GetConn() (cli *clientv3.Client, err error) {
cert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
if err != nil {
return nil, err
}
caCert, err := os.ReadFile(config.CAFile)
if err != nil {
return nil, err
}
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
}
config := clientv3.Config{
Endpoints: config.Endpoints,
TLS: tlsConfig,
DialTimeout: time.Duration(5) * time.Millisecond,
}
cli, err = clientv3.New(config)
if err != nil {
return nil, err
}
return cli, nil
}

// CreateMux 创建锁
func CreateMux(key string, ttl int) (mux *concurrency.Mutex, destroy func(), err error) {
cli, err := GetConn()
if err != nil {
return nil, nil, err
}
session, err := concurrency.NewSession(cli, concurrency.WithTTL(ttl))
if err != nil {
return nil, nil, err
}
mux = concurrency.NewMutex(session, key)
return mux, func() {
err = session.Close()
if err != nil {
util.Log().Error("无法关闭session -> %s", mux.Key())
}
err := cli.Close()
if err != nil {
util.Log().Error("无法关闭etcd连接 -> %s", mux.Key())
} else {
util.Log().Info("成功关闭etcd连接 -> %s", mux.Key())
}
}, nil
}

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"src/etcd"
"golang.org/x/net/context"
"log"
"time"
)

func main() {
etcd.Init(etcd.Config{
Endpoints: []string{"https://192.168.0.67:2379", "https://192.168.0.68:2379", "https://192.168.0.69:2379"},
CertFile: "D:\\app\\etcd\\ca\\client.crt",
KeyFile: "D:\\app\\etcd\\ca\\client.key",
CAFile: "D:\\app\\etcd\\ca\\server-ca.crt",
})
log.Println("开始锁1")
mux1, destroy1, err := etcd.CreateMux("test", 30)
defer destroy1()
if err != nil {
log.Fatal("创建etcd连接失败")
}
if err := mux1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Println("执行内容1")
go func() {
log.Println("开始锁2")
mux2, destroy2, err := etcd.CreateMux("test", 30)
defer destroy2()
if err != nil {
log.Fatal("创建etcd连接失败")
}
if err := mux2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Println("执行内容2")
if err := mux2.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Println("解锁2")
}()
time.Sleep(30 * time.Second)
if err := mux1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Println("解锁1")
time.Sleep(60 * time.Second)
}

执行结果如下:

1
2
3
4
5
6
7
2023-07-05 08:57:07.688483 I | 开始锁1
2023-07-05 08:57:07.848467 I | 执行内容1
2023-07-05 08:57:07.848467 I | 开始锁2
2023-07-05 08:57:37.890539 I | 解锁1
2023-07-05 08:57:37.917117 I | 执行内容2
2023-07-05 08:57:37.953923 I | 解锁2
2023-07-05 08:57:37.986050 I | 成功关闭etcd连接

可以发现,内容1持有锁30秒,解锁后,内容2才执行,符合预期

请注意,log.Fatal 函数会导致程序立即终止,并打印指定的错误消息。所以日志打印请根据自身日志框架封装实现。本文并未实现etcd连接共用,如果你有短期内高频次的锁需求,需自行改写为共用一个连接,否则连接开销较大


其他

网上的代码太旧了,老版本成功后导入到项目中发现版本冲突且无法降级,应该先看官方文档的。本地代理用的sock5,代码中使用了Http协议,导致无法建立SSL连接,此处排查问题花费很久(翻了一天的Github issue),无意间打开代理软件才发现一片红的错误日志……

对于Goland的代理如下:

image-20230705172507814

现在我的所有项目都使用了etcd锁,原先一些无状态的应用由于使用了互斥锁,在多pod下依然无法保持幂等性,现在引入了分布式锁,可以保证执行的原子性。接下来我将尝试创建pulsar消息队列,将一些服务的流转,从grpc 方法的逐级调用,到领域事件驱动,这样整个服务的健壮性就几乎完美了。

Pulsar集群内存至少得2G才行,服务器内存不够了,这里埋个坑~~,不过你可以参考下这篇文章,写的不错。https://blog.frognew.com/2021/12/learning-apache-pulsar-14.html


参考资料

[1] go get fails in coreos/etcd with undefined: resolver.BuildOption · Issue #11931 · etcd-io/etcd (github.com)

[2] Etcd使用go module的灾难 (colobu.com)