Snowflake概要
Snowflake是一种分布式全局唯一ID的算法,最初是由Twitter创建,看图:
Snowflake由64bit组成,共有4部分组成:
- 占用1bit,目前暂时未使用,始终是0;
- 时间戳占用41bit,由于当前时间戳的二进制就是41bit,所以这个时间是精确到毫秒,这个时间是由1970年开始计算的,由于41bit的最大值限制,往后可以使用69年。
- 机器id占用10bit,最多可以表示1024台机器
- 序列号占用12bit,表示同一台机器上同一时刻最多可以同时生成4096个id
agrona中的SnowflakeIdGenerator源码解析
构造方法
public SnowflakeIdGenerator(
final int nodeIdBits,
final int sequenceBits,
final long nodeId,
final long timestampOffsetMs,
final EpochClock clock)
{
.....
}
- nodeIdBits: 设置机器id占用多少字节,默认是10bit
- sequenceBits: 设置序列号占用多少字节,默认值12bit
- nodeId: 设置机器的ID
- timestampOffsetMs: 在Snowflake概要中提到,雪花算法中的时间戳占用41bit,从1970开始只能支持69年,也就是2039年;timestampOffsetMs就是解决这个问题,设置从指定的时间开始而不是1970
- clock: agrona中的时钟,默认使用的是系统时钟
SystemEpochClock
public class SystemEpochClock implements EpochClock
{
/**
* As there is no instance state then this object can be used to save on allocation.
*/
public static final SystemEpochClock INSTANCE = new SystemEpochClock();
/**
* {@inheritDoc}
*/
public long time()
{
return System.currentTimeMillis();
}
}
- timestampSequence: 记录时间戳左移22位后的值,为了防止出现并发问题,agrona更新这个字段使用了cas;
abstract class AbstractSnowflakeIdGeneratorPaddingLhs
{
byte p000, p001, p002, p003, p004, p005, p006, p007, p008, p009, p010, p011, p012, p013, p014, p015;
byte p016, p017, p018, p019, p020, p021, p022, p023, p024, p025, p026, p027, p028, p029, p030, p031;
byte p032, p033, p034, p035, p036, p037, p038, p039, p040, p041, p042, p043, p044, p045, p046, p047;
byte p048, p049, p050, p051, p052, p053, p054, p055, p056, p057, p058, p059, p060, p061, p062, p063;
}
abstract class AbstractSnowflakeIdGeneratorValue extends AbstractSnowflakeIdGeneratorPaddingLhs
{
static final AtomicLongFieldUpdater<AbstractSnowflakeIdGeneratorValue> TIMESTAMP_SEQUENCE_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractSnowflakeIdGeneratorValue.class, "timestampSequence");
volatile long timestampSequence;
}
abstract class AbstractSnowflakeIdGeneratorPaddingRhs extends AbstractSnowflakeIdGeneratorValue
{
byte p064, p065, p066, p067, p068, p069, p070, p071, p072, p073, p074, p075, p076, p077, p078, p079;
byte p080, p081, p082, p083, p084, p085, p086, p087, p088, p089, p090, p091, p092, p093, p094, p095;
byte p096, p097, p098, p099, p100, p101, p102, p103, p104, p105, p106, p107, p108, p109, p110, p111;
byte p112, p113, p114, p115, p116, p117, p118, p119, p120, p121, p122, p123, p124, p125, p126, p127;
}
从上面的代码我们可以看到在timestampSequence字段的前后都填充了64bit,这是为了解决伪共享的问题,现在的CPU都有缓存行通常是64bit,timestampSequence的字段类型是long占用8个字节,如果不填充,加载了timestampSequence缓存行还包含了其他数据,如果这些数据过期会导致整个缓存行都过期,这在高并发的场景下会影响性能。
生成id的核心方法nextId
该方法主要分为三种情况:
timestampMs > oldTimestampMs
,说明雪花算法中的时间戳部分没有冲突,那么把当前时间戳timestampMs << nodeIdAndSequenceBits
, 然后cas更新timestampSequence;返回newTimestampSequence | nodeBits
;
if (timestampMs > oldTimestampMs)
{
final long newTimestampSequence = timestampMs << nodeIdAndSequenceBits;
if (TIMESTAMP_SEQUENCE_UPDATER.compareAndSet(this, oldTimestampSequence, newTimestampSequence))
{
return newTimestampSequence | nodeBits;
}
}
其中的nodeIdAndSequenceBits
与nodeBits
是在构造方法中设置的
this.nodeIdAndSequenceBits = nodeIdBits + sequenceBits; //10+12=22bit
this.nodeBits = nodeId << sequenceBits; //机器id左移12bit
timestampMs == oldTimestampMs
,说明时间戳部分冲突,需要通过递增序列号来保证唯一,从oldTimestampMs中解析出序列号oldSequence,只要没有超过允许的最大序列号就+1后继续cas;如果序列号超过了最大值就继续循环直到下一毫秒在生成id,这样时间戳就不会冲突
if (timestampMs == oldTimestampMs)
{
final long oldSequence = oldTimestampSequence & maxSequence;
if (oldSequence < maxSequence)
{
final long newTimestampSequence = oldTimestampSequence + 1;
if (TIMESTAMP_SEQUENCE_UPDATER.compareAndSet(this, oldTimestampSequence, newTimestampSequence))
{
return newTimestampSequence | nodeBits;
}
}
}
timestampMs == oldTimestampMs
, 说明时间出现了回拨,可能是人为原因,把系统时间改了,或者是机器之间的时间同步出现了误差,在这种情况下agrona之间抛异常
else
{
throw new IllegalStateException(
"clock has gone backwards: timestampMs=" + timestampMs + " < oldTimestampMs=" + oldTimestampMs);
}
扩展
以上我们讲完了SnowflakeIdGenerator所有的核心,现在我们考虑下,通常情况我们的服务都会部署多个实例,那么在创建SnowflakeIdGenerator应该如何设置nodeId, 如果是命令行jar的方式启动,可以通过设置命令行参数,但如果是k8s容器启动并且可以随机增加减少实例就不太方便,此时我们可以考虑使用zk的零时节点来动态生成nodeId。
设计配置参数yml, SnowflakeIdGeneratorProperties
snowflake:
zookeeper:
servers: zoo1.com:2181
namespace: snowflake
group: ${project_name}
timestampOffsetMs: xxx
nodeIdBits: 10
- zookeeper: 配置zk相关的参数信息
- timestampOffsetMs: 配置SnowflakeIdGenerator的时间戳偏移量
- nodeIdBits: 配置机器id占用的字节数
扩展agrona中的SnowflakeIdGenerator
public class ExtendedSnowflakeIdGenerator implement ConnectStateListener{
private SnowflakeIdGenerator idGenerator;
private long timestampOffSetMs;
private boolean suspend;
private boolean inactive;
private LongSupplier nodeIdSupplier;
public ExtendedSnowflakeIdGenerator(LongSupplier nodeIdSupplier, long timestampOffSetMs){
this.timestampOffSetMs = timestampOffSetMs;
this.idGenerator = new SnowflakeIdGenerator(...);
}
public long nextId(){
if(inactive){
throw new IllegalStatExecption(...);
}
return idGenerator.nextId();
}
public void stateChanged(CuratorFramework zkClient, ConnectionState state){
if(state == SUSPEND){
this.suspend=true;
}
if(state == LOST){
this.inactive=true;
}
if(state == RECONNECTED){
long nodeId = nodeIdSupplier.getAsLong();
this.idGenerator = new SnowflakeIdGenerator(...);
}
}
//geter, setter ...
}
因为我们考虑使用zk来扩展SnowflakeIdGenerator, 如果出现了zk的链接状态不稳定时,我们需要对SnowflakeIdGenerator做相应的控制,所以这增加了两个字段suspend
inactive
,然后实现了ConnectStateListener来感知zk链接的状态变化; 如果链接状态是RECONNECTED, 那么需要重新生成nodeId和SnowflakeIdGenerator避免出现了不同的实例使用了相同的nodeId
接下来创建SnowFlakeFactory,主要的职责就是创建ExtendedSnowflakeIdGenerator
以及获取一个可用的nodeId,主要是通过判断zk的节点是否存在来校验nodeId是否被使用。
public class SnowFlakeFactory {
private CuratorFrameWork zkClient;
private ExtendedSnowflakeIdGenerator idGenerator;
public SnowFlakeFactory(){
}
public void destroy(){
if(zkClient.getState() != STOPPED){
zkClient.close();
}
}
public ExtendedSnowflakeIdGenerator createIdGenerator(){
return new ExtendedSnowflakeIdGenerator(
()->this.getAvailableNodeId(),
...
);
}
public long getAvailableNodeId(){
String groupPath = String.format("/%s/id_node", group);
Stat stat = zkClient.checkExists().forPath(groupPaht);
List<String> existNodes ;
if(stat==null){
zkClient.create().creatingParentsIfNeeded()
.forPath(groupPath,"".getBytes();
existNodes=new ArrayList();
}else {
existNodes=zkClient.getChildren().forPath(groupPath);
}
String workPath;
long maxNodeId = (long) (Math.pow(2, nodeIdBits) -1);
for(long i=1; i< maxNodeId; i++){
workPath = String.format("work_%d",i);
if(!existNodes.contains(workPath)){
try{
zkClient.create.withMode(EPHEMERAL)
.forPath(groupPath+"/"+workPath,"".getBytes());
return i;
}catch(NodeExistsException e){
}
}
}
}
}
原文链接: http://herman7z.site