@@ -25,7 +25,8 @@ type UpdatedDesc struct {
2525}
2626
2727type FullStream struct {
28- Id string `bson:"_id,omitempty"`
28+ Id string `bson:"_id"`
29+ ChunkId string `bson:"chunkId"`
2930 PodId string `bson:"podId,omitempty"`
3031 PodIp string `bson:"podIp,omitempty"`
3132 ExpireAt time.Time `bson:"expireAt,omitempty"`
@@ -160,7 +161,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
160161 cp .Owner .PodIp = s .Update .UpdFields .PodIp
161162 podD := d .podMap [owner ]
162163 podD .podChunks [c ] = cp // add chunk to pod
163- log . Printf ( " pod to chunk map %v " , podD .podChunks )
164+ logger . AppLog . Infof ( "Stream(Update): pod to chunk map %v " , podD .podChunks )
164165 }
165166 case "delete" :
166167 logger .AppLog .Debugf ("delete operations" )
@@ -169,7 +170,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
169170 // delete olnly gets document id
170171 pod , found := d .podMap [s .DId .Id ]
171172 if pod != nil {
172- log . Printf ( " Pod %v and found %v. Chunks owned by crashed pod = %v " , pod , found , pod .podChunks )
173+ logger . AppLog . Infof ( "Stream(Delete): Pod %v and found %v. Chunks owned by crashed pod = %v " , pod , found , pod .podChunks )
173174 d .podDown <- s .DId .Id
174175 }
175176 }
@@ -201,7 +202,7 @@ func (d *Drsm) punchLiveness() {
201202
202203 _ , err := d .mongo .PutOneCustomDataStructure (d .sharedPoolName , filter , update )
203204 if err != nil {
204- logger .AppLog .Debugf ("put data failed : " , err )
205+ logger .AppLog .Errorf ("put data failed : " , err )
205206 // TODO : should we panic ?
206207 continue
207208 }
@@ -219,12 +220,13 @@ func (d *Drsm) checkAllChunks() {
219220 case <- ticker .C :
220221 filter := bson.M {"type" : "chunk" }
221222 result , err := d .mongo .RestfulAPIGetMany (d .sharedPoolName , filter )
223+ log .Printf ("chunk entry: %v" , result )
222224 if err == nil && result != nil {
223225 for _ , v := range result {
224226 var s FullStream
225227 bsonBytes , _ := bson .Marshal (v )
226228 bson .Unmarshal (bsonBytes , & s )
227- log . Printf ("Individual bson Element %v " , s )
229+ logger . AppLog . Infof ("Individual Chunk bson Element %v " , s )
228230 d .addChunk (& s )
229231 }
230232 }
@@ -240,6 +242,10 @@ func (d *Drsm) addChunk(full *FullStream) {
240242 pod = d .addPod (full )
241243 }
242244 did := full .Id
245+ if did == "" {
246+ did = full .ChunkId
247+ }
248+ logger .AppLog .Infof ("received Chunk Doc: %v" , full )
243249 cid := getChunIdFromDocId (did )
244250 o := PodId {PodName : full .PodId , PodIp : full .PodIp }
245251 c := & chunk {Id : cid , Owner : o }
@@ -248,14 +254,14 @@ func (d *Drsm) addChunk(full *FullStream) {
248254 pod .podChunks [cid ] = c
249255 d .globalChunkTbl [cid ] = c
250256
251- log . Printf ("Chunk id %v, pod. podChunks %v " , cid , pod .podChunks )
257+ logger . AppLog . Infof ("Chunk id %v, podChunks %v " , cid , pod .podChunks )
252258}
253259
254260func (d * Drsm ) addPod (full * FullStream ) * podData {
255261 podI := PodId {PodName : full .PodId , PodIp : full .PodIp }
256262 pod := & podData {PodId : podI }
257263 pod .podChunks = make (map [int32 ]* chunk )
258264 d .podMap [full .PodId ] = pod
259- log . Printf ("Keepalive insert d.podMaps %+v" , d .podMap )
265+ logger . AppLog . Infof ("Keepalive insert d.podMaps %+v" , d .podMap )
260266 return pod
261267}
0 commit comments