@@ -2,11 +2,9 @@ package consensus
22
33import (
44 "encoding/json"
5- "errors"
65 "fmt"
76 "sync"
87
9- "github.com/dgraph-io/badger/v2"
108 "github.com/rs/zerolog"
119 "github.com/rs/zerolog/log"
1210 "go.uber.org/atomic"
@@ -15,7 +13,7 @@ import (
1513 "github.com/onflow/flow-go/model/flow"
1614 "github.com/onflow/flow-go/module/mempool"
1715 "github.com/onflow/flow-go/storage"
18- "github.com/onflow/flow-go/storage/badger/operation "
16+ "github.com/onflow/flow-go/storage/store "
1917)
2018
2119// ExecForkSuppressor is a wrapper around a conventional mempool.IncorporatedResultSeals
@@ -40,15 +38,15 @@ import (
4038//
4139// Implementation is concurrency safe.
4240type ExecForkSuppressor struct {
43- mutex sync.RWMutex
44- seals mempool.IncorporatedResultSeals
45- sealsForBlock map [flow.Identifier ]sealSet // map BlockID -> set of IncorporatedResultSeal
46- byHeight map [uint64 ]map [flow.Identifier ]struct {} // map height -> set of executed block IDs at height
47- lowestHeight uint64
48- execForkDetected atomic.Bool
49- onExecFork ExecForkActor
50- db * badger. DB
51- log zerolog.Logger
41+ mutex sync.RWMutex
42+ seals mempool.IncorporatedResultSeals
43+ sealsForBlock map [flow.Identifier ]sealSet // map BlockID -> set of IncorporatedResultSeal
44+ byHeight map [uint64 ]map [flow.Identifier ]struct {} // map height -> set of executed block IDs at height
45+ lowestHeight uint64
46+ execForkDetected atomic.Bool
47+ onExecFork ExecForkActor
48+ execForkEvidenceStore storage. ExecutionForkEvidence
49+ log zerolog.Logger
5250}
5351
5452var _ mempool.IncorporatedResultSeals = (* ExecForkSuppressor )(nil )
@@ -59,25 +57,33 @@ type sealSet map[flow.Identifier]*flow.IncorporatedResultSeal
5957// sealsList is a list of seals
6058type sealsList []* flow.IncorporatedResultSeal
6159
62- func NewExecStateForkSuppressor (seals mempool.IncorporatedResultSeals , onExecFork ExecForkActor , db * badger.DB , log zerolog.Logger ) (* ExecForkSuppressor , error ) {
63- conflictingSeals , err := checkExecutionForkEvidence (db )
60+ func NewExecStateForkSuppressor (
61+ seals mempool.IncorporatedResultSeals ,
62+ onExecFork ExecForkActor ,
63+ db storage.DB ,
64+ log zerolog.Logger ,
65+ ) (* ExecForkSuppressor , error ) {
66+ executionForkEvidenceStore := store .NewExecutionForkEvidence (db )
67+
68+ conflictingSeals , err := executionForkEvidenceStore .Retrieve ()
6469 if err != nil {
6570 return nil , fmt .Errorf ("failed to interface with storage: %w" , err )
6671 }
72+
6773 execForkDetectedFlag := len (conflictingSeals ) != 0
6874 if execForkDetectedFlag {
6975 onExecFork (conflictingSeals )
7076 }
7177
7278 wrapper := ExecForkSuppressor {
73- mutex : sync.RWMutex {},
74- seals : seals ,
75- sealsForBlock : make (map [flow.Identifier ]sealSet ),
76- byHeight : make (map [uint64 ]map [flow.Identifier ]struct {}),
77- execForkDetected : * atomic .NewBool (execForkDetectedFlag ),
78- onExecFork : onExecFork ,
79- db : db ,
80- log : log .With ().Str ("mempool" , "ExecForkSuppressor" ).Logger (),
79+ mutex : sync.RWMutex {},
80+ seals : seals ,
81+ sealsForBlock : make (map [flow.Identifier ]sealSet ),
82+ byHeight : make (map [uint64 ]map [flow.Identifier ]struct {}),
83+ execForkDetected : * atomic .NewBool (execForkDetectedFlag ),
84+ onExecFork : onExecFork ,
85+ execForkEvidenceStore : executionForkEvidenceStore ,
86+ log : log .With ().Str ("mempool" , "ExecForkSuppressor" ).Logger (),
8187 }
8288
8389 return & wrapper , nil
@@ -337,41 +343,6 @@ func hasConsistentStateTransitions(irSeal, irSeal2 *flow.IncorporatedResultSeal)
337343 return true
338344}
339345
340- // checkExecutionForkDetected checks the database whether evidence
341- // about an execution fork is stored. Returns the stored evidence.
342- func checkExecutionForkEvidence (db * badger.DB ) ([]* flow.IncorporatedResultSeal , error ) {
343- var conflictingSeals []* flow.IncorporatedResultSeal
344- err := db .View (func (tx * badger.Txn ) error {
345- err := operation .RetrieveExecutionForkEvidence (& conflictingSeals )(tx )
346- if errors .Is (err , storage .ErrNotFound ) {
347- return nil // no evidence in data base; conflictingSeals is still nil slice
348- }
349- if err != nil {
350- return fmt .Errorf ("failed to load evidence whether or not an execution fork occured: %w" , err )
351- }
352- return nil
353- })
354- return conflictingSeals , err
355- }
356-
357- // storeExecutionForkEvidence stores the provided seals in the database
358- // as evidence for an execution fork.
359- func storeExecutionForkEvidence (conflictingSeals []* flow.IncorporatedResultSeal , db * badger.DB ) error {
360- err := operation .RetryOnConflict (db .Update , func (tx * badger.Txn ) error {
361- err := operation .InsertExecutionForkEvidence (conflictingSeals )(tx )
362- if errors .Is (err , storage .ErrAlreadyExists ) {
363- // some evidence about execution fork already stored;
364- // we only keep the first evidence => noting more to do
365- return nil
366- }
367- if err != nil {
368- return fmt .Errorf ("failed to store evidence about execution fork: %w" , err )
369- }
370- return nil
371- })
372- return err
373- }
374-
375346// filterConflictingSeals performs filtering of provided seals by checking if there are conflicting seals for same block.
376347// For every block we check if first seal has same state transitions as others. Multiple seals for same block are allowed
377348// but their state transitions should be the same. Upon detecting seal with inconsistent state transition we will clear our mempool,
@@ -395,7 +366,7 @@ func (s *ExecForkSuppressor) filterConflictingSeals(sealsByBlockID map[flow.Iden
395366 s .execForkDetected .Store (true )
396367 s .Clear ()
397368 conflictingSeals = append (sealsList {candidateSeal }, conflictingSeals ... )
398- err := storeExecutionForkEvidence ( conflictingSeals , s . db )
369+ err := s . execForkEvidenceStore . StoreIfNotExists ( conflictingSeals )
399370 if err != nil {
400371 panic ("failed to store execution fork evidence" )
401372 }
0 commit comments