a2filter.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. // a2filter.go - A2 bloom filter
  2. //
  3. // To the extent possible under law, the Yawning Angel waived all copyright
  4. // and related or neighboring rights to a2filter, using the creative
  5. // commons "cc0" public domain dedication. See LICENSE or
  6. // <http://creativecommons.org/publicdomain/zero/1.0/> for full details.
  7. // Package a2filter implements a SipHash-2-4 based Active-Active Bloom Filter.
  8. // It is designed to be stable over time even when filled to max capacity by
  9. // implementing the active-active buffering (A2 buffering) scheme presented in
  10. // "Aging Bloom Filter with Two Active Buffers for Dynamic Sets" (MyungKeun
  11. // Yoon).
  12. //
  13. // Note that none of the operations on the filter are constant time, and the
  14. // max backing Bloom Filter size is limited to 2^31 bits (2^28 bytes) per
  15. // buffer. This package is threadsafe.
  16. package a2filter
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "io"
  21. "math"
  22. "sync"
  23. "github.com/dchest/siphash"
  24. )
const (
	// ln2 is the natural logarithm of 2.
	ln2 = 0.69314718055994529
	// ln2Sq is the square of the natural logarithm of 2.
	ln2Sq = 0.48045301391820139
	// maxMln2 is the largest filter size exponent New accepts; New returns
	// an error for anything larger.
	maxMln2 = 31
	// maxNrHashes is the largest per-entry hash count New accepts; New
	// returns an error when the requested parameters need more than this.
	maxNrHashes = 32
)
// A2Filter is an Active-Active Bloom Filter.
//
// It keeps two equally sized bit buffers. New values are recorded in active1;
// when active1 reaches nrEntriesMax entries, TestAndSet discards the contents
// of active2 and swaps the two buffers.
type A2Filter struct {
	// Embedded mutex; TestAndSet holds it while it reads and updates the
	// buffers and the entry counter.
	sync.Mutex
	// k1 and k2 are the two 64-bit halves of the SipHash key that New reads
	// from its random source.
	k1, k2 uint64
	// nrEntries is the number of values added to active1 since the last
	// buffer swap.
	nrEntries int
	// nrEntriesMax is the number of values active1 may hold before the next
	// addition triggers a buffer swap.
	nrEntriesMax int
	// nrHashes is the number of bit positions derived for each value.
	nrHashes int
	// hashMask reduces a 32-bit hash to a bit index; New sets it to the
	// filter size in bits minus one.
	hashMask uint32
	// active1 is the buffer new values are written to.
	active1 []byte
	// active2 is the buffer that was active1 before the most recent swap.
	active2 []byte
}
  42. // New constructs a new A2Filter with a filter set size 2^mLn2, and false
  43. // postive rate p. The actual in memory footprint of the datastructure will be
  44. // approximately 2^(mLn2+1) bits due to the double buffered nature of the
  45. // filter.
  46. func New(rand io.Reader, mLn2 int, p float64) (*A2Filter, error) {
  47. var key [16]byte
  48. if _, err := io.ReadFull(rand, key[:]); err != nil {
  49. return nil, err
  50. }
  51. if mLn2 > maxMln2 {
  52. return nil, fmt.Errorf("requested filter too large: %d", mLn2)
  53. }
  54. m := 1 << uint32(mLn2)
  55. n := -1.0 * float64(m) * ln2Sq / math.Log(p)
  56. k := int((float64(m) * ln2 / n) + 0.5)
  57. f := new(A2Filter)
  58. f.k1 = binary.BigEndian.Uint64(key[0:8])
  59. f.k2 = binary.BigEndian.Uint64(key[8:16])
  60. f.nrEntriesMax = int(n)
  61. f.nrHashes = k
  62. f.hashMask = uint32(m - 1)
  63. if f.nrHashes < 2 {
  64. f.nrHashes = 2
  65. }
  66. if f.nrHashes > maxNrHashes {
  67. return nil, fmt.Errorf("requested parameters need too many hashes")
  68. }
  69. f.active1 = make([]byte, m/8)
  70. f.active2 = make([]byte, m/8)
  71. return f, nil
  72. }
  73. // TestAndSet tests the A2Filter for a given value's membership, adds the
  74. // value to the filter and returns if it was present at the time of the call.
  75. func (f *A2Filter) TestAndSet(b []byte) bool {
  76. hashes := f.getHashes(b)
  77. f.Lock()
  78. defer f.Unlock()
  79. // If the member is present in Active1, just return.
  80. if f.testCache(f.active1, hashes) {
  81. return true
  82. }
  83. // Test Active2 for membership, and add the value to Active1.
  84. ret := f.testCache(f.active2, hashes)
  85. if f.nrEntries++; f.nrEntries > f.nrEntriesMax {
  86. // Active1 is full, clear Active2 and swap the buffers, this leaves
  87. // Active1 empty, and Active2 populated to saturation, immediately
  88. // after the tested entry will be added to Active1.
  89. f.active2 = make([]byte, len(f.active2))
  90. f.active1, f.active2 = f.active2, f.active1
  91. f.nrEntries = 1
  92. }
  93. f.addActive1(hashes)
  94. return ret
  95. }
  96. // MaxEntries returns the maximum capacity of the A2Filter. This value is
  97. // usually an underestimate as the filter is double buffered, however entry
  98. // count accounting is only done for Active1, so Active2 should be ignored in
  99. // calculations.
  100. func (f *A2Filter) MaxEntries() int {
  101. return f.nrEntriesMax
  102. }
  103. func (f *A2Filter) testCache(cache []byte, hashes []uint32) bool {
  104. for i := 0; i < f.nrHashes; i++ {
  105. idx := hashes[i] & f.hashMask
  106. if 0 == cache[idx/8]&(1<<(idx&7)) {
  107. // Break out early if there is a miss.
  108. return false
  109. }
  110. }
  111. return true
  112. }
  113. func (f *A2Filter) addActive1(hashes []uint32) {
  114. for i := 0; i < f.nrHashes; i++ {
  115. idx := hashes[i] & f.hashMask
  116. f.active1[idx/8] |= (1 << (idx & 7))
  117. }
  118. }
  119. func (f *A2Filter) getHashes(b []byte) []uint32 {
  120. // Per "Less Hashing, Same Performance: Building a Better Bloom Filter"
  121. // (Kirsch and Miteznmacher), with a suitably good PRF, only two calls to
  122. // the hash algorithm are needed. As SipHash-2-4 returns a 64 bit digest,
  123. // and we use 32 bit hashes for the filter, this results in only one
  124. // invocation of SipHash-2-4.
  125. hashes := make([]uint32, f.nrHashes)
  126. baseHash := siphash.Hash(f.k1, f.k2, b)
  127. hashes[0] = uint32(baseHash & math.MaxUint32)
  128. hashes[1] = uint32(baseHash >> 32)
  129. for i := 2; i < f.nrHashes; i++ {
  130. hashes[i] = hashes[0] + uint32(i)*hashes[1]
  131. }
  132. return hashes
  133. }