private def append(records: MemoryRecords,
                   // .....
                   ignoreRecordSize: Boolean): LogAppendInfo = {
  // ....
  // they are valid, insert them in the log
  lock synchronized {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      localLog.checkIfMemoryMappedBufferClosed()
      if (validateAndAssignOffsets) {
        // assign offsets to the message set
        val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
        appendInfo.setFirstOffset(offset.value)
        val validateAndOffsetAssignResult = try {
          val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())
          val validator = new LogValidator(validRecords,
            // ....
            appendInfo.sourceCompression,
            targetCompression,
            // .....
          )
          validator.validateMessagesAndAssignOffsets(offset,
            validatorMetricsRecorder,
            requestLocal.getOrElse(throw new IllegalArgumentException(
              "requestLocal should be defined if assignOffsets is true")
            ).bufferSupplier
          )
        } catch {
          case e: IOException =>
            throw new KafkaException(s"Error validating messages while appending to log $name", e)
        }
        // ....
      }
    }
  }
}
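The key move in this excerpt is that offset assignment happens on the broker, not the producer: `offset` is a mutable long box seeded with the current log end offset, and the validator advances it in place as it stamps each record. Below is a minimal standalone sketch of that pattern; the `LongRef` box mirrors the shape of Kafka's `PrimitiveRef.LongRef` but is written out as plain Java here, and `StampedRecord` and `assignOffsets` are hypothetical names for illustration, not Kafka classes.

import java.util.List;

public class OffsetAssignmentSketch {

    // A one-field box so a callee can advance the counter in place
    // (stand-in for org.apache.kafka.server.util.PrimitiveRef.LongRef).
    static final class LongRef {
        long value;
        LongRef(long value) { this.value = value; }
    }

    // Hypothetical record type, used only to show the stamped offsets.
    record StampedRecord(long offset, byte[] payload) { }

    // Mimics what validateMessagesAndAssignOffsets does with the counter:
    // consecutive offsets are handed out starting at the log end offset.
    static List<StampedRecord> assignOffsets(List<byte[]> payloads, LongRef offsetCounter) {
        return payloads.stream()
                .map(p -> new StampedRecord(offsetCounter.value++, p))
                .toList();
    }

    public static void main(String[] args) {
        LongRef offset = new LongRef(42L); // pretend logEndOffset == 42
        List<StampedRecord> stamped = assignOffsets(
                List.of("a".getBytes(), "b".getBytes()), offset);
        // Offsets 42 and 43 were assigned; offset.value is now 44,
        // which becomes the next log end offset after the append.
        stamped.forEach(r -> System.out.println(r.offset()));
    }
}

This is also why `appendInfo.setFirstOffset(offset.value)` is captured before the validator runs: the box still holds the first offset of the batch at that point, and after validation it holds the offset the log end will advance to.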
private ValidationResult buildRecordsAndAssignOffsets(LongRef offsetCounter,
                                                      long logAppendTime,
                                                      RecordBatch firstBatch,
                                                      List<Record> validatedRecords,
                                                      int uncompressedSizeInBytes) {
    long startNanos = time.nanoseconds();
    int estimatedSize = AbstractRecords.estimateSizeInBytes(toMagic, offsetCounter.value,
        targetCompression.type(), validatedRecords);
    // The current implementation of BufferSupplier is naive and works best when the buffer size
    // cardinality is low, so don't use it here
    ByteBuffer buffer = ByteBuffer.allocate(estimatedSize);
    MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, toMagic, targetCompression, timestampType,
        offsetCounter.value, logAppendTime, firstBatch.producerId(), firstBatch.producerEpoch(),
        firstBatch.baseSequence(), firstBatch.isTransactional(), partitionLeaderEpoch);
    for (Record record : validatedRecords)
        builder.appendWithOffset(offsetCounter.value++, record);
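`buildRecordsAndAssignOffsets` is the rebuild path of the validator: it runs when records cannot be patched in place (for example on recompression), so it sizes one buffer up front with `estimateSizeInBytes` and allocates it directly rather than going through `BufferSupplier`, per the comment above. Here is a hedged sketch of driving `MemoryRecordsBuilder` the same way with explicit offsets; it assumes the older `MemoryRecords.builder` overload that takes a `CompressionType` (newer Kafka versions take a `Compression` object instead), and uses a fixed buffer size in place of the estimate.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class RebuildSketch {
    public static void main(String[] args) {
        long baseOffset = 42L; // stands in for offsetCounter.value
        ByteBuffer buffer = ByteBuffer.allocate(1024); // real code sizes this via estimateSizeInBytes

        // Magic v2 batch, uncompressed, create-time timestamps.
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
                TimestampType.CREATE_TIME, baseOffset);

        // Explicit, consecutive offsets, mirroring the appendWithOffset loop above.
        builder.appendWithOffset(baseOffset,     System.currentTimeMillis(), null, "a".getBytes());
        builder.appendWithOffset(baseOffset + 1, System.currentTimeMillis(), null, "b".getBytes());

        MemoryRecords rebuilt = builder.build();
        System.out.println("batch size in bytes: " + rebuilt.sizeInBytes());
    }
}

The direct `ByteBuffer.allocate` follows from the source comment: the rebuilt batch size varies with every request, so its cardinality is high, and pooling those buffers through `BufferSupplier` would churn the pool rather than reuse anything.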