Overview
Karman provides an elegant Groovy DSL for working with cloud storage, making file operations intuitive and expressive. This guide covers advanced usage patterns, streaming capabilities, and powerful file manipulation techniques.
Basic DSL Syntax
Accessing Files with Subscript Operator
Karman leverages Groovy’s subscript operator ([]) for intuitive file access:
// Access provider -> bucket -> file
def file = provider['my-bucket']['path/to/file.txt']
// Alternative dot notation
def sameFile = provider.'my-bucket'.'path/to/file.txt'
// Chain operations fluently
provider['bucket']['file.txt'].text = 'Hello!'
Simplified Property Access
// Get file content
def content = file.text
def bytes = file.bytes
def stream = file.inputStream
// Set file content (then call save() to persist, as shown later in this guide)
file.text = 'New content'
file.bytes = byteArray
file.inputStream = new FileInputStream('local-file.txt')
file.save()
// Check existence
if (file.exists()) {
println "File found: ${file.name}"
}
Streaming Data
Reading Streams
Karman provides multiple ways to work with streams for memory-efficient file handling:
// Basic InputStream access
def inputStream = file.inputStream
// Use Groovy's withStream closure for automatic cleanup
file.inputStream.withStream { stream ->
// Process stream
stream.eachLine { line ->
println line
}
}
// Read in chunks
file.inputStream.withStream { stream ->
byte[] buffer = new byte[8192]
int bytesRead
while ((bytesRead = stream.read(buffer)) != -1) {
// Process buffer
processChunk(buffer, bytesRead)
}
}
// Read with encoding
file.inputStream.withReader('UTF-8') { reader ->
reader.eachLine { line ->
println line
}
}
Writing Streams
// Write from InputStream
def localFile = new File('upload.dat')
file.inputStream = localFile.newInputStream()
file.contentLength = localFile.length()
file.save()
// Write using OutputStream
def outputStream = file.outputStream
outputStream.withStream { stream ->
stream << 'Line 1\n'
stream << 'Line 2\n'
}
file.save()
// Stream from URL
def url = new URL('https://example.com/large-file.zip')
file.inputStream = url.openStream()
file.save()
Piping Between Files
Copy files efficiently between different storage providers:
// Copy from S3 to local storage
def s3Provider = StorageProvider.create(provider: 's3', ...)
def localProvider = StorageProvider.create(provider: 'local', ...)
def sourceFile = s3Provider['source-bucket']['data.csv']
def destFile = localProvider['local-dir']['data.csv']
// Stream-based copy (memory efficient)
destFile.inputStream = sourceFile.inputStream
destFile.contentLength = sourceFile.contentLength
destFile.contentType = sourceFile.contentType
destFile.save()
// Copy between cloud providers
def azureProvider = StorageProvider.create(provider: 'azure-pageblob', ...)
def gcsProvider = StorageProvider.create(provider: 'google', ...)
def azureFile = azureProvider['container']['file.dat']
def gcsFile = gcsProvider['bucket']['file.dat']
gcsFile.inputStream = azureFile.inputStream
gcsFile.contentLength = azureFile.contentLength
gcsFile.contentType = azureFile.contentType
gcsFile.save()
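Since every copy follows the same pattern (assign the source stream, carry over the length and content type, then save()), it can be wrapped in a small helper. A minimal sketch built only from the calls shown above; the method name is illustrative rather than part of Karman:
// Hypothetical helper: stream-copies one Karman CloudFile to another
def copyCloudFile(sourceFile, destFile) {
    destFile.inputStream = sourceFile.inputStream
    destFile.contentLength = sourceFile.contentLength
    destFile.contentType = sourceFile.contentType
    destFile.save()
}
copyCloudFile(azureFile, gcsFile)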
Range-Based Access
Karman supports HTTP range requests for partial file downloads, useful for:
- Resumable downloads
- Downloading specific parts of large files
- Reading file headers without downloading the entire file
- Streaming media with seek capabilities
Basic Range Requests
// Note: Range support varies by provider
// AWS S3, Google Cloud Storage, and Azure support ranges
// Get first 1024 bytes
def s3File = provider['bucket']['large-file.bin']
def headerBytes = s3File.getBytes(0, 1023)
// Get specific segment
def middleBytes = s3File.getBytes(1024, 2047)
// Read last 100 bytes of file
def fileSize = s3File.contentLength
def tailBytes = s3File.getBytes(fileSize - 100, fileSize - 1)
Streaming with Ranges
// Stream a specific range
def file = provider['bucket']['video.mp4']
// Get range as stream
file.getInputStream(startByte: 0, endByte: 1048575).withStream { stream ->
// Process first megabyte
byte[] chunk = stream.bytes
processVideoChunk(chunk)
}
// Download a file in fixed-size chunks (sequential; see the parallel sketch below)
def chunkSize = 5 * 1024 * 1024 // 5MB chunks
def fileSize = file.contentLength
def chunks = ((fileSize + chunkSize - 1).intdiv(chunkSize)) as int // ceiling division
chunks.times { index ->
def start = index * chunkSize
def end = Math.min(start + chunkSize - 1, fileSize - 1)
def chunkData = file.getBytes(start, end)
saveChunk(index, chunkData)
}
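For genuinely parallel downloading, the same ranges can be fetched with GPars, the same thread pool library this guide uses for bulk uploads further down. A minimal sketch, assuming the file, fileSize, chunkSize, chunks, and saveChunk helper from the snippet above:
import groovyx.gpars.GParsPool

// Fetch each byte range on a worker thread; saveChunk(index, bytes) keeps the ordering information
GParsPool.withPool(4) {
    (0..<chunks).eachParallel { index ->
        def start = index * chunkSize
        def end = Math.min(start + chunkSize - 1, fileSize - 1)
        saveChunk(index, file.getBytes(start, end))
    }
}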
Resumable Downloads
// Resume download from last position
def downloadedBytes = new File('partial-download.dat').length()
def remainingStream = file.getInputStream(startByte: downloadedBytes)
new FileOutputStream('partial-download.dat', true).withStream { out -> // true = append to the partial file
out << remainingStream
}
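To keep resuming until the object is complete, those two steps can be wrapped in a loop. A minimal sketch reusing the getInputStream(startByte:) call and the append-mode stream from above, assuming the same file object:
// Keep requesting the remaining range until the local file reaches the remote size
def target = new File('partial-download.dat')
def totalSize = file.contentLength
while (target.length() < totalSize) {
    file.getInputStream(startByte: target.length()).withStream { ranged ->
        new FileOutputStream(target, true).withStream { out -> // append mode
            out << ranged
        }
    }
}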
Listing Files with Filters
Basic Listing
// List all files
bucket.listFiles().each { file ->
println "${file.name} - ${file.contentLength} bytes"
}
// Iterate the results with a closure
bucket.listFiles().each { file ->
println file.name
}
Prefix Filtering
Use prefixes to list files in "folder-like" structures:
// List files with prefix (like a folder)
bucket.listFiles(prefix: 'uploads/2024/').each { file ->
println file.name
}
// List files in specific path
bucket.listFiles(prefix: 'images/products/').each { file ->
if (file.name.endsWith('.jpg')) {
println "Found image: ${file.name}"
}
}
// Multiple prefix patterns
def prefixes = ['logs/2024/', 'logs/2023/']
prefixes.each { prefix ->
println "Files in ${prefix}:"
bucket.listFiles(prefix: prefix).each { file ->
println " ${file.name}"
}
}
Delimiter for Hierarchical Listing
Use delimiters to treat storage as a hierarchical file system:
// List "folders" using delimiter
def options = [
prefix: 'documents/',
delimiter: '/'
]
bucket.listFiles(options).each { item ->
if (item.isDirectory()) {
println "Folder: ${item.name}"
} else {
println "File: ${item.name}"
}
}
// Get immediate children only (non-recursive)
bucket.listFiles(prefix: 'photos/', delimiter: '/').each { file ->
// Only lists files directly in 'photos/'
// Not in 'photos/vacation/' or other subfolders
println file.name
}
Pagination
Handle large file lists with pagination:
// Basic pagination
def marker = null
def allFiles = []
while (true) {
def options = [
maxKeys: 1000,
marker: marker
]
def files = bucket.listFiles(options)
if (!files) break
allFiles.addAll(files)
// Update marker for next page
marker = files.last().name
if (files.size() < 1000) break
}
println "Total files: ${allFiles.size()}"
Advanced Filtering
// Filter by extension
bucket.listFiles().findAll { file ->
file.name.endsWith('.pdf')
}.each { file ->
println "PDF: ${file.name}"
}
// Filter by size
bucket.listFiles().findAll { file ->
file.contentLength > 1024 * 1024 // Files > 1MB
}.each { file ->
println "${file.name}: ${file.contentLength / 1024 / 1024}MB"
}
// Filter by date
def yesterday = new Date() - 1
bucket.listFiles().findAll { file ->
file.lastModified > yesterday
}.each { file ->
println "Recent: ${file.name}"
}
// Complex filtering
bucket.listFiles(prefix: 'uploads/')
.findAll { file ->
file.name.contains('invoice') &&
file.contentType == 'application/pdf' &&
file.contentLength < 10 * 1024 * 1024 // < 10MB
}
.sort { it.lastModified }
.reverse()
.take(10)
.each { file ->
println "Recent invoice: ${file.name}"
}
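Because listFiles returns an ordinary collection of file objects, any Groovy collection method composes with it. For example, a per-extension size summary built from the name and contentLength properties already used above:
// Group files by extension and report each group's total size
def summary = bucket.listFiles(prefix: 'uploads/')
    .groupBy { file ->
        file.name.contains('.') ? file.name.tokenize('.').last().toLowerCase() : '(none)'
    }
    .collectEntries { ext, files ->
        [(ext): files*.contentLength.sum()]
    }
summary.each { ext, bytes ->
    println "${ext}: ${bytes} bytes"
}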
Metadata Operations
Working with Metadata
// Set metadata on upload
def file = bucket['document.pdf']
file.contentType = 'application/pdf'
file.setMetadata([
'author': 'John Doe',
'department': 'Engineering',
'version': '1.0',
'classification': 'internal'
])
file.bytes = documentBytes
file.save()
// Read metadata
def metadata = file.getMetadata()
metadata.each { key, value ->
println "${key}: ${value}"
}
// Update metadata
def existingMetadata = file.getMetadata()
existingMetadata['version'] = '1.1'
existingMetadata['lastReviewed'] = new Date().toString()
file.setMetadata(existingMetadata)
file.save()
// Access specific metadata attributes
def author = file.getMetaAttribute('author')
file.setMetaAttribute('status', 'published')
file.removeMetaAttribute('draft')
Content Type Management
// Set content type explicitly
file.contentType = 'application/json'
// Infer from the file extension (one possible helper is sketched at the end of this section)
def contentType = getContentTypeFromExtension(file.name)
file.contentType = contentType
// Common content types
def contentTypes = [
'txt': 'text/plain',
'html': 'text/html',
'json': 'application/json',
'xml': 'application/xml',
'pdf': 'application/pdf',
'jpg': 'image/jpeg',
'png': 'image/png',
'mp4': 'video/mp4',
'zip': 'application/zip'
]
// Set content type with encoding
file.contentType = 'text/html; charset=utf-8'
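The getContentTypeFromExtension helper referenced earlier in this section is not part of Karman. One possible shape for it, a minimal sketch that looks the extension up in the contentTypes map above and falls back to a generic binary type:
// Hypothetical helper: resolve a MIME type from the file name's extension
def getContentTypeFromExtension = { String fileName ->
    def ext = fileName.contains('.') ? fileName.tokenize('.').last().toLowerCase() : ''
    contentTypes[ext] ?: 'application/octet-stream'
}
assert getContentTypeFromExtension('report.PDF') == 'application/pdf'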
Batch Operations
Bulk Uploads
// Upload multiple files
def localFiles = new File('local-dir').listFiles()
localFiles.each { localFile ->
def cloudFile = bucket[localFile.name]
cloudFile.bytes = localFile.bytes
cloudFile.contentType = getContentType(localFile.name)
cloudFile.save()
println "Uploaded: ${localFile.name}"
}
// Parallel uploads (using GPars)
import groovyx.gpars.GParsPool
GParsPool.withPool(10) {
localFiles.eachParallel { localFile ->
def cloudFile = bucket[localFile.name]
cloudFile.inputStream = localFile.newInputStream()
cloudFile.contentLength = localFile.length()
cloudFile.save()
}
}
Bulk Downloads
// Download all files with prefix
def files = bucket.listFiles(prefix: 'exports/')
files.each { file ->
def localFile = new File("downloads/${file.name}")
localFile.parentFile.mkdirs()
localFile.withOutputStream { out ->
out << file.inputStream
}
println "Downloaded: ${file.name}"
}
Bulk Deletions
// Delete files matching criteria
bucket.listFiles(prefix: 'temp/')
.findAll { it.lastModified < (new Date() - 7) }
.each { file ->
file.delete()
println "Deleted: ${file.name}"
}
// Delete all files in prefix
bucket.listFiles(prefix: 'old-data/').each { file ->
file.delete()
}
Groovy Operators
Assignment Operator
// Direct assignment using = operator
bucket['file.txt'] = 'Content as string'
// Equivalent to:
def file = bucket['file.txt']
file.text = 'Content as string'
file.save()
Left Shift Operator
// Append using << operator (for streams)
def output = file.outputStream
output << 'First line\n'
output << 'Second line\n'
file.save()
// Copy by streaming the source into the destination's output stream
destFile.outputStream << sourceFile.inputStream
destFile.save()
Elvis and Safe Navigation
// Safe navigation
def size = file?.contentLength ?: 0
// Elvis operator for defaults
def type = file.contentType ?: 'application/octet-stream'
// Check existence safely
if (bucket['file.txt']?.exists()) {
println "File exists"
}
Best Practices
Memory Management
// ✓ Good: Use streams for large files
file.inputStream.withStream { stream ->
// Process stream
}
// ✗ Bad: Load entire file into memory
def allBytes = file.bytes // Loads everything into RAM
Error Handling
try {
def file = bucket['important.dat']
if (!file.exists()) {
throw new FileNotFoundException("File not found: ${file.name}")
}
file.inputStream.withStream { stream ->
processStream(stream)
}
} catch (Exception e) {
log.error("Error processing file", e)
// Handle error appropriately
}
Resource Cleanup
// ✓ Good: withStream handles cleanup
file.inputStream.withStream { stream ->
// Stream automatically closed
}
// ✗ Bad: Manual management (error-prone)
def stream = file.inputStream
try {
// Process stream
} finally {
stream?.close()
}
Efficient File Copying
// ✓ Good: Direct stream copy
destFile.inputStream = sourceFile.inputStream
destFile.contentLength = sourceFile.contentLength
destFile.save()
// ✗ Bad: Load into memory first
destFile.bytes = sourceFile.bytes // Loads entire file into RAM
destFile.save()